Commit 05592ac0 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

esrftaskgraph: rename TaskGraph.run and Task.run to TaskGraph.execute and Task.execute

parent 628b8ac1
......@@ -15,7 +15,7 @@ def execute_task(node_name, *inputs):
info = inputs[0]
esrfgraph = load_graph(info["esrfgraph"])
task = esrfgraph.instantiate_task_static(node_name, varinfo=info["varinfo"])
task.run()
task.execute()
return info
......
......@@ -49,7 +49,7 @@ class TaskRunner(luigi.Task):
return EsrfTarget(self.esrftask)
def run(self):
self.esrftask.run()
self.esrftask.execute()
return dict(self.esrftask.output_uhashes)
......
......@@ -37,7 +37,7 @@ class TaskCommand(mpdag.BaseCommand):
logger.info(self.prefix + "instantiate tasks ...")
task = self.taskgraph.instantiate_task_static(self.node, varinfo=self.varinfo)
logger.info(self.prefix + "run ...")
task.run()
task.execute()
logger.info(self.prefix + f"output = {task.output_values}")
......
......@@ -144,7 +144,7 @@ class OWESRFWidget(OWWidget, metaclass=ESRFWidgetMetaClass, openclass=True):
def run(self):
task = self.esrftaskclass(**self.input_variables, varinfo=self.varinfo)
try:
task.run()
task.execute()
except TaskInputError:
self.clear_downstream()
return
......
......@@ -11,11 +11,11 @@ def convert_graph(esrfgraph, varinfo):
tasks = dict()
for node in esrfgraph.graph.nodes:
task = esrfgraph.instantiate_task_static(node, tasks=tasks, varinfo=varinfo)
pdgraph.add_vertex(task.run)
pdgraph.add_vertex(task.execute)
for node in esrfgraph.graph.nodes:
task = tasks[node]
for upstream in esrfgraph.predecessors(node):
pdgraph.add_edge(tasks[upstream].run, task.run)
pdgraph.add_edge(tasks[upstream].execute, task.execute)
return pdgraph
......
......@@ -25,7 +25,7 @@ def run(**inputs):
task = instantiate_task(info["node_attrs"], varinfo=varinfo, inputs=inputs)
try:
task.run()
task.execute()
except Exception as e:
if log:
logger.error(
......
......@@ -189,7 +189,7 @@ class TaskGraph:
.. code-block:: python
taskgraph.run()
taskgraph.execute()
"""
GraphRepresentation = enum.Enum(
......@@ -553,11 +553,11 @@ class TaskGraph:
raise RuntimeError("Sorting nodes is not possible for cyclic graphs")
yield from networkx.topological_sort(self.graph)
def run(self, varinfo=None):
def execute(self, varinfo=None):
"""Sequential execution of DAGs"""
tasks = dict()
if self.has_conditional_links:
raise RuntimeError("Cannot execute graphs with conditional links")
for node in self.topological_sort():
task = self.instantiate_task_static(node, tasks=tasks, varinfo=varinfo)
task.run()
task.execute()
......@@ -6,7 +6,7 @@ from esrftaskgraph.variable import ReadOnlyVariableContainerNamespace
from esrftaskgraph.registration import Registered
class TaskInputError(RuntimeError):
class TaskInputError(AssertionError):
pass
......@@ -173,33 +173,35 @@ class Task(Registered, hashing.UniversalHashable, register=False):
def failed(self):
return self._exception is not None
@property
def can_run(self):
def _unavailable_inputs(self):
for iname in self._INPUT_NAMES:
ivar = self._inputs.get(iname)
if ivar is None or not ivar.available:
return False
return True
yield iname
def check_can_run(self):
unavailable = list()
for iname in self._INPUT_NAMES:
ivar = self._inputs.get(iname)
if ivar is None or not ivar.available:
unavailable.append(iname)
@property
def is_ready_to_execute(self):
try:
next(iter(self._unavailable_inputs()))
except StopIteration:
return True
return False
def assert_ready_to_execute(self):
unavailable = list(self._unavailable_inputs())
if unavailable:
raise TaskInputError(
"The following inputs could not be loaded: " + str(unavailable)
)
def run(self, force_rerun=False, raise_on_error=True):
def execute(self, force_rerun=False, raise_on_error=True):
try:
if force_rerun:
# Rerun a task which is already done
self._outputs.force_non_existing()
if self.done:
return
self.check_can_run()
self.assert_ready_to_execute()
self.process()
self._outputs.dump()
except Exception as e:
......
......@@ -21,7 +21,7 @@ def test_acyclic_execution(tmpdir):
g, expected = taskgraphs.acyclic_graph1()
taskgraph = load_graph(g)
varinfo = {"root_uri": str(tmpdir)}
taskgraph.run(varinfo=varinfo)
taskgraph.execute(varinfo=varinfo)
assert_taskgraph_result(taskgraph, expected, varinfo)
......@@ -30,7 +30,7 @@ def test_cyclic_execution(tmpdir):
taskgraph = load_graph(g)
varinfo = {"root_uri": str(tmpdir)}
with pytest.raises(RuntimeError):
taskgraph.run(varinfo=varinfo)
taskgraph.execute(varinfo=varinfo)
def test_start_nodes(tmpdir):
......
......@@ -45,7 +45,7 @@ def test_task_readonly_input(variable_kwargs):
def test_task_optional_input(tmpdir, variable_kwargs):
task = SumTask(a=10, **variable_kwargs)
assert not task.done
task.run()
task.execute()
assert task.done
assert task.outputs.result == 10
expected = [{"result": str(task.output_variables["result"].uhash)}, 10]
......@@ -55,7 +55,7 @@ def test_task_optional_input(tmpdir, variable_kwargs):
def test_task_done(variable_kwargs):
task = SumTask(a=10, **variable_kwargs)
assert not task.done
task.run()
task.execute()
assert task.done
task = SumTask(a=10, **variable_kwargs)
......@@ -63,7 +63,7 @@ def test_task_done(variable_kwargs):
task = SumTask(a=10)
assert not task.done
task.run()
task.execute()
assert task.done
task = SumTask(a=10)
......@@ -85,7 +85,7 @@ def test_task_uhash(variable_kwargs):
def test_task_storage(tmpdir, variable_kwargs):
task = SumTask(a=10, b=2, **variable_kwargs)
assert not task.done
task.run()
task.execute()
assert task.done
assert task.outputs.result == 12
expected = [{"result": str(task.output_variables["result"].uhash)}, 12]
......@@ -98,7 +98,7 @@ def test_task_storage(tmpdir, variable_kwargs):
task = SumTask(a=2, b=10, **variable_kwargs)
assert not task.done
task.run()
task.execute()
assert task.done
assert task.outputs.result == 12
expected += [{"result": str(task.output_variables["result"].uhash)}, 12]
......@@ -106,7 +106,7 @@ def test_task_storage(tmpdir, variable_kwargs):
task = SumTask(a=task.output_variables["result"], b=0, **variable_kwargs)
assert not task.done
task.run()
task.execute()
assert task.done
assert task.outputs.result == 12
expected += [{"result": str(task.output_variables["result"].uhash)}, 12]
......@@ -114,7 +114,7 @@ def test_task_storage(tmpdir, variable_kwargs):
task = SumTask(a=1, b=task.output_variables["result"].uhash, **variable_kwargs)
assert not task.done
task.run()
task.execute()
assert task.done
assert task.outputs.result == 13
expected += [{"result": str(task.output_variables["result"].uhash)}, 13]
......@@ -125,7 +125,7 @@ def test_method_task(variable_kwargs):
task = Task.instantiate(
"MethodExecutorTask", method=qualname(mymethod), a=3, b=5, **variable_kwargs
)
task.run()
task.execute()
assert task.done
assert task.output_values == {"return_value": {"result": 8}}
......@@ -138,7 +138,7 @@ def test_ppfmethod_task(variable_kwargs):
b=5,
**variable_kwargs,
)
task.run()
task.execute()
assert task.done
assert task.output_values == {"ppfdict": {"a": 8, "b": 5}}
......@@ -179,7 +179,7 @@ def test_python_script_task(tmpdir, variable_kwargs, capsys):
task = Task.instantiate(
"ScriptExecutorTask", script=str(pyscriptname), a=10, **variable_kwargs
)
task.run()
task.execute()
assert task.done
assert task.outputs.returncode == 0
captured = capsys.readouterr()
......@@ -196,7 +196,7 @@ def test_shell_script_task(tmpdir, variable_kwargs, capsys):
task = Task.instantiate(
"ScriptExecutorTask", script=str(shellscriptname), a=10, **variable_kwargs
)
task.run()
task.execute()
assert task.done
assert task.outputs.returncode == 0
captured = capsys.readouterr()
......
%% Cell type:markdown id:lucky-florist tags:
# Prepare for persistent storage of results
%% Cell type:code id:automotive-bulgaria tags:
``` python
import os
import shutil
varinfo = {"root_uri": "/tmp/myresults"}
def prepare_results(clean=True, root_uri=None):
    """Make sure the directory that receives the task results exists.

    :param bool clean: when true, remove the directory and everything in it
        before (re)creating it, so that no earlier results are left behind.
    :param str root_uri: directory to prepare. Defaults to
        ``varinfo["root_uri"]``, looked up at call time so that a later
        change of ``varinfo`` is honoured.
    """
    if root_uri is None:
        root_uri = varinfo["root_uri"]
    if clean:
        # ignore_errors: the directory may not exist yet on a first run
        shutil.rmtree(root_uri, ignore_errors=True)
    os.makedirs(root_uri, exist_ok=True)
```
%% Cell type:markdown id:hindu-practice tags:
# Bindings for task schedulers
%% Cell type:code id:falling-trace tags:
``` python
schedulers = {}
```
%% Cell type:code id:small-reputation tags:
``` python
def sequential_execution(taskgraph, varinfo):
# runs in a single thread
from esrftaskgraph import load_graph
runtime_taskgraph = load_graph(taskgraph)
runtime_taskgraph.run(varinfo=varinfo)
runtime_taskgraph.execute(varinfo=varinfo)
schedulers[None] = sequential_execution
```
%% Cell type:code id:latin-width tags:
``` python
def multithreading_scheduler(taskgraph, varinfo):
    """Execute ``taskgraph`` with ``esrf2paradag.job``.

    Tasks are distributed over threads. The binding is imported lazily so
    the package is only needed when this scheduler is actually used.
    """
    from esrf2paradag import job

    job(taskgraph, varinfo=varinfo)
schedulers["multithreading"] = multithreading_scheduler
```
%% Cell type:code id:operational-school tags:
``` python
def multiprocessing_scheduler(taskgraph, varinfo):
    """Execute ``taskgraph`` with ``esrf2multiprocessing.job``.

    Tasks are distributed over processes. The binding is imported lazily so
    the package is only needed when this scheduler is actually used.
    """
    from esrf2multiprocessing import job

    job(taskgraph, varinfo=varinfo)
schedulers["multiprocessing"] = multiprocessing_scheduler
```
%% Cell type:code id:stunning-object tags:
``` python
def pypushflow_scheduler(taskgraph, varinfo):
    """Execute ``taskgraph`` with ``esrf2pypushflow.job``.

    Tasks are distributed over processes. The binding is imported lazily so
    the package is only needed when this scheduler is actually used.
    """
    from esrf2pypushflow import job

    job(taskgraph, varinfo=varinfo)
schedulers["pypushflow"] = pypushflow_scheduler
```
%% Cell type:code id:sexual-beauty tags:
``` python
def luigi_scheduler(taskgraph, varinfo, **kw):
    """Execute ``taskgraph`` with ``esrf2luigi.job``.

    Tasks are distributed by a local or centralized scheduler. Extra keyword
    arguments are forwarded unchanged to ``esrf2luigi.job``.
    """
    from esrf2luigi import job

    job(taskgraph, varinfo=varinfo, **kw)
schedulers["luigi"] = luigi_scheduler
```
%% Cell type:code id:accredited-basin tags:
``` python
def dask_scheduler(taskgraph, varinfo, **kw):
    """Execute ``taskgraph`` with ``esrf2dask.job``.

    Tasks are distributed by a local or centralized scheduler. Extra keyword
    arguments are forwarded unchanged to ``esrf2dask.job``.
    """
    from esrf2dask import job

    job(taskgraph, varinfo=varinfo, **kw)
schedulers["dask"] = dask_scheduler
```
%% Cell type:markdown id:requested-township tags:
### Load a task graph
%% Cell type:code id:communist-disco tags:
``` python
from taskgraphlib import acyclic_graph1
persistent_taskgraph, expected_results = acyclic_graph1()
```
%% Cell type:markdown id:indoor-drink tags:
### Show the task graph
%% Cell type:code id:legal-marking tags:
``` python
import networkx
from pprint import pprint
import matplotlib.pyplot as plt
from esrftaskgraph import load_graph
pprint(persistent_taskgraph)
runtime_taskgraph = load_graph(persistent_taskgraph)
networkx.draw(runtime_taskgraph.graph, with_labels=True)
plt.show()
```
%%%% Output: stream
{'directed': True,
'graph': {'name': 'taskgraphlib.taskgraphs.acyclic_graph1'},
'links': [{'arguments': {'a': 'result'}, 'source': 'task1', 'target': 'task3'},
{'arguments': {'a': 'result'}, 'source': 'task2', 'target': 'task4'},
{'arguments': {'a': 'result'}, 'source': 'task3', 'target': 'task5'},
{'arguments': {'b': 'result'}, 'source': 'task4', 'target': 'task5'},
{'arguments': {'a': 'result'},
'source': 'task5',
'target': 'task6'}],
'multigraph': False,
'nodes': [{'class': 'tasklib.tasks.SumTask',
'id': 'task1',
'inputs': {'a': 1}},
{'class': 'tasklib.tasks.SumTask',
'id': 'task2',
'inputs': {'a': 2}},
{'class': 'tasklib.tasks.SumTask',
'id': 'task3',
'inputs': {'b': 3}},
{'class': 'tasklib.tasks.SumTask',
'id': 'task4',
'inputs': {'b': 4}},
{'class': 'tasklib.tasks.SumTask',
'id': 'task5',
'inputs': {'b': 5}},
{'class': 'tasklib.tasks.SumTask',
'id': 'task6',
'inputs': {'b': 6}}]}
%%%% Output: display_data
![]()
%% Cell type:markdown id:original-buddy tags:
### Execute the task graph
%% Cell type:code id:acknowledged-sodium tags:
``` python
# Select scheduler
local_scheduler = True
scheduler = schedulers["dask"]
# Schedulers options
scheduler_options = dict()
if local_scheduler:
varinfo["root_uri"] = "/tmp/myresults"
if scheduler is dask_scheduler:
scheduler_options["scheduler"] = "multithreading"
else:
varinfo["root_uri"] = "/data/id21/tmp/myresults"
if scheduler is luigi_scheduler:
# Run "luigid" on any host
scheduler_options["scheduler"] = {"local_scheduler": local_scheduler,
"scheduler_host": "localhost",
"scheduler_port":8082,
"workers": 10}
elif scheduler is dask_scheduler:
# Run this on any host:
# >>> from esrf2dask import local_scheduler
# >>> scheduler = local_scheduler(n_workers=5)
#
# Or run this on slurm-access or any slurm node:
# >>> from esrf2dask import slurm_scheduler
# >>> scheduler = slurm_scheduler(maximum_jobs=5)
scheduler_options["scheduler"] = {"address": "160.103.228.113:35881"}
# Prepare location for results
prepare_results(clean=True)
%timeit -n 1 -r 1 scheduler(persistent_taskgraph, varinfo, **scheduler_options)
```
%%%% Output: stream
210 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%% Cell type:markdown id:indoor-bangladesh tags:
### Show the results of each task
%% Cell type:code id:antique-moderator tags:
``` python
tasks = dict()
print(f"Task graph results {repr(runtime_taskgraph)}:")
for node, value in expected_results.items():
task = runtime_taskgraph.instantiate_task_static(node, tasks=tasks, varinfo=varinfo)
assert task.done, str(task)
assert task.output_values == value, str(task)
print(f"\ntask {repr(node)}:\n Result: {value}\n Storage: {task.outputs}")
```
%%%% Output: stream
Task graph results taskgraphlib.taskgraphs.acyclic_graph1:
task 'task1':
Result: {'result': 1}
Storage: <esrftaskgraph.variable.VariableContainerNamespace object at 0x7fc90ef89340>
task 'task2':
Result: {'result': 2}
Storage: <esrftaskgraph.variable.VariableContainerNamespace object at 0x7fc911007e80>
task 'task3':
Result: {'result': 4}
Storage: <esrftaskgraph.variable.VariableContainerNamespace object at 0x7fc95b7ec460>
task 'task4':
Result: {'result': 6}
Storage: <esrftaskgraph.variable.VariableContainerNamespace object at 0x7fc90eb6ea60>
task 'task5':
Result: {'result': 10}
Storage: <esrftaskgraph.variable.VariableContainerNamespace object at 0x7fc90eb6e550>
task 'task6':
Result: {'result': 16}
Storage: <esrftaskgraph.variable.VariableContainerNamespace object at 0x7fc90eb6e5e0>
......
......@@ -22,7 +22,7 @@ def multiprocess_scheduler(taskgraph, varinfo):
def singlethread_scheduler(taskgraph, varinfo):
# runs in a single thread
taskgraph.run(varinfo)
taskgraph.execute(varinfo)
if __name__ == "__main__":
......
%% Cell type:markdown id:dominican-grave tags:
# Load and run a task graph
%% Cell type:code id:explicit-stake tags:
``` python
import networkx
import matplotlib.pyplot as plt
from esrftaskgraph import load_graph
from taskgraphlib import acyclic_graph1
varinfo = {"root_uri": "/tmp/myresults"}
persistent_taskgraph, expected_results = acyclic_graph1()
runtime_taskgraph = load_graph(persistent_taskgraph)
runtime_taskgraph.run(varinfo=varinfo)
runtime_taskgraph.execute(varinfo=varinfo)
networkx.draw(runtime_taskgraph.graph, with_labels=True)
plt.show()
```
%%%% Output: display_data
![]()
%% Cell type:markdown id:academic-rebound tags:
# Instantiating a task
A task can always be instantiated and executed from the runtime representation of a task graph. No results from upstream tasks need to be provided explicitly. The runtime representation can be instantiated from the persistent representation with `load_graph`.
%% Cell type:code id:liked-belly tags:
``` python
import inspect
def print_task_sourcecode(task):
    """Print the source code of the class of ``task``.

    :param task: any instance whose class source can be retrieved with
        ``inspect.getsourcelines``.
    """
    # getsourcelines also returns the starting line number, which is not needed
    lines, _ = inspect.getsourcelines(type(task))
    print(" \nSource code:\n" + " ".join(lines))
def print_task_info(name, task):
    """Print a short report about ``task``, the task of graph node ``name``.

    The report lists the node attributes stored in ``runtime_taskgraph``,
    the task's ``uhash``, whether it is done and, if so, its ``result``
    output.
    """
    print(f"{name}: ")
    print(f" attributes = {runtime_taskgraph.graph.nodes[name]}")
    print(f" uhash = {task.uhash}")
    print(f" done = {task.done}")
    # The result output is only read when the task reports itself as done
    result_line = (
        f" result = {task.outputs.result}" if task.done else " result = <not available>"
    )
    print(result_line)
def instantiate_task(name):
    """Instantiate the task of node ``name`` and print a report about it.

    The task is created from the module-level ``runtime_taskgraph`` with the
    module-level ``varinfo``; the instance is returned after being reported
    by ``print_task_info``.
    """
    instance = runtime_taskgraph.instantiate_task_static(name, varinfo=varinfo)
    print_task_info(name, instance)
    return instance
task = instantiate_task("task6")
print_task_sourcecode(task)
```
%%%% Output: stream
task6:
attributes = {'inputs': {'b': 6}, 'class': 'tasklib.tasks.SumTask'}