Skip to content
Snippets Groups Projects
Commit 0b87bd50 authored by payno's avatar payno
Browse files

Merge branch 'ewoks_events' into 'main'

Ewoks events

See merge request !51
parents 9315d3c9 7bf45a2d
No related branches found
No related tags found
1 merge request!51Ewoks events
Pipeline #69165 passed
import sys
import pprint
from typing import Optional, List
import logging
from contextlib import contextmanager
from typing import Iterable, Optional, List
from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
......@@ -15,15 +17,16 @@ from pypushflow.persistence import register_actorinfo_filter
from . import ppfrunscript
from ewokscore import load_graph
from ewokscore import ppftasks
from ewokscore import execute_graph_decorator
from ewokscore.variable import value_from_transfer
from ewokscore.inittask import task_executable
from ewokscore.inittask import get_varinfo
from ewokscore.inittask import task_executable_info
from ewokscore.graph import TaskGraph
from ewokscore.graph import analysis
from ewokscore.graph import graph_io
from ewokscore.node import node_id_as_string
from ewokscore.node import NodeIdType
from ewokscore.node import get_varinfo
from ewokscore import events
def ppfname(node_id: NodeIdType) -> str:
......@@ -38,8 +41,8 @@ def varinfo_from_indata(inData: dict) -> Optional[dict]:
return get_varinfo(node_attrs, varinfo=varinfo)
def is_ppfmethod(node_attrs: dict) -> bool:
task_type, _ = task_executable_info(node_attrs)
def is_ppfmethod(node_id: NodeIdType, node_attrs: dict) -> bool:
task_type, _ = task_executable_info(node_id, node_attrs)
return task_type in ("ppfmethod", "ppfport")
......@@ -252,16 +255,18 @@ class InputMergeActor(AbstractActor):
class EwoksWorkflow(Workflow):
def __init__(self, ewoksgraph: TaskGraph, varinfo: Optional[dict] = None):
def __init__(
self,
ewoksgraph: TaskGraph,
):
name = repr(ewoksgraph)
super().__init__(name)
super().__init__(name, level=logging.WARNING)
# When triggering a task, the output dict of the previous task
# is merged with the input dict of the current task.
if varinfo is None:
varinfo = dict()
self.startargs = {ppfrunscript.INFOKEY: {"varinfo": varinfo}}
self.startargs = {ppfrunscript.INFOKEY: {"varinfo": None, "execinfo": None}}
self.graph_to_actors(ewoksgraph)
self.__ewoksgraph = ewoksgraph
def _clean_workflow(self):
# task_name -> EwoksPythonActor
......@@ -311,7 +316,7 @@ class EwoksWorkflow(Workflow):
imported = set()
for node_id, node_attrs in taskgraph.graph.nodes.items():
# Pre-import to speedup execution
name, importfunc = task_executable(node_attrs, node_id=node_id)
name, importfunc = task_executable(node_id, node_attrs)
if name not in imported:
imported.add(name)
if importfunc:
......@@ -339,7 +344,7 @@ class EwoksWorkflow(Workflow):
all_conditions: dict,
conditions_else_value,
) -> ConditionalActor:
source_is_ppfmethod = is_ppfmethod(taskgraph.graph.nodes[source_id])
source_is_ppfmethod = is_ppfmethod(source_id, taskgraph.graph.nodes[source_id])
source_label = ppfname(source_id)
target_label = ppfname(target_id)
name = f"Conditional actor between '{source_label}' and '{target_label}'"
......@@ -501,6 +506,20 @@ class EwoksWorkflow(Workflow):
source_actor = taskactors[source_id]
self._connect_actors(source_actor, stop_actor)
@contextmanager
def _run_context(
self,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
**pool_options,
) -> Iterable[Optional[dict]]:
self.startargs[ppfrunscript.INFOKEY]["varinfo"] = varinfo
graph = self.__ewoksgraph.graph
with events.workflow_context(execinfo, workflow=graph) as execinfo:
self.startargs[ppfrunscript.INFOKEY]["execinfo"] = execinfo
with super()._run_context(**pool_options):
yield execinfo
def run(
self,
startargs: Optional[dict] = None,
......@@ -508,12 +527,13 @@ class EwoksWorkflow(Workflow):
results_of_all_nodes: Optional[bool] = False,
outputs: Optional[List[dict]] = None,
timeout: Optional[float] = None,
**pool_options,
**execute_options,
):
with self._run_context(**pool_options):
with self._run_context(**execute_options) as execinfo:
startindata = dict(self.startargs)
if startargs:
startindata.update(startargs)
self._start_actor.trigger(startindata)
self._stop_actor.join(timeout=timeout)
result = self._stop_actor.outData
......@@ -522,15 +542,21 @@ class EwoksWorkflow(Workflow):
info = result.pop(ppfrunscript.INFOKEY, dict())
result = self.__parse_result(result)
ex = result.get("WorkflowException")
if ex is not None:
if not ex["errorMessage"]:
node_id = info.get("node_id")
ex["errorMessage"] = f"Task {node_id} failed"
execinfo["error"] = True
execinfo["error_message"] = ex["errorMessage"]
if isinstance(ex["traceBack"], str):
execinfo["error_traceback"] = ex["traceBack"]
else:
execinfo["error_traceback"] = "".join(ex["traceBack"])
if ex is None or not raise_on_error:
return result
else:
print("\n".join(ex["traceBack"]), file=sys.stderr)
node_id = info.get("node_id")
err_msg = f"Task {node_id} failed"
if ex["errorMessage"]:
err_msg += " ({})".format(ex["errorMessage"])
raise RuntimeError(err_msg)
raise RuntimeError(ex["errorMessage"])
def __parse_result(self, result) -> dict:
varinfo = varinfo_from_indata(self.startargs)
......@@ -540,19 +566,15 @@ class EwoksWorkflow(Workflow):
}
@execute_graph_decorator(binding="ppf")
def execute_graph(
graph,
inputs: Optional[List[dict]] = None,
startargs: Optional[dict] = None,
varinfo: Optional[dict] = None,
timeout: Optional[float] = None,
load_options: Optional[dict] = None,
**execute_options,
):
if load_options is None:
load_options = dict()
ewoksgraph = load_graph(source=graph, **load_options)
if inputs:
graph_io.update_default_inputs(ewoksgraph.graph, inputs)
ppfgraph = EwoksWorkflow(ewoksgraph, varinfo=varinfo)
return ppfgraph.run(startargs=startargs, timeout=timeout, **execute_options)
ewoksgraph = load_graph(graph, inputs=inputs, **load_options)
ppfgraph = EwoksWorkflow(ewoksgraph)
return ppfgraph.run(**execute_options)
......@@ -12,12 +12,14 @@ def run(**inputs):
"""
info = inputs.pop(INFOKEY)
varinfo = info["varinfo"]
execinfo = info["execinfo"]
task = instantiate_task(
info["node_id"],
info["node_attrs"],
varinfo=varinfo,
inputs=inputs,
node_id=info["node_id"],
varinfo=varinfo,
execinfo=execinfo,
)
task.execute()
......
......@@ -9,7 +9,5 @@ test_ppf_actors.SLEEP_TIME = 0
def ppf_log_config():
DEFAULT_DB_TYPE = persistence.DEFAULT_DB_TYPE
persistence.DEFAULT_DB_TYPE = "memory"
yield
persistence.DEFAULT_DB_TYPE = DEFAULT_DB_TYPE
......@@ -49,7 +49,7 @@ def workflow():
{"source": "task5", "target": "task2", "map_all_data": True},
]
graph = {"links": links, "nodes": nodes}
graph = {"graph": {"id": "test_graph"}, "links": links, "nodes": nodes}
expected_results = {"a": 9, "b": 9}
......
......@@ -12,6 +12,7 @@ def workflow20():
links = [{"source": "task1", "target": "task2", "map_all_data": True}]
graph = {
"graph": {"id": "workflow20"},
"links": links,
"nodes": nodes,
}
......
......@@ -75,7 +75,7 @@ def workflow():
},
]
graph = {"links": links, "nodes": nodes}
graph = {"graph": {"id": "workflow22"}, "links": links, "nodes": nodes}
expected_results = {"a": 6, "a_is_5": False, "b": 3, "b_is_4": False}
......
from ewoksppf import execute_graph
from ewokscore.tests.test_workflow_events import fetch_events
from ewokscore.tests.test_workflow_events import run_succesfull_workfow
from ewokscore.tests.test_workflow_events import run_failed_workfow
from ewokscore.tests.test_workflow_events import assert_succesfull_workfow_events
from ewokscore.tests.test_workflow_events import assert_failed_workfow_events
def test_succesfull_workfow(tmpdir):
# TODO: pypushflow does not work will asynchronous handlers because
# a worker could die before all queued events have been processed.
uri = run_succesfull_workfow(
tmpdir, execute_graph, execinfo={"asynchronous": False}
)
events = fetch_events(uri, 10)
assert_succesfull_workfow_events(events)
def test_failed_workfow(tmpdir):
uri = run_failed_workfow(tmpdir, execute_graph, execinfo={"asynchronous": False})
events = fetch_events(uri, 8)
assert_failed_workfow_events(events)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment