Skip to content
Snippets Groups Projects
Commit 068beb1f authored by Wout De Nolf's avatar Wout De Nolf
Browse files

harmonize execute_graph output

parent 084b9895
No related branches found
No related tags found
1 merge request!59Resolve "Harmonize execute_graph outputs"
Pipeline #74715 passed
Showing
with 73 additions and 88 deletions
......@@ -520,11 +520,15 @@ class EwoksWorkflow(Workflow):
self,
startargs: Optional[dict] = None,
raise_on_error: Optional[bool] = True,
results_of_all_nodes: Optional[bool] = False,
outputs: Optional[List[dict]] = None,
merge_outputs: Optional[bool] = True,
timeout: Optional[float] = None,
**execute_options,
):
if outputs and outputs != [{"all": True}] or not merge_outputs:
raise ValueError(
"The Pypushflow binding can only return the merged results of all tasks"
)
with self._run_context(**execute_options) as execinfo:
startindata = dict(self.startargs)
if startargs:
......
......@@ -3,9 +3,7 @@ from ewokscore import load_graph
from ewoksppf import execute_graph
from ewokscore.tests.examples.graphs import graph_names
from ewokscore.tests.examples.graphs import get_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_values
from ewokscore.tests.utils.results import filter_expected_results
from ewokscore.tests.utils.results import assert_execute_graph_default_result
@pytest.mark.parametrize("graph_name", graph_names())
......@@ -18,27 +16,4 @@ def test_execute_graph(graph_name, scheme, ppf_log_config, tmpdir):
varinfo = None
ewoksgraph = load_graph(graph)
result = execute_graph(graph, varinfo=varinfo, timeout=10)
assert_results(graph, ewoksgraph, result, expected, varinfo)
def assert_results(graph, ewoksgraph, result, expected, varinfo):
if varinfo:
scheme = varinfo.get("scheme")
else:
scheme = None
if ewoksgraph.is_cyclic:
expected = filter_expected_results(
ewoksgraph, expected, end_only=True, merge=True
)
assert_execute_graph_values(result, expected, varinfo)
elif scheme:
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
expected = filter_expected_results(
ewoksgraph, expected, end_only=True, merge=True
)
assert_execute_graph_values(result, expected, varinfo)
else:
expected = filter_expected_results(
ewoksgraph, expected, end_only=True, merge=True
)
assert_execute_graph_values(result, expected, varinfo)
assert_execute_graph_default_result(ewoksgraph, result, expected, varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def workflow1():
......@@ -30,5 +30,5 @@ def workflow1():
def test_workflow1(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow1()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel11a():
......@@ -132,5 +132,5 @@ def workflow11():
def test_workflow11(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow11()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
import pytest
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel12():
......@@ -95,5 +95,5 @@ def test_workflow12(startvalue, ppf_log_config, tmpdir):
withsubmodel_startvalue = 1
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow12(startvalue, withsubmodel_startvalue)
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
import pytest
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel13():
......@@ -92,5 +92,5 @@ def test_workflow13(startvalue, ppf_log_config, tmpdir):
withlastnode_startvalue = 1
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow13(startvalue, withlastnode_startvalue)
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel14a():
......@@ -118,5 +118,5 @@ def workflow14():
def test_workflow14(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow14()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel15(name):
......@@ -106,5 +106,5 @@ def test_workflow15(ppf_log_config, tmpdir):
"""Test connecting nodes from submodels directly"""
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow15()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel16a():
......@@ -135,5 +135,5 @@ def test_workflow16(ppf_log_config, tmpdir):
"""Test connecting nodes from sub-submodels to the top model"""
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow16()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
import pytest
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def workflow18(dotask4=True):
......@@ -37,16 +37,10 @@ def workflow18(dotask4=True):
"nodes": nodes,
}
expected_results = {
"task1": {"_ppfdict": {"value": 1}},
"task2": {"_ppfdict": {"value": 11}},
"task3": {"_ppfdict": {"value": 2}},
}
if dotask4:
expected_results["task4"] = {"_ppfdict": {"value": 12}}
expected_results = {"_ppfdict": {"value": 12}}
else:
expected_results["task4"] = None
expected_results = {"_ppfdict": {"value": 2}}
return graph, expected_results
......@@ -56,5 +50,5 @@ def test_workflow18(dotask4, ppf_log_config, tmpdir):
"""Test conditional links"""
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow18(dotask4=dotask4)
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def workflow19():
......@@ -75,7 +76,7 @@ def workflow19():
def test_workflow19(ppf_log_config, tmpdir):
"""Test 2 unconditional upstream tasks, one coming from a feedback loop"""
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow19()
result = execute_graph(graph)
for k in expected:
assert result[k] == expected[k]
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def workflow2():
......@@ -30,6 +30,6 @@ def test_workflow2(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow2()
result = execute_graph(graph, varinfo=varinfo, raise_on_error=False)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
err_msg = "Task 'Python Error Handler Test' failed"
assert result["WorkflowException"]["errorMessage"] == err_msg
import pytest
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def workflow20():
......@@ -30,4 +31,5 @@ def test_workflow20(persist, ppf_log_config, tmpdir):
result = execute_graph(
graph, inputs=[{"name": "value", "value": 5}], varinfo=varinfo
)
assert result["_ppfdict"]["value"] == 7
expected = {"_ppfdict": {"value": 7}}
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
import pytest
from ewoksppf import execute_graph
from ewokscore.utils import qualname
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def passthrough(**kw):
......@@ -164,6 +165,12 @@ def test_workflow21(args, on_error, persist, ppf_log_config, tmpdir):
varinfo = None
graph = workflow21(on_error=on_error)
inputs = [{"name": k, "value": v} for k, v in args["inputs"].items()]
result1 = execute_graph(graph, inputs=inputs, varinfo=varinfo)
assert result1
assert result1["return_value"] == args["return_value"]
result = execute_graph(graph, inputs=inputs, varinfo=varinfo)
assert result
assert result["return_value"] == args["return_value"]
if args == ARG_SUCCESS:
expected = {"a": 1, "return_value": args["return_value"]}
else:
expected = {"b": 2, "return_value": args["return_value"]}
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def workflow():
......@@ -82,7 +83,7 @@ def workflow():
return graph, expected_results
def test_ppf_workflow22(ppf_log_config):
def test_ppf_workflow22(ppf_log_config, tmpdir):
"""This is a test of a loop with several conditions for exiting the loop."""
# The execution should be like this:
# Initial conditions: {"a": 1, "a_is_5": False, "b": 1, "b_is_4": False}
......@@ -95,6 +96,6 @@ def test_ppf_workflow22(ppf_log_config):
# After 'AddA outside loop': {"a": 6, "a_is_5": False, "b": 1, "b_is_4": False}
graph, expected = workflow()
result = execute_graph(graph)
for k, v in expected.items():
assert result["_ppfdict"][k] == v, k
varinfo = {"root_uri": str(tmpdir)}
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
import pytest
from ewoksppf import execute_graph
from ewokscore.utils import qualname
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def raise_not_greater_than(**kwargs):
......@@ -80,7 +81,7 @@ def workflow():
@pytest.mark.skip("Conditional branches that merge again are not handled yet")
@pytest.mark.parametrize("on_error", [True, False])
def test_ppf_workflow23(on_error, ppf_log_config):
def test_ppf_workflow23(on_error, ppf_log_config, tmpdir):
"""Test error conditions."""
graph = workflow()
......@@ -94,6 +95,6 @@ def test_ppf_workflow23(on_error, ppf_log_config):
{"name": "tasks", "value": set()},
{"name": "groups", "value": set()},
]
result = execute_graph(graph, inputs=inputs)
for k, v in expected.items():
assert result["_ppfdict"][k] == v, k
varinfo = {"root_uri": str(tmpdir)}
result = execute_graph(graph, inputs=inputs, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel1():
......@@ -76,5 +76,5 @@ def workflow3():
def test_workflow3(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow3()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel6():
......@@ -85,5 +85,5 @@ def workflow6():
def test_workflow6(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow6()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel7():
......@@ -71,5 +71,5 @@ def workflow7():
def test_workflow7(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow7()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
from ewoksppf import execute_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_default_result
def submodel8():
......@@ -71,5 +71,5 @@ def workflow8():
def test_workflow8(ppf_log_config, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow8()
execute_graph(graph, varinfo=varinfo)
assert_execute_graph_all_tasks(graph, expected, varinfo=varinfo)
result = execute_graph(graph, varinfo=varinfo)
assert_execute_graph_default_result(graph, result, expected, varinfo=varinfo)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment