Commit 0743f329 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Rename Task.process to Task.run

parent 05592ac0
......@@ -3,7 +3,7 @@ from esrftaskgraph.utils import import_method
class MethodExecutorTask(Task, input_names=["method"], output_names=["return_value"]):
def process(self):
def run(self):
method_kwargs = self.input_values
fullname = method_kwargs.pop("method")
method = import_method(fullname)
......
......@@ -13,7 +13,7 @@ class PpfMethodExecutorTask(
arguments and passed to the method.
"""
def process(self):
def run(self):
method_kwargs = self.input_values
fullname = method_kwargs.pop("method")
method = import_method(fullname)
......@@ -32,7 +32,7 @@ class PpfPortTask(
):
"""A ppfmethod which represents the identity mapping"""
def process(self):
def run(self):
method_kwargs = self.input_values
method_kwargs.pop("ppfport") # not used
ppfdict = method_kwargs.pop("ppfdict", None)
......
......@@ -4,7 +4,7 @@ from esrftaskgraph.task import Task
class ScriptExecutorTask(Task, input_names=["script"], output_names=["returncode"]):
def process(self):
def run(self):
fullname = self.inputs.script
if not isinstance(fullname, str):
raise TypeError(fullname, type(fullname))
......
......@@ -202,13 +202,13 @@ class Task(Registered, hashing.UniversalHashable, register=False):
if self.done:
return
self.assert_ready_to_execute()
self.process()
self.run()
self._outputs.dump()
except Exception as e:
self._exception = e
if raise_on_error:
raise
def process(self):
def run(self):
"""To be implemented by the derived classes"""
raise NotImplementedError
......@@ -194,7 +194,7 @@ class Variable(hashing.UniversalHashable):
def force_non_existing(self):
while self.exists:
super().uhash_randomize()
self.uhash_randomize()
class VariableContainer(Mapping, Variable):
......
%% Cell type:markdown id:dominican-grave tags:
# Load an run a task graph
%% Cell type:code id:explicit-stake tags:
``` python
import networkx
import matplotlib.pyplot as plt
from esrftaskgraph import load_graph
from taskgraphlib import acyclic_graph1
varinfo = {"root_uri": "/tmp/myresults"}
persistent_taskgraph, expected_results = acyclic_graph1()
runtime_taskgraph = load_graph(persistent_taskgraph)
runtime_taskgraph.execute(varinfo=varinfo)
networkx.draw(runtime_taskgraph.graph, with_labels=True)
plt.show()
```
%%%% Output: display_data
![]()
%% Cell type:markdown id:academic-rebound tags:
# Instantiating a task
A task can always be instantiated and executed from the runtime representation of a task graph. No results from upstream tasks need to be provided explicitely. The runtime representation can be instantiated from the persistent representation with `load_graph`.
%% Cell type:code id:liked-belly tags:
``` python
import inspect
def print_task_sourcecode(task):
lines, line_nb = inspect.getsourcelines(type(task))
print(" \nSource code:\n" + " ".join(lines))
def print_task_info(name, task):
print(f"{name}: ")
print(f" attributes = {runtime_taskgraph.graph.nodes[name]}")
print(f" uhash = {task.uhash}")
print(f" done = {task.done}")
if task.done:
print(f" result = {task.outputs.result}")
else:
print(" result = <not available>")
def instantiate_task(name):
task = runtime_taskgraph.instantiate_task_static(name, varinfo=varinfo)
print_task_info(name, task)
return task
task = instantiate_task("task6")
print_task_sourcecode(task)
```
%%%% Output: stream
task6:
attributes = {'inputs': {'b': 6}, 'class': 'tasklib.tasks.SumTask'}
uhash = 6273c01a95d490123dc64bc2b8acc852c2a64244552d073029b8e0a5597a4395
done = True
result = 16
Source code:
class SumTask(
Task, input_names=["a"], optional_input_names=["b"], output_names=["result"]
):
def process(self):
def run(self):
result = self.inputs.a
if self.inputs.b:
result += self.inputs.b
self.outputs.result = result
%% Cell type:markdown id:bronze-wellington tags:
# Modify and rerun task graphs
When modifying an input, re-running the task graph will only execute that tasks that are affected by this modification
%% Cell type:code id:permanent-processing tags:
``` python
modify_name = "task1"
dependent_name = "task6"
independent_name = "task4"
upstream = runtime_taskgraph.graph.nodes[modify_name]
upstream["inputs"]["a"] += 1
print("Modify inputs:")
task = instantiate_task(modify_name)
print("\nIndependent task:")
task = instantiate_task(independent_name)
print("\nDependent task:")
task = instantiate_task(dependent_name)
print("\nRun the new graph instance ...")
runtime_taskgraph.execute(varinfo)
print("\nDependent task is a different instance:")
task = instantiate_task(dependent_name)
print("\nIndependent task is still the same instance:")
task = instantiate_task(independent_name)
```
%%%% Output: stream
Modify inputs:
task1:
attributes = {'inputs': {'a': 2}, 'class': 'tasklib.tasks.SumTask'}
uhash = 3da320c1b5d863b447d1c3388d113cd1adbeeffc3e5f8125d225b95ac9ec852f
done = True
result = 2
Independent task:
task4:
attributes = {'inputs': {'b': 4}, 'class': 'tasklib.tasks.SumTask'}
uhash = f3d6c17410499f2bfca9e056a5c50a72d5682614fe905ba3afd0136ac5bc35cf
done = True
result = 6
Dependent task:
task6:
attributes = {'inputs': {'b': 6}, 'class': 'tasklib.tasks.SumTask'}
uhash = ab34985766dd54a4cb45c24865e926d06793b09c4afc0684cc7ae792f303b436
done = True
result = 17
Run the new graph instance ...
Dependent task is a different instance:
task6:
attributes = {'inputs': {'b': 6}, 'class': 'tasklib.tasks.SumTask'}
uhash = ab34985766dd54a4cb45c24865e926d06793b09c4afc0684cc7ae792f303b436
done = True
result = 17
Independent task is still the same instance:
task4:
attributes = {'inputs': {'b': 4}, 'class': 'tasklib.tasks.SumTask'}
uhash = f3d6c17410499f2bfca9e056a5c50a72d5682614fe905ba3afd0136ac5bc35cf
done = True
result = 6
%% Cell type:markdown id:armed-virgin tags:
# Task graph as a hash tree
Create a dictionary of all runtime representations of tasks
%% Cell type:code id:technical-agreement tags:
``` python
tasks = dict()
for name in runtime_taskgraph.graph.nodes:
runtime_taskgraph.instantiate_task_static(name, tasks=tasks, varinfo=varinfo)
```
%% Cell type:markdown id:chinese-correlation tags:
The same python object of a task represents a different task instance when modifying upstream input as a result of hash linking
%% Cell type:code id:undefined-rebound tags:
``` python
modify_name = "task1"
dependent_name = "task6"
task = tasks[dependent_name]
print("Task represented by the python object:")
print_task_info(dependent_name, task)
print("\nModify upstream input:")
tasks[modify_name].input_variables["a"].value += 1
print_task_info(dependent_name, task)
print("\nReset upstream input:")
tasks[modify_name].input_variables["a"].value -= 1
print_task_info(dependent_name, task)
```
%%%% Output: stream
Task represented by the python object:
task6:
attributes = {'inputs': {'b': 6}, 'class': 'tasklib.tasks.SumTask'}
uhash = ab34985766dd54a4cb45c24865e926d06793b09c4afc0684cc7ae792f303b436
done = True
result = 17
Modify upstream input:
task6:
attributes = {'inputs': {'b': 6}, 'class': 'tasklib.tasks.SumTask'}
uhash = 67ba60d3a7f4a986bb55b937bdde080c464aba2bf506f381b0150b8a49e95b74
done = False
result = <not available>
Reset upstream input:
task6:
attributes = {'inputs': {'b': 6}, 'class': 'tasklib.tasks.SumTask'}
uhash = ab34985766dd54a4cb45c24865e926d06793b09c4afc0684cc7ae792f303b436
done = True
result = 17
......
......@@ -7,7 +7,7 @@ __all__ = ["SumTask", "CondSumTask"]
class SumTask(
Task, input_names=["a"], optional_input_names=["b"], output_names=["result"]
):
def process(self):
def run(self):
result = self.inputs.a
if self.inputs.b:
result += self.inputs.b
......@@ -15,15 +15,15 @@ class SumTask(
class CondSumTask(SumTask, output_names=["too_small"]):
def process(self):
super().process()
def run(self):
super().run()
self.outputs.too_small = self.outputs.result < 10
class ErrorSumTask(
Task, optional_input_names=["a", "b", "raise_error"], output_names=["result"]
):
def process(self):
def run(self):
result = self.inputs.a
if result is self.MISSING_DATA:
result = 0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment