Skip to content
Snippets Groups Projects
Commit 1cc39713 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

fix test_ppf_workflow21 and add failing test_ppf_workflow23

parent c3b371db
No related branches found
No related tags found
1 merge request!46Resolve "Fix bug with on_error links"
Pipeline #65994 passed
......@@ -158,7 +158,7 @@ class NameMapperActor(AbstractActor):
def connect(self, actor):
super().connect(actor)
if isinstance(actor, InputMergeActor):
actor._required_actor(self)
actor.require_input_from_actor(self)
def trigger(self, inData: dict):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
......@@ -199,7 +199,7 @@ class InputMergeActor(AbstractActor):
self.requiredInData = dict()
self.nonrequiredInData = dict()
def _required_actor(self, actor):
def require_input_from_actor(self, actor):
if actor.required:
self.requiredInData[actor] = None
......@@ -217,7 +217,7 @@ class InputMergeActor(AbstractActor):
missing = {k: v for k, v in self.requiredInData.items() if v is None}
if missing:
self.logger.info(
"not triggering downstream actors (missing inputs %s)",
"not triggering downstream actors because missing inputs from actors %s",
[actor.name for actor in missing],
)
return
......@@ -309,7 +309,7 @@ class EwoksWorkflow(Workflow):
script=ppfrunscript.__name__ + ".dummy",
**self._actor_arguments,
)
if not taskgraph.has_successors(node_id, on_error=True):
if not taskgraph.has_successors(node_id, link_has_on_error=True):
self._connect_actors(actor, error_actor)
taskactors[node_id] = actor
self.addActorRef(actor)
......@@ -419,7 +419,6 @@ class EwoksWorkflow(Workflow):
}
on_error = link_attrs.get("on_error", False)
required = taskgraph.link_is_required(source_id, target_id)
source_attrs = taskgraph.graph.nodes[source_id]
target_attrs = taskgraph.graph.nodes[target_id]
source_label = get_node_label(source_attrs, node_id=source_id)
......@@ -497,27 +496,29 @@ class EwoksWorkflow(Workflow):
results_of_all_nodes: Optional[bool] = False,
outputs: Optional[List[dict]] = None,
timeout: Optional[float] = None,
shared_pool: bool = False,
):
startindata = dict(self.startargs)
if startargs:
startindata.update(startargs)
self._start_actor.trigger(startindata)
self._stop_actor.join(timeout=timeout)
result = self._stop_actor.outData
if result is None:
return None
info = result.pop(ppfrunscript.INFOKEY, dict())
result = self.__parse_result(result)
ex = result.get("WorkflowException")
if ex is None or not raise_on_error:
return result
else:
print("\n".join(ex["traceBack"]), file=sys.stderr)
node_id = info.get("node_id")
err_msg = f"Task {node_id} failed"
if ex["errorMessage"]:
err_msg += " ({})".format(ex["errorMessage"])
raise RuntimeError(err_msg)
with self._run_context(shared_pool=shared_pool):
startindata = dict(self.startargs)
if startargs:
startindata.update(startargs)
self._start_actor.trigger(startindata)
self._stop_actor.join(timeout=timeout)
result = self._stop_actor.outData
if result is None:
return None
info = result.pop(ppfrunscript.INFOKEY, dict())
result = self.__parse_result(result)
ex = result.get("WorkflowException")
if ex is None or not raise_on_error:
return result
else:
print("\n".join(ex["traceBack"]), file=sys.stderr)
node_id = info.get("node_id")
err_msg = f"Task {node_id} failed"
if ex["errorMessage"]:
err_msg += " ({})".format(ex["errorMessage"])
raise RuntimeError(err_msg)
def __parse_result(self, result) -> dict:
varinfo = varinfo_from_indata(self.startargs)
......
......@@ -158,8 +158,6 @@ ARG_FAILURE = {"inputs": {"a": 0}, "return_value": 2}
@pytest.mark.parametrize("persist", [True, False])
def test_workflow21(args, on_error, persist, ppf_log_config, tmpdir):
"""Test conditions in output nodes"""
if args == ARG_FAILURE and on_error:
pytest.skip("Bug to fix for `on_error`")
if persist:
varinfo = {"root_uri": str(tmpdir)}
else:
......
import pytest
from ewoksppf import execute_graph
from ewokscore.utils import qualname
def raise_not_greater_than(**kwargs):
name = kwargs.get("variable", None)
if name in kwargs:
assert kwargs[name] > kwargs["limit"]
return kwargs
def work(**kwargs):
tasks = kwargs["tasks"]
groups = kwargs["groups"]
new_tasks = list()
for name in ["a", "b", "c"]:
if kwargs.get(name) and name not in tasks:
tasks.add(name)
new_tasks.append(name)
groups.add(tuple(new_tasks))
return kwargs
def passthrough(**kwargs):
return kwargs
def workflow():
nodes = [
{
"id": "in",
"task_type": "ppfmethod",
"task_identifier": qualname(passthrough),
},
{
"id": "out",
"task_type": "ppfmethod",
"task_identifier": qualname(passthrough),
},
{
"id": "gt",
"task_type": "ppfmethod",
"task_identifier": qualname(raise_not_greater_than),
"default_inputs": [{"name": "variable", "value": "value"}],
},
{
"id": "worka",
"task_type": "ppfmethod",
"task_identifier": qualname(work),
"default_inputs": [{"name": "a", "value": True}],
},
{
"id": "workb",
"task_type": "ppfmethod",
"task_identifier": qualname(work),
"default_inputs": [{"name": "b", "value": True}],
},
{
"id": "workc",
"task_type": "ppfmethod",
"task_identifier": qualname(work),
"default_inputs": [{"name": "c", "value": True}],
},
]
links = [
{"source": "in", "target": "gt", "map_all_data": True},
{"source": "gt", "target": "worka", "map_all_data": True},
{"source": "gt", "target": "workb", "map_all_data": True},
{"source": "gt", "target": "workc", "map_all_data": True, "on_error": True},
{"source": "worka", "target": "out", "map_all_data": True},
{"source": "workb", "target": "out", "map_all_data": True},
{"source": "workc", "target": "out", "map_all_data": True},
]
graph = {"links": links, "nodes": nodes}
return graph
@pytest.mark.skip("Conditional branches that merge again are not handled yet")
@pytest.mark.parametrize("on_error", [True, False])
def test_ppf_workflow23(on_error, ppf_log_config):
"""Test error conditions."""
graph = workflow()
if on_error:
expected = {"tasks": {"c"}, "groups": ({"c"},)}
else:
expected = {"tasks": {"a", "b"}, "groups": ({"a", "b"},)}
inputs = [
{"name": "limit", "value": 10},
{"name": "value", "value": 0 if on_error else 20},
{"name": "tasks", "value": set()},
{"name": "groups", "value": set()},
]
result = execute_graph(graph, inputs=inputs)
for k, v in expected.items():
assert result["_ppfdict"][k] == v, k
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment