Commit 5e66977e authored by payno

Merge branch 'ewoks_events' into 'main'

Ewoks events

See merge request !24
parents 534f0605 d8e147a9
Pipeline #69166 passed with stages in 3 minutes and 52 seconds
@@ -3,31 +3,34 @@ https://docs.dask.org/en/latest/scheduler-overview.html
""" """
import json import json
from typing import List, Optional from typing import List, Optional, Union
from dask.distributed import Client from dask.distributed import Client
from dask.threaded import get as multithreading_scheduler from dask.threaded import get as multithreading_scheduler
from dask.multiprocessing import get as multiprocessing_scheduler from dask.multiprocessing import get as multiprocessing_scheduler
from dask import get as sequential_scheduler from dask import get as sequential_scheduler
from ewokscore import load_graph from ewokscore import load_graph
from ewokscore import execute_graph_decorator
from ewokscore.inittask import instantiate_task from ewokscore.inittask import instantiate_task
from ewokscore.inittask import add_dynamic_inputs from ewokscore.inittask import add_dynamic_inputs
from ewokscore.graph.serialize import ewoks_jsonload_hook from ewokscore.graph.serialize import ewoks_jsonload_hook
from ewokscore.node import get_node_label from ewokscore.node import get_node_label
from ewokscore.graph import analysis from ewokscore.graph import analysis
from ewokscore import events
def execute_task(execinfo, *inputs): def execute_task(execute_options, *inputs):
execinfo = json.loads(execinfo, object_pairs_hook=ewoks_jsonload_hook) execute_options = json.loads(execute_options, object_pairs_hook=ewoks_jsonload_hook)
dynamic_inputs = dict() dynamic_inputs = dict()
for source_results, link_attrs in zip(inputs, execinfo["link_attrs"]): for source_results, link_attrs in zip(inputs, execute_options["link_attrs"]):
add_dynamic_inputs(dynamic_inputs, link_attrs, source_results) add_dynamic_inputs(dynamic_inputs, link_attrs, source_results)
task = instantiate_task( task = instantiate_task(
execinfo["node_attrs"], execute_options["node_id"],
node_id=execinfo["node_id"], execute_options["node_attrs"],
inputs=dynamic_inputs, inputs=dynamic_inputs,
varinfo=execinfo["varinfo"], varinfo=execute_options.get("varinfo"),
execinfo=execute_options.get("execinfo"),
) )
task.execute() task.execute()
@@ -42,7 +45,7 @@ def convert_graph(ewoksgraph, **execute_options):
     link_attrs = tuple(
         ewoksgraph.graph[source_id][target_id] for source_id in source_ids
     )
-    node_label = get_node_label(node_attrs, node_id=target_id)
+    node_label = get_node_label(target_id, node_attrs)
     execute_options["node_id"] = target_id
     execute_options["node_label"] = node_label
     execute_options["node_attrs"] = node_attrs
@@ -53,44 +56,70 @@ def convert_graph(ewoksgraph, **execute_options):
     return daskgraph
+def execute_dask_graph(
+    daskgraph,
+    nodes: List[str],
+    scheduler: Union[dict, str, None, Client] = None,
+    scheduler_options: Optional[dict] = None,
+):
+    if scheduler_options is None:
+        scheduler_options = dict()
+    if scheduler is None:
+        results = sequential_scheduler(daskgraph, nodes, **scheduler_options)
+    elif scheduler == "multiprocessing":
+        # num_workers: CPU_COUNT by default
+        results = multiprocessing_scheduler(daskgraph, nodes, **scheduler_options)
+    elif scheduler == "multithreading":
+        # num_workers: CPU_COUNT by default
+        results = multithreading_scheduler(daskgraph, nodes, **scheduler_options)
+    elif scheduler == "cluster":
+        # n_workers workers with m threads each (n_workers * m threads in total)
+        with Client(**scheduler_options) as client:
+            results = client.get(daskgraph, nodes)
+    elif isinstance(scheduler, str):
+        with Client(address=scheduler, **scheduler_options) as client:
+            results = client.get(daskgraph, nodes)
+    elif isinstance(scheduler, Client):
+        results = scheduler.get(daskgraph, nodes)
+    else:
+        raise ValueError("Unknown scheduler")
+    return dict(zip(nodes, results))
+
+
+def _execute_graph(
+    ewoksgraph,
+    results_of_all_nodes: Optional[bool] = False,
+    outputs: Optional[List[dict]] = None,
+    varinfo: Optional[dict] = None,
+    execinfo: Optional[dict] = None,
+    scheduler: Union[dict, str, None, Client] = None,
+    scheduler_options: Optional[dict] = None,
+):
+    with events.workflow_context(execinfo, workflow=ewoksgraph.graph) as execinfo:
+        if ewoksgraph.is_cyclic:
+            raise RuntimeError("Dask can only execute DAGs")
+        if ewoksgraph.has_conditional_links:
+            raise RuntimeError("Dask cannot handle conditional links")
+        daskgraph = convert_graph(ewoksgraph, varinfo=varinfo, execinfo=execinfo)
+        if results_of_all_nodes:
+            nodes = list(ewoksgraph.graph.nodes)
+        else:
+            nodes = list(analysis.end_nodes(ewoksgraph.graph))
+        return execute_dask_graph(
+            daskgraph, nodes, scheduler=scheduler, scheduler_options=scheduler_options
+        )
+
+
+@execute_graph_decorator(binding="dask")
 def execute_graph(
     graph,
-    scheduler=None,
     inputs: Optional[List[dict]] = None,
-    results_of_all_nodes: Optional[bool] = False,
-    outputs: Optional[List[dict]] = None,
     load_options: Optional[dict] = None,
     **execute_options
 ):
     if load_options is None:
         load_options = dict()
-    ewoksgraph = load_graph(source=graph, **load_options)
-    if ewoksgraph.is_cyclic:
-        raise RuntimeError("Dask can only execute DAGs")
-    if ewoksgraph.has_conditional_links:
-        raise RuntimeError("Dask cannot handle conditional links")
-    if inputs:
-        ewoksgraph.update_default_inputs(inputs)
-    daskgraph = convert_graph(ewoksgraph, **execute_options)
-    if results_of_all_nodes:
-        nodes = list(ewoksgraph.graph.nodes)
-    else:
-        nodes = list(analysis.end_nodes(ewoksgraph.graph))
-    if scheduler is None:
-        results = sequential_scheduler(daskgraph, nodes)
-    elif isinstance(scheduler, str):
-        if scheduler == "multiprocessing":
-            results = multiprocessing_scheduler(daskgraph, nodes)
-        elif scheduler == "multithreading":
-            results = multithreading_scheduler(daskgraph, nodes)
-        else:
-            raise ValueError("Unknown scheduler")
-    elif isinstance(scheduler, dict):
-        with Client(**scheduler) as scheduler:
-            results = scheduler.get(daskgraph, nodes)
-    else:
-        results = scheduler.get(daskgraph, nodes)
-    return dict(zip(nodes, results))
+    ewoksgraph = load_graph(graph, inputs=inputs, **load_options)
+    return _execute_graph(ewoksgraph, **execute_options)
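Note for reviewers: the new execute_dask_graph accepts None (sequential), the strings "multiprocessing" / "multithreading" / "cluster", a scheduler address, or a live Client, and scheduler_options is forwarded to the underlying get() (or to the Client constructor). A minimal sketch of driving it with a plain dask task graph; the ewoksdask.bindings import path and the add helper are illustrative assumptions, not part of this MR:

from ewoksdask.bindings import execute_dask_graph  # import path assumed


def add(a, b):
    return a + b


# A plain dask task graph: key -> literal, or key -> (callable, *dependency_keys)
daskgraph = {"x": 1, "y": 2, "z": (add, "x", "y")}

# Sequential scheduler (scheduler=None): returns dict(zip(nodes, results))
assert execute_dask_graph(daskgraph, ["z"]) == {"z": 3}

# Thread/process pools; scheduler_options is passed through to dask's get()
execute_dask_graph(daskgraph, ["z"], scheduler="multithreading")
execute_dask_graph(
    daskgraph, ["z"], scheduler="multiprocessing", scheduler_options={"num_workers": 2}
)

# A running distributed scheduler, by address string:
# execute_dask_graph(daskgraph, ["z"], scheduler="tcp://127.0.0.1:8786")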
@@ -3,6 +3,14 @@ from dask_jobqueue import SLURMCluster
 def local_scheduler(**kw):
+    """
+    :param n_workers:
+    :param processes: True by default
+    :param threads_per_worker:
+    :param scheduler_port:
+    :param dashboard_address:
+    :param worker_dashboard_address:
+    """
     # Run this on any machine
     kw.setdefault("n_workers", 1)
@@ -14,6 +22,15 @@ def local_scheduler(**kw):
 def slurm_scheduler(**kw):
+    """
+    :param address:
+    :param n_workers:
+    :param minimum_jobs:
+    :param maximum_jobs:
+    :param cores:
+    :param processes:
+    :param memory:
+    """
     # Run this on slurm-access
     # Parameters for each execute_graph:
     kw.setdefault("project", "esrftaskgraph")
...
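The scheduler helper bodies are collapsed in this diff; assuming local_scheduler wraps dask.distributed.LocalCluster (the keywords in the new docstring match its signature), the wiring into the new scheduler argument could look like this sketch, reusing execute_dask_graph from the sketch above:

from dask.distributed import Client, LocalCluster

from ewoksdask.bindings import execute_dask_graph  # import path assumed


def double(value):
    return 2 * value


daskgraph = {"x": 21, "y": (double, "x")}

# Hand either a live Client or the scheduler address to `scheduler`;
# both branches end up in Client.get().
# (Wrap in `if __name__ == "__main__":` when processes=True on spawn platforms.)
with LocalCluster(n_workers=1, processes=True) as cluster:
    with Client(cluster) as client:
        assert execute_dask_graph(daskgraph, ["y"], scheduler=client) == {"y": 42}
        # or by address:
        # execute_dask_graph(daskgraph, ["y"], scheduler=cluster.scheduler_address)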
+import pytest
+from ewoksdask import execute_graph
+from ewokscore.tests.test_workflow_events import fetch_events
+from ewokscore.tests.test_workflow_events import run_succesfull_workfow
+from ewokscore.tests.test_workflow_events import run_failed_workfow
+from ewokscore.tests.test_workflow_events import assert_succesfull_workfow_events
+from ewokscore.tests.test_workflow_events import assert_failed_workfow_events
+
+
+@pytest.mark.parametrize("scheduler", (None, "multithreading", "multiprocessing"))
+def test_succesfull_workfow(scheduler, tmpdir):
+    uri = run_succesfull_workfow(tmpdir, execute_graph, scheduler=scheduler)
+    events = fetch_events(uri, 10)
+    assert_succesfull_workfow_events(events)
+
+
+@pytest.mark.parametrize("scheduler", (None, "multithreading", "multiprocessing"))
+def test_failed_workfow(scheduler, tmpdir):
+    uri = run_failed_workfow(tmpdir, execute_graph, scheduler=scheduler)
+    events = fetch_events(uri, 8)
+    assert_failed_workfow_events(events)
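The path of the new test file is not shown in this diff, but since all the new tests contain "workfow" in their names (spelling kept from the ewokscore test helpers), pytest's keyword selection can run just these from the repository root, for example:

# Equivalent to `python -m pytest -v -k workfow` on the command line.
import pytest

raise SystemExit(pytest.main(["-v", "-k", "workfow"]))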