......@@ -3,7 +3,7 @@
import json
from typing import List, Optional, Union
from typing import Any, Dict, List, Optional, Union
from dask.distributed import Client
from dask.threaded import get as multithreading_scheduler
from dask.multiprocessing import get as multiprocessing_scheduler
......@@ -14,8 +14,11 @@ from ewokscore import execute_graph_decorator
from ewokscore.inittask import instantiate_task
from ewokscore.inittask import add_dynamic_inputs
from ewokscore.graph.serialize import ewoks_jsonload_hook
from ewokscore.node import NodeIdType
from ewokscore.node import get_node_label
from ewokscore.graph import analysis
from ewokscore.graph import graph_io
from ewokscore.graph import TaskGraph
from ewokscore import events
......@@ -68,44 +71,44 @@ def convert_graph(ewoksgraph, **execute_options):
def execute_dask_graph(
nodes: List[str],
node_ids: List[NodeIdType],
scheduler: Union[dict, str, None, Client] = None,
scheduler_options: Optional[dict] = None,
if scheduler_options is None:
scheduler_options = dict()
if scheduler is None:
results = sequential_scheduler(daskgraph, nodes, **scheduler_options)
results = sequential_scheduler(daskgraph, node_ids, **scheduler_options)
elif scheduler == "multiprocessing":
# num_workers: CPU_COUNT by default
results = multiprocessing_scheduler(daskgraph, nodes, **scheduler_options)
results = multiprocessing_scheduler(daskgraph, node_ids, **scheduler_options)
elif scheduler == "multithreading":
# num_workers: CPU_COUNT by default
results = multithreading_scheduler(daskgraph, nodes, **scheduler_options)
results = multithreading_scheduler(daskgraph, node_ids, **scheduler_options)
elif scheduler == "cluster":
# n_worker: n worker with m threads (n_worker= n * m)
with Client(**scheduler_options) as client:
results = client.get(daskgraph, nodes)
results = client.get(daskgraph, node_ids)
elif isinstance(scheduler, str):
with Client(address=scheduler, **scheduler_options) as client:
results = client.get(daskgraph, nodes)
results = client.get(daskgraph, node_ids)
elif isinstance(scheduler, Client):
results = client.get(daskgraph, nodes)
results = client.get(daskgraph, node_ids)
raise ValueError("Unknown scheduler")
return dict(zip(nodes, results))
return dict(zip(node_ids, results))
def _execute_graph(
results_of_all_nodes: Optional[bool] = False,
ewoksgraph: TaskGraph,
outputs: Optional[List[dict]] = None,
merge_outputs: Optional[bool] = True,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
scheduler: Union[dict, str, None, Client] = None,
scheduler_options: Optional[dict] = None,
) -> Dict[NodeIdType, Any]:
with events.workflow_context(execinfo, workflow=ewoksgraph.graph) as execinfo:
if ewoksgraph.is_cyclic:
raise RuntimeError("Dask can only execute DAGs")
......@@ -113,13 +116,17 @@ def _execute_graph(
raise RuntimeError("Dask cannot handle conditional links")
daskgraph = convert_graph(ewoksgraph, varinfo=varinfo, execinfo=execinfo)
if results_of_all_nodes:
nodes = list(ewoksgraph.graph.nodes)
nodes = list(analysis.end_nodes(ewoksgraph.graph))
return execute_dask_graph(
daskgraph, nodes, scheduler=scheduler, scheduler_options=scheduler_options
outputs = graph_io.parse_outputs(ewoksgraph.graph, outputs)
node_ids = list({output["id"] for output in outputs})
result = execute_dask_graph(
daskgraph, node_ids, scheduler=scheduler, scheduler_options=scheduler_options
output_values = dict()
for node_id, task_outputs in result.items():
output_values, node_id, task_outputs, outputs, merge_outputs=merge_outputs
return output_values
......@@ -5,9 +5,7 @@ from ewokscore import load_graph
from ewokscore.tests.examples.graphs import graph_names
from ewokscore.tests.examples.graphs import get_graph
from ewokscore.tests.utils.results import assert_execute_graph_all_tasks
from ewokscore.tests.utils.results import assert_execute_graph_tasks
from ewokscore.tests.utils.results import filter_expected_results
from ewokscore.tests.utils.results import assert_default_results
@pytest.mark.parametrize("graph_name", graph_names())
......@@ -23,28 +21,6 @@ def test_examples(graph_name, tmpdir, scheduler, scheme):
if ewoksgraph.is_cyclic or ewoksgraph.has_conditional_links:
with pytest.raises(RuntimeError):
execute_graph(graph, scheduler=scheduler, varinfo=varinfo)
result = execute_graph(
graph, scheduler=scheduler, varinfo=varinfo, results_of_all_nodes=True
assert_all_results(ewoksgraph, result, expected, varinfo)
result = execute_graph(
graph, scheduler=scheduler, varinfo=varinfo, results_of_all_nodes=False
assert_end_results(ewoksgraph, result, expected, varinfo)
def assert_all_results(ewoksgraph, result, expected, varinfo):
if varinfo:
scheme = varinfo.get("scheme")
scheme = None
if scheme:
assert_execute_graph_all_tasks(ewoksgraph, expected, varinfo=varinfo)
assert_execute_graph_tasks(result, expected, varinfo=varinfo)
def assert_end_results(ewoksgraph, result, expected, varinfo):
expected = filter_expected_results(ewoksgraph, expected, end_only=True)
assert_execute_graph_tasks(result, expected, varinfo=varinfo)
result = execute_graph(graph, scheduler=scheduler, varinfo=varinfo)
assert_default_results(ewoksgraph, result, expected, varinfo)
