Commit 10c8dc98 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

node_id is mandatory

parent 2a3c1ddd
......@@ -3,31 +3,33 @@ https://docs.dask.org/en/latest/scheduler-overview.html
"""
import json
from typing import List, Optional
from typing import List, Optional, Union
from dask.distributed import Client
from dask.threaded import get as multithreading_scheduler
from dask.multiprocessing import get as multiprocessing_scheduler
from dask import get as sequential_scheduler
from ewokscore import load_graph
from ewokscore import execute_graph_decorator
from ewokscore.inittask import instantiate_task
from ewokscore.inittask import add_dynamic_inputs
from ewokscore.graph.serialize import ewoks_jsonload_hook
from ewokscore.node import get_node_label
from ewokscore.graph import analysis
from ewokscore import events
def execute_task(execinfo, *inputs):
execinfo = json.loads(execinfo, object_pairs_hook=ewoks_jsonload_hook)
def execute_task(execute_options, *inputs):
execute_options = json.loads(execute_options, object_pairs_hook=ewoks_jsonload_hook)
dynamic_inputs = dict()
for source_results, link_attrs in zip(inputs, execinfo["link_attrs"]):
for source_results, link_attrs in zip(inputs, execute_options["link_attrs"]):
add_dynamic_inputs(dynamic_inputs, link_attrs, source_results)
task = instantiate_task(
execinfo["node_id"],
execinfo["node_attrs"],
execute_options["node_id"],
execute_options["node_attrs"],
inputs=dynamic_inputs,
varinfo=execinfo["varinfo"],
varinfo=execute_options.get("varinfo"),
)
task.execute()
......@@ -53,44 +55,70 @@ def convert_graph(ewoksgraph, **execute_options):
return daskgraph
def execute_dask_graph(
    daskgraph,
    nodes: List[str],
    scheduler: Union[dict, str, None, Client] = None,
    scheduler_options: Optional[dict] = None,
):
    """Execute a dask task graph and return the results of the requested nodes.

    :param daskgraph: a dask task graph (mapping of node id to a task tuple)
    :param nodes: ids of the nodes whose results are returned
    :param scheduler: selects how the graph is executed:

                      * `None`: sequential scheduler
                      * `"multiprocessing"` / `"multithreading"`: local dask schedulers
                      * `"cluster"`: start a local dask cluster (`Client(**scheduler_options)`)
                      * any other `str`: address of a running dask scheduler
                      * `dict`: `Client` constructor arguments
                      * `Client`: an already-connected dask client
    :param scheduler_options: extra keyword arguments for the selected scheduler
    :returns: dict mapping node id to its result
    :raises ValueError: when `scheduler` matches none of the supported types
    """
    if scheduler_options is None:
        scheduler_options = dict()
    if scheduler is None:
        results = sequential_scheduler(daskgraph, nodes, **scheduler_options)
    elif scheduler == "multiprocessing":
        # num_workers: CPU_COUNT by default
        results = multiprocessing_scheduler(daskgraph, nodes, **scheduler_options)
    elif scheduler == "multithreading":
        # num_workers: CPU_COUNT by default
        results = multithreading_scheduler(daskgraph, nodes, **scheduler_options)
    elif scheduler == "cluster":
        # n_worker: n worker with m threads (n_worker= n * m)
        with Client(**scheduler_options) as client:
            results = client.get(daskgraph, nodes)
    elif isinstance(scheduler, str):
        # treat the string as the address of a running scheduler
        with Client(address=scheduler, **scheduler_options) as client:
            results = client.get(daskgraph, nodes)
    elif isinstance(scheduler, dict):
        # dict of Client constructor arguments, as advertised by the type
        # annotation (restores the pre-refactor behavior for dict schedulers)
        with Client(**scheduler, **scheduler_options) as client:
            results = client.get(daskgraph, nodes)
    elif isinstance(scheduler, Client):
        # BUG FIX: the original referenced an undefined name `client` here;
        # the caller-provided Client instance is `scheduler`
        results = scheduler.get(daskgraph, nodes)
    else:
        raise ValueError("Unknown scheduler")
    return dict(zip(nodes, results))
def _execute_graph(
    ewoksgraph,
    results_of_all_nodes: Optional[bool] = False,
    outputs: Optional[List[dict]] = None,
    varinfo: Optional[dict] = None,
    execinfo: Optional[dict] = None,
    scheduler: Union[dict, str, None, Client] = None,
    scheduler_options: Optional[dict] = None,
):
    """Convert an ewoks graph to a dask graph and execute it with dask.

    :param ewoksgraph: the loaded ewoks workflow graph
    :param results_of_all_nodes: return results of every node instead of
                                 only the end nodes
    :param outputs: accepted for interface compatibility (not used here)
    :param varinfo: variable persistence options, forwarded to each task
    :param execinfo: workflow execution context, forwarded to each task
    :param scheduler: dask scheduler selection (see `execute_dask_graph`)
    :param scheduler_options: extra options for the selected scheduler
    :returns: dict mapping node id to its result
    :raises RuntimeError: when the graph is cyclic or has conditional links,
                          neither of which dask supports
    """
    with events.workflow_context(execinfo, workflow=ewoksgraph.graph) as execinfo:
        # dask can only execute static DAGs
        if ewoksgraph.is_cyclic:
            raise RuntimeError("Dask can only execute DAGs")
        if ewoksgraph.has_conditional_links:
            raise RuntimeError("Dask cannot handle conditional links")
        daskgraph = convert_graph(ewoksgraph, varinfo=varinfo, execinfo=execinfo)
        graph = ewoksgraph.graph
        requested = graph.nodes if results_of_all_nodes else analysis.end_nodes(graph)
        return execute_dask_graph(
            daskgraph,
            list(requested),
            scheduler=scheduler,
            scheduler_options=scheduler_options,
        )
@execute_graph_decorator
def execute_graph(
graph,
scheduler=None,
inputs: Optional[List[dict]] = None,
results_of_all_nodes: Optional[bool] = False,
outputs: Optional[List[dict]] = None,
load_options: Optional[dict] = None,
**execute_options
):
if load_options is None:
load_options = dict()
ewoksgraph = load_graph(source=graph, **load_options)
if ewoksgraph.is_cyclic:
raise RuntimeError("Dask can only execute DAGs")
if ewoksgraph.has_conditional_links:
raise RuntimeError("Dask cannot handle conditional links")
if inputs:
ewoksgraph.update_default_inputs(inputs)
daskgraph = convert_graph(ewoksgraph, **execute_options)
if results_of_all_nodes:
nodes = list(ewoksgraph.graph.nodes)
else:
nodes = list(analysis.end_nodes(ewoksgraph.graph))
if scheduler is None:
results = sequential_scheduler(daskgraph, nodes)
elif isinstance(scheduler, str):
if scheduler == "multiprocessing":
results = multiprocessing_scheduler(daskgraph, nodes)
elif scheduler == "multithreading":
results = multithreading_scheduler(daskgraph, nodes)
else:
raise ValueError("Unknown scheduler")
elif isinstance(scheduler, dict):
with Client(**scheduler) as scheduler:
results = scheduler.get(daskgraph, nodes)
else:
results = scheduler.get(daskgraph, nodes)
return dict(zip(nodes, results))
ewoksgraph = load_graph(graph, inputs=inputs, **load_options)
return _execute_graph(ewoksgraph, **execute_options)
......@@ -3,6 +3,14 @@ from dask_jobqueue import SLURMCluster
def local_scheduler(**kw):
"""
:param n_worker:
:param processes: True by default
:param threads_per_worker:
:param scheduler_port:
:param dashboard_address:
:param worker_dashboard_address:
"""
# Run this on any machine
kw.setdefault("n_workers", 1)
......@@ -14,6 +22,15 @@ def local_scheduler(**kw):
def slurm_scheduler(**kw):
"""
:param address:
:param n_workers:
:param minimum_jobs:
:param maximum_jobs:
:param cores:
:param processes:
:param memory:
"""
# Run this on slurm-access
# Parameters for each execute_graph:
kw.setdefault("project", "esrftaskgraph")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment