Skip to content
Snippets Groups Projects
Commit f2b83b87 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Merge branch 'reraise_pypushflow_exception' into 'main'

re-raise the pypushflow exception by default

See merge request !90
parents 35850371 ebf257ad
No related branches found
No related tags found
1 merge request!90re-raise the pypushflow exception by default
Pipeline #132986 passed
......@@ -24,7 +24,7 @@ packages=find:
python_requires = >=3.6
install_requires =
ewokscore >=0.4.1
pypushflow >=0.4.0
pypushflow >=0.6.0
[options.packages.find]
where=src
......
import sys
import pprint
from contextlib import contextmanager
from typing import Iterable, Optional, List, Sequence
from typing import Generator, Optional, List, Sequence
from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
......@@ -175,13 +174,15 @@ class NameMapperActor(AbstractActor):
def trigger(self, inData: dict):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
is_error = "WorkflowException" in inData and inData.get("NewWorkflowException")
is_error = "WorkflowExceptionInstance" in inData and inData.get(
"_NewWorkflowException"
)
if is_error and not self.trigger_on_error:
return
try:
if is_error:
inData = dict(inData)
inData["NewWorkflowException"] = False
inData["_NewWorkflowException"] = False
# Map output names of this task to input
# names of the downstream task
newInData = dict()
......@@ -510,13 +511,13 @@ class EwoksWorkflow(Workflow):
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
**pool_options,
) -> Iterable[Optional[dict]]:
) -> Generator[None, None, None]:
self.startargs[ppfrunscript.INFOKEY]["varinfo"] = varinfo
graph = self.__ewoksgraph.graph
with events.workflow_context(execinfo, workflow=graph) as execinfo:
self.startargs[ppfrunscript.INFOKEY]["execinfo"] = execinfo
with super()._run_context(**pool_options):
yield execinfo
yield
def run(
self,
......@@ -536,7 +537,7 @@ class EwoksWorkflow(Workflow):
"the Pypushflow engine can only return the merged results of end tasks"
)
self._stop_actor.reset()
with self._run_context(**execute_options) as execinfo:
with self._run_context(**execute_options):
startindata = dict(self.startargs)
if startargs:
startindata.update(startargs)
......@@ -546,27 +547,13 @@ class EwoksWorkflow(Workflow):
result = self._stop_actor.outData
if result is None:
return dict()
info = result.pop(ppfrunscript.INFOKEY, dict())
result = self.__parse_result(result)
ex = result.get("WorkflowException")
if ex is not None:
if not ex["errorMessage"]:
node_id = info.get("node_id")
ex["errorMessage"] = f"Task {node_id} failed"
execinfo["error"] = True
execinfo["error_message"] = ex["errorMessage"]
if isinstance(ex["traceBack"], str):
execinfo["error_traceback"] = ex["traceBack"]
else:
execinfo["error_traceback"] = "".join(ex["traceBack"])
if ex is None or not raise_on_error:
if outputs:
return result
else:
return dict()
else:
print("\n".join(ex["traceBack"]), file=sys.stderr)
raise RuntimeError(ex["errorMessage"])
ex = result.get("WorkflowExceptionInstance")
if ex is not None and raise_on_error:
raise ex
if outputs:
return result
return dict()
def __parse_result(self, result) -> dict:
varinfo = varinfo_from_indata(self.startargs)
......
......@@ -32,4 +32,4 @@ def test_workflow2(ppf_log_config, tmpdir):
result = execute_graph(graph, varinfo=varinfo, raise_on_error=False)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
err_msg = "Task 'Python Error Handler Test' failed"
assert result["WorkflowException"]["errorMessage"] == err_msg
assert str(result["WorkflowExceptionInstance"]) == err_msg
......@@ -162,7 +162,7 @@ def test_ppf_workflow24(ppf_log_config):
result = execute_graph(workflow(), inputs=inputs, raise_on_error=False)
succeeded = "task1", "task2", "task3"
assert result["_ppfdict"]["succeeded"] == succeeded
assert "WorkflowException" not in result["_ppfdict"]
assert "WorkflowExceptionInstance" not in result["_ppfdict"]
inputs = [
{"name": "succeeded", "value": tuple()},
......@@ -171,7 +171,7 @@ def test_ppf_workflow24(ppf_log_config):
result = execute_graph(workflow(), inputs=inputs, raise_on_error=False)
succeeded = "task1", "task2", "subtask1", "subtask2", "subtask3"
assert result["_ppfdict"]["succeeded"] == succeeded
errorMessage = result["_ppfdict"]["WorkflowException"]["errorMessage"]
errorMessage = str(result["_ppfdict"]["WorkflowExceptionInstance"])
assert errorMessage == "Task 'task3' failed"
inputs = [
......@@ -188,7 +188,7 @@ def test_ppf_workflow24(ppf_log_config):
"subsubtask3",
)
assert result["_ppfdict"]["succeeded"] == succeeded
errorMessage = result["_ppfdict"]["WorkflowException"]["errorMessage"]
errorMessage = str(result["_ppfdict"]["WorkflowExceptionInstance"])
assert errorMessage == "Task 'task3' failed"
inputs = [
......@@ -198,5 +198,5 @@ def test_ppf_workflow24(ppf_log_config):
result = execute_graph(workflow(), inputs=inputs, raise_on_error=False)
succeeded = "task1", "task2", "subtask1", "subtask2", "subsub_handler"
assert result["_ppfdict"]["succeeded"] == succeeded
errorMessage = result["_ppfdict"]["WorkflowException"]["errorMessage"]
errorMessage = str(result["_ppfdict"]["WorkflowExceptionInstance"])
assert errorMessage == "Task 'task3' failed"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment