Commit 1e36d916 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

refactor example graph tests

parent 15b39199
Pipeline #62681 passed with stages
in 57 seconds
import sys
import logging
import pytest
import itertools
from ewoksdask import execute_graph
from ewokscore.tests.examples.graphs import graph_names
from ewokscore.tests.examples.graphs import get_graph
from ewokscore.tests.utils import assert_taskgraph_result
from ewokscore.tests.utils import assert_workflow_result
from ewokscore import load_graph
logging.getLogger("dask").setLevel(logging.DEBUG)
logging.getLogger("dask").addHandler(logging.StreamHandler(sys.stdout))
logging.getLogger("ewoksdask").setLevel(logging.DEBUG)
logging.getLogger("ewoksdask").addHandler(logging.StreamHandler(sys.stdout))
from ewokscore.tests.examples.graphs import graph_names
from ewokscore.tests.examples.graphs import get_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_tasks
from ewokscore.tests.utils.results import filter_expected_results
@pytest.mark.parametrize(
"graph_name,scheduler,persist",
itertools.product(
graph_names(), (None, "multithreading", "multiprocessing"), (True, False)
),
)
def test_examples(graph_name, tmpdir, scheduler, persist):
@pytest.mark.parametrize("graph_name", graph_names())
@pytest.mark.parametrize("scheduler", (None, "multithreading", "multiprocessing"))
@pytest.mark.parametrize("scheme", (None, "json"))
def test_examples(graph_name, tmpdir, scheduler, scheme):
graph, expected = get_graph(graph_name)
ewoksgraph = load_graph(graph)
if persist:
varinfo = {"root_uri": str(tmpdir)}
if scheme:
varinfo = {"root_uri": str(tmpdir), "scheme": scheme}
else:
varinfo = None
if ewoksgraph.is_cyclic or ewoksgraph.has_conditional_links:
with pytest.raises(RuntimeError):
execute_graph(graph, scheduler=scheduler, varinfo=varinfo)
else:
return
result = execute_graph(
graph, scheduler=scheduler, varinfo=varinfo, results_of_all_nodes=True
)
if persist:
assert_taskgraph_result(ewoksgraph, expected, varinfo=varinfo)
assert_workflow_result(result, expected, varinfo=varinfo)
assert_all_results(ewoksgraph, result, expected, varinfo)
result = execute_graph(
graph, scheduler=scheduler, varinfo=varinfo, results_of_all_nodes=False
)
assert_end_results(ewoksgraph, result, expected, varinfo)
def assert_all_results(ewoksgraph, result, expected, varinfo):
if varinfo:
scheme = varinfo.get("scheme")
else:
scheme = None
if scheme:
assert_execute_graph_all_tasks(ewoksgraph, expected, varinfo=varinfo)
assert_execute_graph_tasks(result, expected, varinfo=varinfo)
def assert_end_results(ewoksgraph, result, expected, varinfo):
expected = filter_expected_results(ewoksgraph, expected, end_only=True)
assert_execute_graph_tasks(result, expected, varinfo=varinfo)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment