Commit 544b7ffd authored by payno's avatar payno
Browse files

Merge branch 'execute_graph_api' into 'main'

make execute_graph uniform with the one from ewokscore

See merge request !14
parents 4ab3c0f1 d3db71a4
Pipeline #59993 passed with stages
in 3 minutes and 18 seconds
......@@ -4,6 +4,7 @@ https://docs.dask.org/en/latest/scheduler-overview.html
import json
import logging
from typing import Optional
from dask.distributed import Client
from dask.threaded import get as multithreading_scheduler
from dask.multiprocessing import get as multiprocessing_scheduler
......@@ -57,7 +58,7 @@ def execute_task(execinfo, *inputs):
return task.output_transfer_data
def convert_graph(ewoksgraph, varinfo, enable_logging=False):
def convert_graph(ewoksgraph, **execute_options):
daskgraph = dict()
for target_id, node_attrs in ewoksgraph.graph.nodes.items():
source_ids = tuple(ewoksgraph.predecessors(target_id))
......@@ -65,35 +66,34 @@ def convert_graph(ewoksgraph, varinfo, enable_logging=False):
ewoksgraph.graph[source_id][target_id] for source_id in source_ids
)
node_label = get_node_label(node_attrs, node_id=target_id)
execinfo = {
"node_id": target_id,
"node_label": node_label,
"node_attrs": node_attrs,
"link_attrs": link_attrs,
"varinfo": varinfo,
"enable_logging": enable_logging,
}
# Note: the execinfo is serialized to prevent dask
execute_options["node_id"] = target_id
execute_options["node_label"] = node_label
execute_options["node_attrs"] = node_attrs
execute_options["link_attrs"] = link_attrs
# Note: the execute_options is serialized to prevent dask
# from interpreting node names as task results
daskgraph[target_id] = (execute_task, json.dumps(execinfo)) + source_ids
daskgraph[target_id] = (execute_task, json.dumps(execute_options)) + source_ids
return daskgraph
def execute_graph(
graph,
representation=None,
varinfo=None,
scheduler=None,
log_task_execution=False,
results_of_all_nodes=False,
**load_options,
load_options: Optional[dict] = None,
**execute_options
):
ewoksgraph = load_graph(source=graph, representation=representation, **load_options)
if load_options is None:
load_options = dict()
ewoksgraph = load_graph(source=graph, **load_options)
if ewoksgraph.is_cyclic:
raise RuntimeError("Dask can only execute DAGs")
if ewoksgraph.has_conditional_links:
raise RuntimeError("Dask cannot handle conditional links")
daskgraph = convert_graph(ewoksgraph, varinfo, enable_logging=log_task_execution)
daskgraph = convert_graph(
ewoksgraph, enable_logging=log_task_execution, **execute_options
)
if results_of_all_nodes:
nodes = list(ewoksgraph.graph.nodes)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment