Commit d3db71a4 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

make execute_graph uniform with the one from ewokscore

parent 4ab3c0f1
Pipeline #59390 passed with stages
in 1 minute and 17 seconds
......@@ -4,6 +4,7 @@
import json
import logging
from typing import Optional
from dask.distributed import Client
from dask.threaded import get as multithreading_scheduler
from dask.multiprocessing import get as multiprocessing_scheduler
......@@ -57,7 +58,7 @@ def execute_task(execinfo, *inputs):
return task.output_transfer_data
def convert_graph(ewoksgraph, varinfo, enable_logging=False):
def convert_graph(ewoksgraph, **execute_options):
daskgraph = dict()
for target_id, node_attrs in ewoksgraph.graph.nodes.items():
source_ids = tuple(ewoksgraph.predecessors(target_id))
......@@ -65,35 +66,34 @@ def convert_graph(ewoksgraph, varinfo, enable_logging=False):
ewoksgraph.graph[source_id][target_id] for source_id in source_ids
node_label = get_node_label(node_attrs, node_id=target_id)
execinfo = {
"node_id": target_id,
"node_label": node_label,
"node_attrs": node_attrs,
"link_attrs": link_attrs,
"varinfo": varinfo,
"enable_logging": enable_logging,
# Note: the execinfo is serialized to prevent dask
execute_options["node_id"] = target_id
execute_options["node_label"] = node_label
execute_options["node_attrs"] = node_attrs
execute_options["link_attrs"] = link_attrs
# Note: the execute_options is serialized to prevent dask
# from interpreting node names as task results
daskgraph[target_id] = (execute_task, json.dumps(execinfo)) + source_ids
daskgraph[target_id] = (execute_task, json.dumps(execute_options)) + source_ids
return daskgraph
def execute_graph(
load_options: Optional[dict] = None,
ewoksgraph = load_graph(source=graph, representation=representation, **load_options)
if load_options is None:
load_options = dict()
ewoksgraph = load_graph(source=graph, **load_options)
if ewoksgraph.is_cyclic:
raise RuntimeError("Dask can only execute DAGs")
if ewoksgraph.has_conditional_links:
raise RuntimeError("Dask cannot handle conditional links")
daskgraph = convert_graph(ewoksgraph, varinfo, enable_logging=log_task_execution)
daskgraph = convert_graph(
ewoksgraph, enable_logging=log_task_execution, **execute_options
if results_of_all_nodes:
nodes = list(ewoksgraph.graph.nodes)
