Skip to content
Snippets Groups Projects
Commit 59e77f80 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

emit workflow events

parent 84ade572
No related branches found
No related tags found
1 merge request!51Ewoks events
import sys
import pprint
from typing import Optional, List
import logging
from contextlib import contextmanager
from typing import Iterable, Optional, List
from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
......@@ -15,15 +17,16 @@ from pypushflow.persistence import register_actorinfo_filter
from . import ppfrunscript
from ewokscore import load_graph
from ewokscore import ppftasks
from ewokscore import execute_graph_decorator
from ewokscore.variable import value_from_transfer
from ewokscore.inittask import task_executable
from ewokscore.inittask import get_varinfo
from ewokscore.inittask import task_executable_info
from ewokscore.graph import TaskGraph
from ewokscore.graph import analysis
from ewokscore.graph import graph_io
from ewokscore.node import node_id_as_string
from ewokscore.node import NodeIdType
from ewokscore.node import get_varinfo
from ewokscore import events
def ppfname(node_id: NodeIdType) -> str:
......@@ -252,16 +255,18 @@ class InputMergeActor(AbstractActor):
class EwoksWorkflow(Workflow):
def __init__(self, ewoksgraph: TaskGraph, varinfo: Optional[dict] = None):
def __init__(
self,
ewoksgraph: TaskGraph,
):
name = repr(ewoksgraph)
super().__init__(name)
super().__init__(name, level=logging.WARNING)
# When triggering a task, the output dict of the previous task
# is merged with the input dict of the current task.
if varinfo is None:
varinfo = dict()
self.startargs = {ppfrunscript.INFOKEY: {"varinfo": varinfo}}
self.startargs = {ppfrunscript.INFOKEY: {"varinfo": None, "execinfo": None}}
self.graph_to_actors(ewoksgraph)
self.__ewoksgraph = ewoksgraph
def _clean_workflow(self):
# task_name -> EwoksPythonActor
......@@ -501,6 +506,20 @@ class EwoksWorkflow(Workflow):
source_actor = taskactors[source_id]
self._connect_actors(source_actor, stop_actor)
@contextmanager
def _run_context(
self,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
**pool_options,
) -> Iterable[Optional[dict]]:
self.startargs[ppfrunscript.INFOKEY]["varinfo"] = varinfo
graph = self.__ewoksgraph.graph
with events.workflow_context(execinfo, workflow=graph) as execinfo:
self.startargs[ppfrunscript.INFOKEY]["execinfo"] = execinfo
with super()._run_context(**pool_options):
yield execinfo
def run(
self,
startargs: Optional[dict] = None,
......@@ -508,12 +527,13 @@ class EwoksWorkflow(Workflow):
results_of_all_nodes: Optional[bool] = False,
outputs: Optional[List[dict]] = None,
timeout: Optional[float] = None,
**pool_options,
**execute_options,
):
with self._run_context(**pool_options):
with self._run_context(**execute_options) as execinfo:
startindata = dict(self.startargs)
if startargs:
startindata.update(startargs)
self._start_actor.trigger(startindata)
self._stop_actor.join(timeout=timeout)
result = self._stop_actor.outData
......@@ -522,15 +542,21 @@ class EwoksWorkflow(Workflow):
info = result.pop(ppfrunscript.INFOKEY, dict())
result = self.__parse_result(result)
ex = result.get("WorkflowException")
if ex is not None:
if not ex["errorMessage"]:
node_id = info.get("node_id")
ex["errorMessage"] = f"Task {node_id} failed"
execinfo["error"] = True
execinfo["error_message"] = ex["errorMessage"]
if isinstance(ex["traceBack"], str):
execinfo["error_traceback"] = ex["traceBack"]
else:
execinfo["error_traceback"] = "".join(ex["traceBack"])
if ex is None or not raise_on_error:
return result
else:
print("\n".join(ex["traceBack"]), file=sys.stderr)
node_id = info.get("node_id")
err_msg = f"Task {node_id} failed"
if ex["errorMessage"]:
err_msg += " ({})".format(ex["errorMessage"])
raise RuntimeError(err_msg)
raise RuntimeError(ex["errorMessage"])
def __parse_result(self, result) -> dict:
varinfo = varinfo_from_indata(self.startargs)
......@@ -540,19 +566,15 @@ class EwoksWorkflow(Workflow):
}
@execute_graph_decorator(binding="ppf")
def execute_graph(
graph,
inputs: Optional[List[dict]] = None,
startargs: Optional[dict] = None,
varinfo: Optional[dict] = None,
timeout: Optional[float] = None,
load_options: Optional[dict] = None,
**execute_options,
):
if load_options is None:
load_options = dict()
ewoksgraph = load_graph(source=graph, **load_options)
if inputs:
graph_io.update_default_inputs(ewoksgraph.graph, inputs)
ppfgraph = EwoksWorkflow(ewoksgraph, varinfo=varinfo)
return ppfgraph.run(startargs=startargs, timeout=timeout, **execute_options)
ewoksgraph = load_graph(graph, inputs=inputs, **load_options)
ppfgraph = EwoksWorkflow(ewoksgraph)
return ppfgraph.run(**execute_options)
......@@ -9,7 +9,5 @@ test_ppf_actors.SLEEP_TIME = 0
def ppf_log_config():
DEFAULT_DB_TYPE = persistence.DEFAULT_DB_TYPE
persistence.DEFAULT_DB_TYPE = "memory"
yield
persistence.DEFAULT_DB_TYPE = DEFAULT_DB_TYPE
......@@ -49,7 +49,7 @@ def workflow():
{"source": "task5", "target": "task2", "map_all_data": True},
]
graph = {"links": links, "nodes": nodes}
graph = {"graph": {"id": "test_graph"}, "links": links, "nodes": nodes}
expected_results = {"a": 9, "b": 9}
......
......@@ -12,6 +12,7 @@ def workflow20():
links = [{"source": "task1", "target": "task2", "map_all_data": True}]
graph = {
"graph": {"id": "workflow20"},
"links": links,
"nodes": nodes,
}
......
......@@ -75,7 +75,7 @@ def workflow():
},
]
graph = {"links": links, "nodes": nodes}
graph = {"graph": {"id": "workflow22"}, "links": links, "nodes": nodes}
expected_results = {"a": 6, "a_is_5": False, "b": 3, "b_is_4": False}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment