Commit bf38d26b authored by Wout De Nolf's avatar Wout De Nolf
Browse files

introduce the ConditionalActor (the previous solution does not work with loops)

parent b1566453
Pipeline #63625 passed with stages
in 1 minute and 4 seconds
......@@ -22,19 +22,8 @@ Conditional link with one condition
.. mermaid::
graph LR;
EwoksPythonActor-->DecodeRouterActor;
DecodeRouterActor-->NameMapperActor;
Conditional link with multiple conditions
.. mermaid::
graph LR;
EwoksPythonActor-->DecodeRouterActor1;
EwoksPythonActor-->DecodeRouterActor2;
DecodeRouterActor1-->JoinActor;
DecodeRouterActor2-->JoinActor;
JoinActor-->NameMapperActor;
EwoksPythonActor-->ConditionalActor;
ConditionalActor-->NameMapperActor;
On-Error conditional link
......@@ -71,5 +60,5 @@ These actors are not provided by *pypushflow*
* *EwoksythonActor*: like *PythonActor* but it passes node name and attributes to the next actor.
* *InputMergeActor*: like *Joinactor* (merges the input data dictionaries) triggers the downstream actors when all required input has been provided at least once. Only one non-required input passed.
* *DecodeRouterActor*: line *RouterActor* but it dereferences thet input hashes to get the values.
* *ConditionalActor*: triggers downstream actors when all conditions are fulfilled.
* *NameMapperActor*: before triggering the next task it applies filtering and name mapping to the input data.
import sys
import pprint
from typing import Optional, List
import numpy
from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.JoinActor import JoinActor
from pypushflow.RouterActor import RouterActor
from pypushflow.ErrorHandler import ErrorHandler
from pypushflow.AbstractActor import AbstractActor
from pypushflow.ThreadCounter import ThreadCounter
......@@ -21,8 +19,9 @@ from ewokscore.variable import value_from_transfer
from ewokscore.inittask import task_executable
from ewokscore.inittask import get_varinfo
from ewokscore.inittask import task_executable_info
from ewokscore.graph import CONDITIONS_ELSE_VALUE
from ewokscore.graph import TaskGraph
from ewokscore.node import get_node_label
from ewokscore.node import NodeIdType
# Scheme: task graph
# Workflow: instance of a task graph
......@@ -31,7 +30,7 @@ from ewokscore.node import get_node_label
# in subprocess (python's multiprocessing)
def varinfo_from_indata(inData) -> Optional[dict]:
def varinfo_from_indata(inData: dict) -> Optional[dict]:
if ppfrunscript.INFOKEY not in inData:
return None
varinfo = inData[ppfrunscript.INFOKEY].get("varinfo", None)
......@@ -75,11 +74,7 @@ class EwoksPythonActor(PythonActor):
kw["name"] = get_node_label(node_attrs, node_id=node_id)
super().__init__(**kw)
def trigger(self, inData):
"""
:param dict inData: output from the previous task
"""
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
def trigger(self, inData: dict):
infokey = ppfrunscript.INFOKEY
inData[infokey] = dict(inData[infokey])
inData[infokey]["node_id"] = self.node_id
......@@ -87,47 +82,57 @@ class EwoksPythonActor(PythonActor):
return super().trigger(inData)
class DecodeRouterActor(RouterActor):
"""For PPF methods, the conditions do not apply to the
output value (which is a dict) but to the values
of the dict.
"""
class ConditionalActor(AbstractActor):
"""Triggers downstream actors when conditions are fulfilled."""
def __init__(self, is_ppfmethod=False, **kw):
def __init__(
self,
conditions: dict,
all_conditions: dict,
conditions_else_value,
is_ppfmethod: bool = False,
**kw,
):
self.conditions = conditions
self.all_conditions = all_conditions
self.conditions_else_value = conditions_else_value
self.is_ppfmethod = is_ppfmethod
super().__init__(**kw)
def _extractValue(self, inData):
if self.is_ppfmethod:
value = inData[ppftasks.PPF_DICT_ARGUMENT]
else:
if self.itemName in inData:
value = inData[self.itemName]
else:
return CONDITIONS_ELSE_VALUE
def _conditions_fulfilled(self, inData: dict) -> bool:
if not self.conditions:
return True
varinfo = varinfo_from_indata(inData)
value = value_from_transfer(value, varinfo=varinfo)
compareDict = dict(inData)
if self.is_ppfmethod:
if self.itemName in value:
value = value[self.itemName]
ppfdict = compareDict.pop(ppftasks.PPF_DICT_ARGUMENT, None)
compareDict.update(value_from_transfer(ppfdict, varinfo=varinfo))
compareDict.pop(ppfrunscript.INFOKEY)
for varname, value in self.conditions.items():
if varname not in compareDict:
return False
invalue = value_from_transfer(compareDict[varname], varinfo=varinfo)
if value == self.conditions_else_value:
if (
invalue != self.conditions_else_value
and invalue in self.all_conditions[varname]
):
return False
else:
return CONDITIONS_ELSE_VALUE
if isinstance(value, numpy.ndarray):
value = value.item()
if value in self.dictValues:
return value
else:
return CONDITIONS_ELSE_VALUE
if invalue != value:
return False
return True
def trigger(self, inData):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
trigger = self._conditions_fulfilled(inData)
self.setFinished()
value = self._extractValue(inData)
actors = self.dictValues.get(value, list())
for actor in actors:
actor.trigger(inData)
if trigger:
for actor in self.listDownStreamActor:
actor.trigger(inData)
class NameMapperActor(AbstractActor):
......@@ -155,7 +160,7 @@ class NameMapperActor(AbstractActor):
if isinstance(actor, InputMergeActor):
actor._required_actor(self)
def trigger(self, inData):
def trigger(self, inData: dict):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
is_error = "WorkflowException" in inData
if is_error and not self.trigger_on_error:
......@@ -198,7 +203,7 @@ class InputMergeActor(AbstractActor):
if actor.required:
self.requiredInData[actor] = None
def trigger(self, inData, source=None):
def trigger(self, inData: dict, source=None):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
self.setFinished()
......@@ -233,7 +238,7 @@ class InputMergeActor(AbstractActor):
class EwoksWorkflow(Workflow):
def __init__(self, ewoksgraph, varinfo: Optional[dict] = None):
def __init__(self, ewoksgraph: TaskGraph, varinfo: Optional[dict] = None):
name = repr(ewoksgraph)
super().__init__(name)
......@@ -242,16 +247,13 @@ class EwoksWorkflow(Workflow):
if varinfo is None:
varinfo = dict()
self.startargs = {ppfrunscript.INFOKEY: {"varinfo": varinfo}}
self.graph_to_actors(ewoksgraph, varinfo)
self.graph_to_actors(ewoksgraph)
def _clean_workflow(self):
# task_name -> EwoksPythonActor
self._taskactors = dict()
self.listActorRef = list() # values of taskactors
# source_id -> condition_name -> DecodeRouterActor
self._routeractors = dict()
# source_id -> target_id -> NameMapperActor
self._sourceactors = dict()
......@@ -270,10 +272,9 @@ class EwoksWorkflow(Workflow):
def _actor_arguments(self):
return {"parent": self, "thread_counter": self._threadcounter}
def graph_to_actors(self, taskgraph, varinfo):
def graph_to_actors(self, taskgraph: TaskGraph):
self._clean_workflow()
self._create_task_actors(taskgraph)
self._create_router_actors(taskgraph)
self._compile_source_actors(taskgraph)
self._compile_target_actors(taskgraph)
self._connect_start_actor(taskgraph)
......@@ -284,21 +285,12 @@ class EwoksWorkflow(Workflow):
on_error |= isinstance(target_actor, ErrorHandler)
if on_error:
source_actor.connectOnError(target_actor, **kw)
msg = "\nPpf actor connection (on error):\n source ({}): {}\n ->\n target ({}): {}"
else:
source_actor.connect(target_actor, **kw)
msg = "\nPpf actor connection:\n source ({}): {}\n ->\n target ({}): {}"
if isinstance(target_actor, JoinActor):
target_actor.increaseNumberOfThreads()
msg = msg.format(
type(source_actor).__name__,
source_actor.name,
type(target_actor).__name__,
target_actor.name,
)
def _create_task_actors(self, taskgraph):
def _create_task_actors(self, taskgraph: TaskGraph):
# task_name -> EwoksPythonActor
taskactors = self._taskactors
error_actor = self._error_actor
......@@ -322,53 +314,33 @@ class EwoksWorkflow(Workflow):
taskactors[node_id] = actor
self.addActorRef(actor)
def _create_router_actors(self, taskgraph):
"""Insert router actors (one per target and output name) behind
actors with conditions.
"""
# source_id -> condition_name -> DecodeRouterActor
routeractors = self._routeractors
# task_name -> EwoksPythonActor
taskactors = self._taskactors
for source_id in taskgraph.graph.nodes:
# We will get 1 router for each output variable
routers = routeractors[source_id] = dict()
source_actor = taskactors[source_id]
for target_id in taskgraph.successors(source_id):
link_attrs = taskgraph.graph[source_id][target_id]
conditions = link_attrs.get("conditions", list())
for item in conditions:
outname = item["source_output"]
outvalue = item["value"]
router = routers.get(outname)
if router is None:
router = self._create_router_actor(
source_actor,
source_id,
outname,
taskgraph,
)
routers[outname] = router
if outvalue not in router.listPort:
router.listPort.append(outvalue)
def _create_router_actor(self, source_actor, source_id, outname, taskgraph):
"""
:returns DecodeRouterActor:
"""
def _create_conditional_actor(
self,
source_actor,
source_id: NodeIdType,
target_id: NodeIdType,
taskgraph: TaskGraph,
conditions: dict,
all_conditions: dict,
conditions_else_value,
) -> ConditionalActor:
source_attrs = taskgraph.graph.nodes[source_id]
target_attrs = taskgraph.graph.nodes[target_id]
source_label = get_node_label(source_attrs, node_id=source_id)
routername = f"Route output {repr(outname)} of {source_label}"
router = DecodeRouterActor(
name=routername,
itemName=outname,
target_label = get_node_label(target_attrs, node_id=target_id)
name = f"Conditional actor between {source_label} and {target_label}"
actor = ConditionalActor(
conditions,
all_conditions,
conditions_else_value,
is_ppfmethod=is_ppfmethod(source_attrs),
name=name,
**self._actor_arguments,
)
self._connect_actors(source_actor, router)
return router
self._connect_actors(source_actor, actor)
return actor
def _compile_source_actors(self, taskgraph):
def _compile_source_actors(self, taskgraph: TaskGraph):
"""Compile a dictionary NameMapperActor instances for each link.
These actors will serve as the source actor of each link.
"""
......@@ -380,57 +352,46 @@ class EwoksWorkflow(Workflow):
actor = self._create_source_actor(taskgraph, source_id, target_id)
sourceactors[source_id][target_id] = actor
def _create_source_actor(self, taskgraph, source_id, target_id) -> NameMapperActor:
def _create_source_actor(
self, taskgraph: TaskGraph, source_id: NodeIdType, target_id: NodeIdType
) -> NameMapperActor:
# task_name -> EwoksPythonActor
taskactors = self._taskactors
# source_id -> condition_name -> DecodeRouterActor
routeractors = self._routeractors
link_attrs = taskgraph.graph[source_id][target_id]
conditions = link_attrs.get("conditions", list())
conditions = link_attrs.get("conditions", None)
on_error = link_attrs.get("on_error", False)
if on_error:
return self._create_source_on_error_actor(taskgraph, source_id, target_id)
# EwoksTaskActor
source_actor = taskactors[source_id]
if conditions:
conditions = {item["source_output"]: item["value"] for item in conditions}
# One router actor for each output name
routers = dict()
for outname in conditions:
routers[outname] = routeractors[source_id][outname]
# Merge routers into one single source actor
connectkw = dict()
nrouters = len(routers)
if nrouters == 0:
# EwoksTaskActor
source_actor = taskactors[source_id]
elif nrouters == 1:
# DecodeRouterActor
for outname, router_actor in routers.items():
value = conditions[outname]
source_actor = router_actor
connectkw["expectedValue"] = value
else:
# JoinActor
source_attrs = taskgraph.graph.nodes[source_id]
target_attrs = taskgraph.graph.nodes[target_id]
source_label = get_node_label(source_attrs, node_id=source_id)
target_label = get_node_label(target_attrs, node_id=target_id)
name = f"Join routers {source_label} -> {target_label}"
source_actor = JoinActor(name=name, **self._actor_arguments)
for outname, router_actor in routers.items():
value = conditions[outname]
self._connect_actors(router_actor, source_actor, expectedValue=value)
conditions = {c["source_output"]: c["value"] for c in conditions}
all_conditions = taskgraph.node_condition_values(source_id)
conditions_else_value = taskgraph.graph.nodes[source_id].get(
"conditions_else_value", None
)
# ConditionalActor
source_actor = self._create_conditional_actor(
source_actor,
source_id,
target_id,
taskgraph,
conditions,
all_conditions,
conditions_else_value,
)
# The final actor of this link does the name mapping
final_source = self._create_name_mapper(taskgraph, source_id, target_id)
self._connect_actors(source_actor, final_source, **connectkw)
self._connect_actors(source_actor, final_source)
return final_source
def _create_source_on_error_actor(
self, taskgraph, source_id, target_id
self, taskgraph: TaskGraph, source_id: NodeIdType, target_id: NodeIdType
) -> NameMapperActor:
# task_name -> EwoksPythonActor
taskactors = self._taskactors
......@@ -447,7 +408,9 @@ class EwoksWorkflow(Workflow):
return final_source
def _create_name_mapper(self, taskgraph, source_id, target_id) -> NameMapperActor:
def _create_name_mapper(
self, taskgraph: TaskGraph, source_id: NodeIdType, target_id: NodeIdType
) -> NameMapperActor:
link_attrs = taskgraph.graph[source_id][target_id]
map_all_data = link_attrs.get("map_all_data", False)
data_mapping = link_attrs.get("data_mapping", list())
......@@ -474,7 +437,7 @@ class EwoksWorkflow(Workflow):
**self._actor_arguments,
)
def _compile_target_actors(self, taskgraph):
def _compile_target_actors(self, taskgraph: TaskGraph):
"""Compile a dictionary of InputMergeActor actors for each node
with predecessors. The actors will serve as the destination of
each link.
......@@ -497,7 +460,7 @@ class EwoksWorkflow(Workflow):
self._connect_actors(targetactor, taskactors[target_id])
targetactors[target_id] = targetactor
def _connect_sources_to_targets(self, taskgraph):
def _connect_sources_to_targets(self, taskgraph: TaskGraph):
# source_id -> target_id -> NameMapperActor
sourceactors = self._sourceactors
# target_id -> EwoksPythonActor or InputMergeActor
......@@ -507,7 +470,7 @@ class EwoksWorkflow(Workflow):
target_actor = targetactors[target_id]
self._connect_actors(source_actor, target_actor)
def _connect_start_actor(self, taskgraph):
def _connect_start_actor(self, taskgraph: TaskGraph):
# task_name -> EwoksPythonActor
taskactors = self._taskactors
# target_id -> EwoksPythonActor or InputMergeActor
......@@ -519,7 +482,7 @@ class EwoksWorkflow(Workflow):
target_actor = taskactors[target_id]
self._connect_actors(start_actor, target_actor)
def _connect_stop_actor(self, taskgraph):
def _connect_stop_actor(self, taskgraph: TaskGraph):
# task_name -> EwoksPythonActor
taskactors = self._taskactors
stop_actor = self._stop_actor
......
......@@ -3,9 +3,9 @@ def run(b=None, **kwargs):
raise RuntimeError("Missing argument 'b'!")
b += 1
print(f"In AddB, b={b}")
if b == 5:
if b == 4:
print(f"b reached {b}!")
b_is_5 = True
b_is_4 = True
else:
b_is_5 = False
return {"b": b, "b_is_5": b_is_5}
b_is_4 = False
return {"b": b, "b_is_4": b_is_4}
import pytest
from ewoksppf import execute_graph
from ewokscore import Task
from ewokscore.utils import qualname
......@@ -19,11 +18,11 @@ class MyTask(Task, optional_input_names=["a", "b"], output_names=["a", "b"]):
def workflow():
myclass = qualname(MyTask)
nodes = [
{"id": "task1", "class": myclass},
{"id": "task2", "class": myclass},
{"id": "task3", "class": myclass},
{"id": "task4", "class": myclass},
{"id": "task5", "class": myclass},
{"id": "task1", "task_type": "class", "task_identifier": myclass},
{"id": "task2", "task_type": "class", "task_identifier": myclass},
{"id": "task3", "task_type": "class", "task_identifier": myclass},
{"id": "task4", "task_type": "class", "task_identifier": myclass},
{"id": "task5", "task_type": "class", "task_identifier": myclass},
]
links = [
{"source": "task1", "target": "task2", "map_all_data": True},
......@@ -43,7 +42,7 @@ def workflow():
"map_all_data": True,
"conditions": [
{"source_output": "a", "value": 6},
{"source_output": "b", "value": "other"},
{"source_output": "b", "value": None},
],
},
{"source": "task4", "target": "task2", "map_all_data": True},
......@@ -52,12 +51,11 @@ def workflow():
graph = {"links": links, "nodes": nodes}
expected_results = {"a": 10}
expected_results = {"a": 9, "b": 9}
return graph, expected_results
@pytest.mark.skip("TODO")
def test_ppf_end(ppf_log_config):
graph, expected = workflow()
result = execute_graph(graph)
......
......@@ -23,7 +23,12 @@ def workflow19():
],
"inputs_complete": True,
},
{"id": "task2", "task_type": "ppfmethod", "task_identifier": move_d_to_a},
{
"id": "task2",
"task_type": "ppfmethod",
"task_identifier": move_d_to_a,
"conditions_else_value": "__other__",
},
{
"id": "task3",
"task_type": "ppfmethod",
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment