Commit 33c4a2a6 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

ewoks graph representation changes

parent 6ffb6b1e
Pipeline #54804 passed with stages
in 1 minute and 26 seconds
......@@ -124,7 +124,7 @@ class NameMapperActor(AbstractActor):
def __init__(
self,
namemap=None,
mapall=False,
map_all_data=False,
name="Name mapper",
trigger_on_error=False,
required=False,
......@@ -132,7 +132,7 @@ class NameMapperActor(AbstractActor):
):
super().__init__(name=name, **kw)
self.namemap = namemap
self.mapall = mapall
self.map_all_data = map_all_data
self.trigger_on_error = trigger_on_error
self.required = required
......@@ -150,7 +150,7 @@ class NameMapperActor(AbstractActor):
if not is_error:
# Map output names of this task to input
# names of the downstream task
if self.mapall:
if self.map_all_data:
newInData.update(inData)
for input_name, output_name in self.namemap.items():
newInData[input_name] = inData[output_name]
......@@ -316,8 +316,10 @@ class EwoksWorkflow(Workflow):
source_actor = taskactors[source_name]
for target_name in taskgraph.successors(source_name):
link_attrs = taskgraph.graph[source_name][target_name]
conditions = link_attrs.get("conditions", dict())
for outname, outvalue in conditions.items():
conditions = link_attrs.get("conditions", list())
for item in conditions:
outname = item["source_output"]
outvalue = item["value"]
router = routers.get(outname)
if router is None:
router = self._create_router_actor(
......@@ -366,12 +368,14 @@ class EwoksWorkflow(Workflow):
routeractors = self._routeractors
link_attrs = taskgraph.graph[source_name][target_name]
conditions = link_attrs.get("conditions", dict())
conditions = link_attrs.get("conditions", list())
on_error = link_attrs.get("on_error", False)
if on_error:
return self._create_source_on_error_actor(
taskgraph, source_name, target_name
)
if conditions:
conditions = {item["source_output"]: item["value"] for item in conditions}
# One router actor for each output name
routers = dict()
......@@ -426,8 +430,11 @@ class EwoksWorkflow(Workflow):
self, taskgraph, source_name, target_name
) -> NameMapperActor:
link_attrs = taskgraph.graph[source_name][target_name]
mapall = link_attrs.get("all_arguments", dict())
arguments = link_attrs.get("arguments", dict())
map_all_data = link_attrs.get("map_all_data", False)
data_mapping = link_attrs.get("data_mapping", list())
data_mapping = {
item["target_input"]: item["source_output"] for item in data_mapping
}
on_error = link_attrs.get("on_error", False)
required = taskgraph.link_is_required(source_name, target_name)
......@@ -439,8 +446,8 @@ class EwoksWorkflow(Workflow):
name = f"Name mapper <{source_name} - {target_name}>"
return NameMapperActor(
name=name,
namemap=dict(arguments),
mapall=mapall,
namemap=data_mapping,
map_all_data=map_all_data,
trigger_on_error=on_error,
required=required,
**self._actor_arguments,
......
......@@ -17,9 +17,24 @@ def run(**inputs):
info = inputs.pop(INFOKEY)
log = info.get("enable_logging")
varinfo = info["varinfo"]
task = instantiate_task(
info["node_attrs"], varinfo=varinfo, inputs=inputs, node_name=info["node_name"]
)
try:
task = instantiate_task(
info["node_attrs"],
varinfo=varinfo,
inputs=inputs,
node_name=info["node_name"],
)
except Exception as e:
if log:
logger.error(
"\nINSTANTIATE {}\n ATTRIBUTES: {}\n ERROR: {}".format(
info["node_name"],
info["node_attrs"],
e,
),
)
raise
try:
task.execute()
......
......@@ -26,22 +26,28 @@ def workflow():
{"id": "task5", "class": myclass},
]
links = [
{"source": "task1", "target": "task2", "all_arguments": True},
{"source": "task2", "target": "task3", "all_arguments": True},
{"source": "task1", "target": "task2", "map_all_data": True},
{"source": "task2", "target": "task3", "map_all_data": True},
{
"source": "task3",
"target": "task4",
"all_arguments": True,
"conditions": {"a": 3, "b": 3},
"map_all_data": True,
"conditions": [
{"source_output": "a", "value": 3},
{"source_output": "b", "value": 3},
],
},
{
"source": "task3",
"target": "task5",
"all_arguments": True,
"conditions": {"a": 6, "b": "other"},
"map_all_data": True,
"conditions": [
{"source_output": "a", "value": 6},
{"source_output": "b", "value": "other"},
],
},
{"source": "task4", "target": "task2", "all_arguments": True},
{"source": "task5", "target": "task2", "all_arguments": True},
{"source": "task4", "target": "task2", "map_all_data": True},
{"source": "task5", "target": "task2", "map_all_data": True},
]
graph = {"links": links, "nodes": nodes}
......
......@@ -6,7 +6,7 @@ def workflow1():
nodes = [
{
"id": "Python Actor Test",
"inputs": {"name": "myname"},
"default_inputs": [{"name": "name", "value": "myname"}],
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorTest.run",
},
......
......@@ -5,10 +5,11 @@ from ewokscore.tests.utils import assert_workflow_merged_result
def workflow10(inputs):
default_inputs = [{"name": name, "value": value} for name, value in inputs.items()]
nodes = [
{
"id": "addWithoutSleep",
"inputs": inputs,
"default_inputs": default_inputs,
"inputs_complete": True,
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAddWithoutSleep.run",
......@@ -24,13 +25,13 @@ def workflow10(inputs):
{
"source": "addWithoutSleep",
"target": "check",
"all_arguments": True,
"map_all_data": True,
},
{
"source": "check",
"target": "addWithoutSleep",
"conditions": {"doContinue": "true"},
"all_arguments": True,
"conditions": [{"source_output": "doContinue", "value": "true"}],
"map_all_data": True,
},
]
......
......@@ -19,9 +19,9 @@ def submodel11a():
]
links = [
{"source": "in11a", "target": "addtask2aa", "all_arguments": True},
{"source": "addtask2aa", "target": "addtask2ab", "all_arguments": True},
{"source": "addtask2ab", "target": "out11a", "all_arguments": True},
{"source": "in11a", "target": "addtask2aa", "map_all_data": True},
{"source": "addtask2aa", "target": "addtask2ab", "map_all_data": True},
{"source": "addtask2ab", "target": "out11a", "map_all_data": True},
]
graph = {
......@@ -51,24 +51,20 @@ def submodel11b():
]
links = [
{"source": "in11b", "target": "addtask2ba", "all_arguments": True},
{"source": "in11b", "target": "addtask2ba", "map_all_data": True},
{
"source": "addtask2ba",
"target": "submodel11a",
"all_arguments": True,
"sub_graph_nodes": {
"sub_target": "in11a",
},
"sub_target": "in11a",
"map_all_data": True,
},
{
"source": "submodel11a",
"sub_source": "out11a",
"target": "addtask2bb",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": "out11a",
},
"map_all_data": True,
},
{"source": "addtask2bb", "target": "out11b", "all_arguments": True},
{"source": "addtask2bb", "target": "out11b", "map_all_data": True},
]
graph = {
......@@ -84,7 +80,7 @@ def workflow11():
nodes = [
{
"id": "addtask1",
"inputs": {"value": 1},
"default_inputs": [{"name": "value", "value": 1}],
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAdd.run",
},
......@@ -100,18 +96,14 @@ def workflow11():
{
"source": "addtask1",
"target": "submodel11b",
"all_arguments": True,
"sub_graph_nodes": {
"sub_target": "in11b",
},
"sub_target": "in11b",
"map_all_data": True,
},
{
"source": "submodel11b",
"sub_source": "out11b",
"target": "addtask3",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": "out11b",
},
"map_all_data": True,
},
]
......
......@@ -20,9 +20,9 @@ def submodel12():
]
links = [
{"source": "in", "target": "addtask2a", "all_arguments": True},
{"source": "addtask2a", "target": "addtask2b", "all_arguments": True},
{"source": "addtask2b", "target": "out", "all_arguments": True},
{"source": "in", "target": "addtask2a", "map_all_data": True},
{"source": "addtask2a", "target": "addtask2b", "map_all_data": True},
{"source": "addtask2b", "target": "out", "map_all_data": True},
]
graph = {
......@@ -38,7 +38,7 @@ def workflow12(startvalue, withsubmodel_startvalue):
nodes = [
{
"id": "addtask1",
"inputs": {"value": startvalue},
"default_inputs": [{"name": "value", "value": startvalue}],
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAdd.run",
},
......@@ -54,19 +54,17 @@ def workflow12(startvalue, withsubmodel_startvalue):
{
"source": "addtask1",
"target": "submodel12",
"all_arguments": True,
"conditions": {"value": withsubmodel_startvalue + 1},
"sub_graph_nodes": {
"sub_target": "in",
},
"sub_target": "in",
"map_all_data": True,
"conditions": [
{"source_output": "value", "value": withsubmodel_startvalue + 1}
],
},
{
"source": "submodel12",
"sub_source": "out",
"target": "addtask2",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": "out",
},
"map_all_data": True,
},
]
......
......@@ -20,9 +20,9 @@ def submodel13():
]
links = [
{"source": "in", "target": "addtask2a", "all_arguments": True},
{"source": "addtask2a", "target": "addtask2b", "all_arguments": True},
{"source": "addtask2b", "target": "out", "all_arguments": True},
{"source": "in", "target": "addtask2a", "map_all_data": True},
{"source": "addtask2a", "target": "addtask2b", "map_all_data": True},
{"source": "addtask2b", "target": "out", "map_all_data": True},
]
graph = {
......@@ -38,7 +38,7 @@ def workflow13(startvalue, withlastnode_startvalue):
nodes = [
{
"id": "addtask1",
"inputs": {"value": startvalue},
"default_inputs": [{"name": "value", "value": startvalue}],
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAdd.run",
},
......@@ -54,19 +54,17 @@ def workflow13(startvalue, withlastnode_startvalue):
{
"source": "addtask1",
"target": "submodel13",
"all_arguments": True,
"sub_graph_nodes": {
"sub_target": "in",
},
"sub_target": "in",
"map_all_data": True,
},
{
"source": "submodel13",
"sub_source": "out",
"target": "addtask2",
"all_arguments": True,
"conditions": {"value": withlastnode_startvalue + 3},
"sub_graph_nodes": {
"sub_source": "out",
},
"map_all_data": True,
"conditions": [
{"source_output": "value", "value": withlastnode_startvalue + 3}
],
},
]
......
......@@ -19,9 +19,9 @@ def submodel14a():
]
links = [
{"source": "In", "target": "addtask2aa", "all_arguments": True},
{"source": "addtask2aa", "target": "addtask2ab", "all_arguments": True},
{"source": "addtask2ab", "target": "Out", "all_arguments": True},
{"source": "In", "target": "addtask2aa", "map_all_data": True},
{"source": "addtask2aa", "target": "addtask2ab", "map_all_data": True},
{"source": "addtask2ab", "target": "Out", "map_all_data": True},
]
graph = {
......@@ -44,18 +44,14 @@ def submodel14b():
{
"source": "In",
"target": "submodel14a",
"all_arguments": True,
"sub_graph_nodes": {
"sub_target": "In",
},
"sub_target": "In",
"map_all_data": True,
},
{
"source": "submodel14a",
"sub_source": "Out",
"target": "Out",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": "Out",
},
"map_all_data": True,
},
]
......@@ -72,7 +68,7 @@ def workflow14():
nodes = [
{
"id": "addtask1",
"inputs": {"value": 1},
"default_inputs": [{"name": "value", "value": 1}],
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAdd.run",
},
......@@ -88,14 +84,14 @@ def workflow14():
{
"source": "addtask1",
"target": "submodel14b",
"all_arguments": True,
"sub_graph_nodes": {"sub_target": "In"},
"sub_target": "In",
"map_all_data": True,
},
{
"source": "submodel14b",
"sub_source": "Out",
"target": "addtask3",
"all_arguments": True,
"sub_graph_nodes": {"sub_source": "Out"},
"map_all_data": True,
},
]
......
......@@ -19,9 +19,9 @@ def submodel15(name):
]
links = [
{"source": "in", "target": "addtask1", "all_arguments": True},
{"source": "addtask1", "target": "addtask2", "all_arguments": True},
{"source": "addtask2", "target": "out", "all_arguments": True},
{"source": "in", "target": "addtask1", "map_all_data": True},
{"source": "addtask1", "target": "addtask2", "map_all_data": True},
{"source": "addtask2", "target": "out", "map_all_data": True},
]
graph = {
......@@ -37,7 +37,7 @@ def workflow15():
nodes = [
{
"id": "addtask1",
"inputs": {"value": 1},
"default_inputs": [{"name": "value", "value": 1}],
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAdd.run",
},
......@@ -62,27 +62,21 @@ def workflow15():
{
"source": "addtask1",
"target": "submodel15a",
"all_arguments": True,
"sub_graph_nodes": {
"sub_target": "in",
},
"sub_target": "in",
"map_all_data": True,
},
{
"source": "submodel15a",
"sub_source": "out",
"target": "submodel15b",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": "out",
"sub_target": "in",
},
"sub_target": "in",
"map_all_data": True,
},
{
"source": "submodel15b",
"sub_source": "out",
"target": "addtask2",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": "out",
},
"map_all_data": True,
},
]
......
......@@ -19,9 +19,9 @@ def submodel16a():
]
links = [
{"source": "in", "target": "addtask1", "all_arguments": True},
{"source": "addtask1", "target": "addtask2", "all_arguments": True},
{"source": "addtask2", "target": "out", "all_arguments": True},
{"source": "in", "target": "addtask1", "map_all_data": True},
{"source": "addtask1", "target": "addtask2", "map_all_data": True},
{"source": "addtask2", "target": "out", "map_all_data": True},
]
graph = {
......@@ -51,24 +51,20 @@ def submodel16b():
]
links = [
{"source": "in", "target": "addtask1", "all_arguments": True},
{"source": "in", "target": "addtask1", "map_all_data": True},
{
"source": "addtask1",
"target": "submodel16a",
"all_arguments": True,
"sub_graph_nodes": {
"sub_target": "in",
},
"sub_target": "in",
"map_all_data": True,
},
{
"source": "submodel16a",
"sub_source": "out",
"target": "addtask2",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": "out",
},
"map_all_data": True,
},
{"source": "addtask2", "target": "out", "all_arguments": True},
{"source": "addtask2", "target": "out", "map_all_data": True},
]
graph = {
......@@ -84,7 +80,7 @@ def workflow16():
nodes = [
{
"id": "addtask1",
"inputs": {"value": 1},
"default_inputs": [{"name": "value", "value": 1}],
"task_type": "ppfmethod",
"task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAdd.run",
},
......@@ -100,18 +96,14 @@ def workflow16():
{
"source": "addtask1",
"target": "submodel16b",
"all_arguments": True,
"sub_graph_nodes": {
"sub_target": "in",
},
"sub_target": "in",
"map_all_data": True,
},
{
"source": "submodel16b",
"sub_source": ("submodel16a", "out"),
"target": "addtask2",
"all_arguments": True,
"sub_graph_nodes": {
"sub_source": ("submodel16a", "out"),
},
"map_all_data": True,
},
]
......
......@@ -16,7 +16,7 @@ def workflow17(doloop=True):
"id": "task1",
"task_type": "ppfmethod",
"task_identifier": ppfmethod,
"inputs": {"value": 0},
"default_inputs": [{"name": "value", "value": 0}],
},
{"id": "task2", "task_type": "ppfmethod", "task_identifier": ppfmethod},
{"id": "task3", "task_type": "ppfmethod", "task_identifier": ppfmethod},
......@@ -24,16 +24,16 @@ def workflow17(doloop=True):
{"id": "task5", "task_type": "ppfmethod", "task_identifier": ppfmethod},
]
links = [
{"source": "task1", "target": "task2", "all_arguments": True},
{"source": "task2", "target": "task3", "all_arguments": True},
{"source": "task1", "target": "task2", "map_all_data": True},
{"source": "task2", "target": "task3", "map_all_data": True},
{
"source": "task3",
"target": "task4",
"all_arguments": True,
"conditions": {"value": condition},
"map_all_data": True,
"conditions": [{"source_output": "value", "value": condition}],
},
{"source": "task4", "target": "task5", "all_arguments": True},
{"source": "task5", "target": "task2", "all_arguments": True},
{"source": "task4", "target": "task5", "map_all_data": True},
{"source": "task5", "target": "task2", "map_all_data": True},
]
graph = {
"graph": {"name": "workflow17"},
......
......@@ -10,25 +10,25 @@ def workflow18(dotask4=True):
"id": "task1",
"task_type": "ppfmethod",
"task_identifier": ppfmethod,
"inputs": {"value": 0},
"default_inputs": [{"name": "value", "value": 0}],
},
{
"id": "task2",
"task_type": "ppfmethod",
"task_identifier": ppfmethod,
"inputs": {"value": 10},
"default_inputs": [{"name": "value", "value": 10}],