Commit 2a87cdf6 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Merge branch '3-harmonize-execute_graph-outputs' into 'main'

Resolve "Harmonize execute_graph outputs"

Closes #3

See merge request !31
parents 58202bf5 4d4de92f
Pipeline #74714 passed with stages
in 2 minutes and 37 seconds
......@@ -3,7 +3,7 @@ https://docs.dask.org/en/latest/scheduler-overview.html
"""
import json
from typing import List, Optional, Union
from typing import Any, Dict, List, Optional, Union
from dask.distributed import Client
from dask.threaded import get as multithreading_scheduler
from dask.multiprocessing import get as multiprocessing_scheduler
......@@ -14,8 +14,11 @@ from ewokscore import execute_graph_decorator
from ewokscore.inittask import instantiate_task
from ewokscore.inittask import add_dynamic_inputs
from ewokscore.graph.serialize import ewoks_jsonload_hook
from ewokscore.node import NodeIdType
from ewokscore.node import get_node_label
from ewokscore.graph import analysis
from ewokscore.graph import graph_io
from ewokscore.graph import TaskGraph
from ewokscore import events
......@@ -68,44 +71,44 @@ def convert_graph(ewoksgraph, **execute_options):
def execute_dask_graph(
daskgraph,
nodes: List[str],
node_ids: List[NodeIdType],
scheduler: Union[dict, str, None, Client] = None,
scheduler_options: Optional[dict] = None,
):
) -> Dict[NodeIdType, Any]:
if scheduler_options is None:
scheduler_options = dict()
if scheduler is None:
results = sequential_scheduler(daskgraph, nodes, **scheduler_options)
results = sequential_scheduler(daskgraph, node_ids, **scheduler_options)
elif scheduler == "multiprocessing":
# num_workers: CPU_COUNT by default
results = multiprocessing_scheduler(daskgraph, nodes, **scheduler_options)
results = multiprocessing_scheduler(daskgraph, node_ids, **scheduler_options)
elif scheduler == "multithreading":
# num_workers: CPU_COUNT by default
results = multithreading_scheduler(daskgraph, nodes, **scheduler_options)
results = multithreading_scheduler(daskgraph, node_ids, **scheduler_options)
elif scheduler == "cluster":
# n_worker: n worker with m threads (n_worker= n * m)
with Client(**scheduler_options) as client:
results = client.get(daskgraph, nodes)
results = client.get(daskgraph, node_ids)
elif isinstance(scheduler, str):
with Client(address=scheduler, **scheduler_options) as client:
results = client.get(daskgraph, nodes)
results = client.get(daskgraph, node_ids)
elif isinstance(scheduler, Client):
results = client.get(daskgraph, nodes)
results = client.get(daskgraph, node_ids)
else:
raise ValueError("Unknown scheduler")
return dict(zip(nodes, results))
return dict(zip(node_ids, results))
def _execute_graph(
ewoksgraph,
results_of_all_nodes: Optional[bool] = False,
ewoksgraph: TaskGraph,
outputs: Optional[List[dict]] = None,
merge_outputs: Optional[bool] = True,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
scheduler: Union[dict, str, None, Client] = None,
scheduler_options: Optional[dict] = None,
):
) -> Dict[NodeIdType, Any]:
with events.workflow_context(execinfo, workflow=ewoksgraph.graph) as execinfo:
if ewoksgraph.is_cyclic:
raise RuntimeError("Dask can only execute DAGs")
......@@ -113,13 +116,25 @@ def _execute_graph(
raise RuntimeError("Dask cannot handle conditional links")
daskgraph = convert_graph(ewoksgraph, varinfo=varinfo, execinfo=execinfo)
if results_of_all_nodes:
nodes = list(ewoksgraph.graph.nodes)
else:
nodes = list(analysis.end_nodes(ewoksgraph.graph))
return execute_dask_graph(
daskgraph, nodes, scheduler=scheduler, scheduler_options=scheduler_options
outputs = graph_io.parse_outputs(ewoksgraph.graph, outputs)
node_ids = list(analysis.topological_sort(ewoksgraph.graph))
result = execute_dask_graph(
daskgraph,
node_ids,
scheduler=scheduler,
scheduler_options=scheduler_options,
)
output_values = dict()
for node_id, task_outputs in result.items():
graph_io.add_output_values(
output_values,
node_id,
task_outputs,
outputs,
merge_outputs=merge_outputs,
)
return output_values
@execute_graph_decorator(binding="dask")
......
......@@ -5,9 +5,7 @@ from ewokscore import load_graph
from ewokscore.tests.examples.graphs import graph_names
from ewokscore.tests.examples.graphs import get_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_tasks
from ewokscore.tests.utils.results import filter_expected_results
from ewokscore.tests.utils.results import assert_execute_graph_default_result
@pytest.mark.parametrize("graph_name", graph_names())
......@@ -23,28 +21,6 @@ def test_examples(graph_name, tmpdir, scheduler, scheme):
if ewoksgraph.is_cyclic or ewoksgraph.has_conditional_links:
with pytest.raises(RuntimeError):
execute_graph(graph, scheduler=scheduler, varinfo=varinfo)
return
result = execute_graph(
graph, scheduler=scheduler, varinfo=varinfo, results_of_all_nodes=True
)
assert_all_results(ewoksgraph, result, expected, varinfo)
result = execute_graph(
graph, scheduler=scheduler, varinfo=varinfo, results_of_all_nodes=False
)
assert_end_results(ewoksgraph, result, expected, varinfo)
def assert_all_results(ewoksgraph, result, expected, varinfo):
if varinfo:
scheme = varinfo.get("scheme")
else:
scheme = None
if scheme:
assert_execute_graph_all_tasks(ewoksgraph, expected, varinfo=varinfo)
assert_execute_graph_tasks(result, expected, varinfo=varinfo)
def assert_end_results(ewoksgraph, result, expected, varinfo):
expected = filter_expected_results(ewoksgraph, expected, end_only=True)
assert_execute_graph_tasks(result, expected, varinfo=varinfo)
result = execute_graph(graph, scheduler=scheduler, varinfo=varinfo)
assert_execute_graph_default_result(ewoksgraph, result, expected, varinfo)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment