Commit 4b7317b7 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

esrftaskgraph: do not unpack inputs when passing to the Task constructor

parent 0743f329
......@@ -12,7 +12,6 @@ from pypushflow.AbstractActor import AbstractActor
from esrftaskgraph import load_graph
from esrf2pypushflow import ppfrunscript
from esrftaskgraph import Variable
from esrftaskgraph.variable import VARINFO
from esrftaskgraph.inittask import task_executable
......@@ -80,8 +79,8 @@ class DecodeRouterActor(RouterActor):
uhash = inData[self.itemName]
else:
return self.OTHERVALUE
kw = {VARINFO: inData[ppfrunscript.INFOKEY][VARINFO]}
value = Variable(uhash=uhash, **kw).value
varinfo = inData[ppfrunscript.INFOKEY]["varinfo"]
value = Variable(uhash=uhash, varinfo=varinfo).value
if self.is_ppfmethod:
if self.itemName in value:
value = value[self.itemName]
......@@ -104,7 +103,7 @@ class DecodeRouterActor(RouterActor):
def trigger(self, inData):
self.setStarted()
self.setFinished()
varinfo = inData[ppfrunscript.INFOKEY][VARINFO]
varinfo = inData[ppfrunscript.INFOKEY]["varinfo"]
if varinfo.get("root_uri", None):
value = self._extractPersistentValue(inData)
else:
......@@ -159,7 +158,7 @@ class EsrfWorkflow(Workflow):
# When triggering a task, the output dict of the previous task
# is merged with the input dict of the current task.
self.startargs = {
ppfrunscript.INFOKEY: {VARINFO: varinfo, "enable_logging": False}
ppfrunscript.INFOKEY: {"varinfo": varinfo, "enable_logging": False}
}
self.graph_to_actors(esrfgraph, varinfo)
......
import logging
from esrftaskgraph import UniversalHash
from esrftaskgraph import instantiate_task
from esrftaskgraph.variable import VARINFO
INFOKEY = "_noinput"
......@@ -18,7 +17,7 @@ def run(**inputs):
"""
info = inputs.pop(INFOKEY)
log = info.get("enable_logging")
varinfo = info[VARINFO]
varinfo = info["varinfo"]
presistent = bool(varinfo.get("root_uri", None))
if presistent:
inputs = {name: UniversalHash(uhash) for name, uhash in inputs.items()}
......
......@@ -5,7 +5,6 @@ from esrftaskgraph.ppftasks import PpfMethodExecutorTask
from esrftaskgraph.ppftasks import PpfPortTask
from esrftaskgraph.utils import import_method
from esrftaskgraph.utils import import_qualname
from esrftaskgraph.variable import VARINFO
TASK_EXECUTABLE_ATTRIBUTE = (
"class",
......@@ -69,25 +68,27 @@ def instantiate_task(node_attrs, varinfo=None, inputs=None, node_name=""):
:returns Task:
"""
# Static inputs
kwargs = dict(node_attrs.get("inputs", dict()))
task_inputs = dict(node_attrs.get("inputs", dict()))
# Dynamic inputs (from other tasks)
if inputs:
kwargs.update(inputs)
# `Variable` arguments
kwargs[VARINFO] = varinfo
task_inputs.update(inputs)
# Instantiate task
key, value = task_executable_key(node_attrs, node_name=node_name)
if key == "class":
return Task.instantiate(value, **kwargs)
return Task.instantiate(value, inputs=task_inputs, varinfo=varinfo)
elif key == "method":
return MethodExecutorTask(method=value, **kwargs)
task_inputs["method"] = value
return MethodExecutorTask(inputs=task_inputs, varinfo=varinfo)
elif key == "ppfmethod":
return PpfMethodExecutorTask(method=value, **kwargs)
task_inputs["method"] = value
return PpfMethodExecutorTask(inputs=task_inputs, varinfo=varinfo)
elif key == "ppfport":
return PpfPortTask(ppfport=value, **kwargs)
task_inputs["ppfport"] = value
return PpfPortTask(inputs=task_inputs, varinfo=varinfo)
elif key == "script":
return ScriptExecutorTask(script=value, **kwargs)
task_inputs["script"] = value
return ScriptExecutorTask(inputs=task_inputs, varinfo=varinfo)
else:
raise_task_error(node_name, all=False)
......
from collections.abc import Mapping
from esrftaskgraph import hashing
from esrftaskgraph.variable import VARINFO
from esrftaskgraph.variable import VariableContainer
from esrftaskgraph.variable import VariableContainerNamespace
from esrftaskgraph.variable import ReadOnlyVariableContainerNamespace
......@@ -27,9 +27,12 @@ class Task(Registered, hashing.UniversalHashable, register=False):
_OPTIONAL_INPUT_NAMES = set()
_OUTPUT_NAMES = set()
def __init__(self, **inputs):
def __init__(self, inputs=None, varinfo=None):
"""The named arguments are inputs and Variable configuration"""
varkw = {VARINFO: inputs.pop(VARINFO, dict())}
if inputs is None:
inputs = dict()
elif not isinstance(inputs, Mapping):
raise TypeError(inputs, type(inputs))
# Check required inputs
missing_required = set(self._INPUT_NAMES) - set(inputs.keys())
......@@ -49,9 +52,12 @@ class Task(Registered, hashing.UniversalHashable, register=False):
# The output hash will update dynamically if any of the input
# variables change
self._inputs = VariableContainer(value=inputs, **varkw)
self._inputs = VariableContainer(value=inputs, varinfo=varinfo)
self._outputs = VariableContainer(
value=ovars, uhash=self._inputs, uhash_nonce=self.class_nonce(), **varkw
value=ovars,
uhash=self._inputs,
uhash_nonce=self.class_nonce(),
varinfo=varinfo,
)
self._user_inputs = ReadOnlyVariableContainerNamespace(self._inputs)
......@@ -89,15 +95,9 @@ class Task(Registered, hashing.UniversalHashable, register=False):
)
subclass._OUTPUT_NAMES = subclass._OUTPUT_NAMES | set(output_names)
_RESERVED_VARIABLE_NAMES = None
@staticmethod
def _reserved_variable_names():
if Task._RESERVED_VARIABLE_NAMES is None:
Task._RESERVED_VARIABLE_NAMES = (
VariableContainerNamespace._reserved_variable_names() | {VARINFO}
)
return Task._RESERVED_VARIABLE_NAMES
return VariableContainerNamespace._reserved_variable_names()
@classmethod
def instantiate(cls, name, **kw):
......
......@@ -44,9 +44,6 @@ def atomic_write(filename):
os.rename(tmpname, filename)
VARINFO = "_varinfo"
class Variable(hashing.UniversalHashable):
"""Has a runtime representation (python object) and a persistent
representation (JSON).
......@@ -54,14 +51,17 @@ class Variable(hashing.UniversalHashable):
TODO: make abstraction of persistent representation
"""
def __init__(self, value=hashing.UniversalHashable.MISSING_DATA, **kw):
def __init__(
self, value=hashing.UniversalHashable.MISSING_DATA, varinfo=None, **kw
):
"""
:param value: the runtime representation
:param dict varinfo:
:param **kw: see `UniversalHashable`
"""
varinfo = kw.pop(VARINFO, dict())
if not isinstance(varinfo, Mapping):
if varinfo is None:
varinfo = dict()
elif not isinstance(varinfo, Mapping):
raise TypeError(varinfo, type(varinfo))
self._root_uri = varinfo.get("root_uri")
self._disable_persistency = not self._root_uri
......
import pytest
from esrftaskgraph.variable import VARINFO
@pytest.fixture
def variable_kwargs(tmpdir):
yield {VARINFO: {"root_uri": str(tmpdir)}}
def varinfo(tmpdir):
yield {"root_uri": str(tmpdir)}
......@@ -31,19 +31,19 @@ def test_no_public_reserved_names():
assert not [s for s in Task._reserved_variable_names() if not s.startswith("_")]
def test_task_missing_input(variable_kwargs):
def test_task_missing_input():
with pytest.raises(ValueError):
SumTask(**variable_kwargs)
SumTask()
def test_task_readonly_input(variable_kwargs):
task = SumTask(a=10, **variable_kwargs)
def test_task_readonly_input():
task = SumTask(inputs={"a": 10})
with pytest.raises(RuntimeError):
task.inputs.a = 10
def test_task_optional_input(tmpdir, variable_kwargs):
task = SumTask(a=10, **variable_kwargs)
def test_task_optional_input(tmpdir, varinfo):
task = SumTask(inputs={"a": 10}, varinfo=varinfo)
assert not task.done
task.execute()
assert task.done
......@@ -52,26 +52,26 @@ def test_task_optional_input(tmpdir, variable_kwargs):
assert_storage(tmpdir, expected)
def test_task_done(variable_kwargs):
task = SumTask(a=10, **variable_kwargs)
def test_task_done(varinfo):
task = SumTask(inputs={"a": 10}, varinfo=varinfo)
assert not task.done
task.execute()
assert task.done
task = SumTask(a=10, **variable_kwargs)
task = SumTask(inputs={"a": 10}, varinfo=varinfo)
assert task.done
task = SumTask(a=10)
task = SumTask(inputs={"a": 10})
assert not task.done
task.execute()
assert task.done
task = SumTask(a=10)
task = SumTask(inputs={"a": 10})
assert not task.done
def test_task_uhash(variable_kwargs):
task = SumTask(a=10, **variable_kwargs)
def test_task_uhash(varinfo):
task = SumTask(inputs={"a": 10}, varinfo=varinfo)
uhash = task.uhash
assert task.uhash == task.output_variables.uhash
assert task.uhash != task.input_variables.uhash
......@@ -82,8 +82,8 @@ def test_task_uhash(variable_kwargs):
assert task.uhash != task.input_variables.uhash
def test_task_storage(tmpdir, variable_kwargs):
task = SumTask(a=10, b=2, **variable_kwargs)
def test_task_storage(tmpdir, varinfo):
task = SumTask(inputs={"a": 10, "b": 2}, varinfo=varinfo)
assert not task.done
task.execute()
assert task.done
......@@ -91,12 +91,12 @@ def test_task_storage(tmpdir, variable_kwargs):
expected = [{"result": str(task.output_variables["result"].uhash)}, 12]
assert_storage(tmpdir, expected)
task = SumTask(a=10, b=2, **variable_kwargs)
task = SumTask(inputs={"a": 10, "b": 2}, varinfo=varinfo)
assert task.done
assert task.outputs.result == 12
assert_storage(tmpdir, expected)
task = SumTask(a=2, b=10, **variable_kwargs)
task = SumTask({"a": 2, "b": 10}, varinfo=varinfo)
assert not task.done
task.execute()
assert task.done
......@@ -104,7 +104,7 @@ def test_task_storage(tmpdir, variable_kwargs):
expected += [{"result": str(task.output_variables["result"].uhash)}, 12]
assert_storage(tmpdir, expected)
task = SumTask(a=task.output_variables["result"], b=0, **variable_kwargs)
task = SumTask({"a": task.output_variables["result"], "b": 0}, varinfo=varinfo)
assert not task.done
task.execute()
assert task.done
......@@ -112,7 +112,9 @@ def test_task_storage(tmpdir, variable_kwargs):
expected += [{"result": str(task.output_variables["result"].uhash)}, 12]
assert_storage(tmpdir, expected)
task = SumTask(a=1, b=task.output_variables["result"].uhash, **variable_kwargs)
task = SumTask(
{"a": 1, "b": task.output_variables["result"].uhash}, varinfo=varinfo
)
assert not task.done
task.execute()
assert task.done
......@@ -121,22 +123,22 @@ def test_task_storage(tmpdir, variable_kwargs):
assert_storage(tmpdir, expected)
def test_method_task(variable_kwargs):
def test_method_task(varinfo):
task = Task.instantiate(
"MethodExecutorTask", method=qualname(mymethod), a=3, b=5, **variable_kwargs
"MethodExecutorTask",
inputs={"method": qualname(mymethod), "a": 3, "b": 5},
varinfo=varinfo,
)
task.execute()
assert task.done
assert task.output_values == {"return_value": {"result": 8}}
def test_ppfmethod_task(variable_kwargs):
def test_ppfmethod_task(varinfo):
task = Task.instantiate(
"PpfMethodExecutorTask",
method=qualname(myppfmethod),
a=3,
b=5,
**variable_kwargs,
inputs={"method": qualname(myppfmethod), "a": 3, "b": 5},
varinfo=varinfo,
)
task.execute()
assert task.done
......@@ -171,13 +173,15 @@ if __name__ == "__main__":
"""
def test_python_script_task(tmpdir, variable_kwargs, capsys):
def test_python_script_task(tmpdir, varinfo, capsys):
pyscriptname = tmpdir / "test.py"
with open(pyscriptname, mode="w") as f:
f.writelines(pyscript)
task = Task.instantiate(
"ScriptExecutorTask", script=str(pyscriptname), a=10, **variable_kwargs
"ScriptExecutorTask",
inputs={"a": 10, "script": str(pyscriptname)},
varinfo=varinfo,
)
task.execute()
assert task.done
......@@ -187,14 +191,16 @@ def test_python_script_task(tmpdir, variable_kwargs, capsys):
assert captured.err == ""
def test_shell_script_task(tmpdir, variable_kwargs, capsys):
def test_shell_script_task(tmpdir, varinfo, capsys):
shellscriptname = tmpdir / "test.sh"
with open(shellscriptname, mode="w") as f:
f.writelines(shellscript)
os.chmod(shellscriptname, 0o755)
task = Task.instantiate(
"ScriptExecutorTask", script=str(shellscriptname), a=10, **variable_kwargs
"ScriptExecutorTask",
inputs={"a": 10, "script": str(shellscriptname)},
varinfo=varinfo,
)
task.execute()
assert task.done
......
......@@ -5,8 +5,8 @@ from esrftaskgraph.variable import MutableVariableContainer
VALUES = [None, True, 10, "string", 10.1, [1, 2, 3], {"1": 1, "2": {"2": [10, 20]}}]
def test_variable_missing_data(variable_kwargs):
v = Variable(**variable_kwargs)
def test_variable_missing_data(varinfo):
v = Variable(varinfo=varinfo)
assert not v.available
assert not v.exists
assert not v.value
......@@ -29,12 +29,12 @@ def test_variable_none_uhash():
assert v4.uhash is None
def test_variable_uhash(variable_kwargs):
def test_variable_uhash(varinfo):
for value in VALUES:
v1 = Variable(value, **variable_kwargs)
v2 = Variable(value, **variable_kwargs)
v3 = Variable(uhash=v1, **variable_kwargs)
v4 = Variable(uhash=v1.uhash, **variable_kwargs)
v1 = Variable(value, varinfo=varinfo)
v2 = Variable(value, varinfo=varinfo)
v3 = Variable(uhash=v1, varinfo=varinfo)
v4 = Variable(uhash=v1.uhash, varinfo=varinfo)
assert v1.uhash == v2.uhash
assert v1.uhash == v3.uhash
assert v1.uhash == v4.uhash
......@@ -44,26 +44,26 @@ def test_variable_uhash(variable_kwargs):
assert v1.uhash != v4.uhash
def test_variable_nonce(variable_kwargs):
v1 = Variable(9999, **variable_kwargs)
v2 = Variable(value=9999, uhash_nonce=1, **variable_kwargs)
def test_variable_nonce(varinfo):
v1 = Variable(9999, varinfo=varinfo)
v2 = Variable(value=9999, uhash_nonce=1, varinfo=varinfo)
assert v1.uhash != v2.uhash
assert v1 != v2
assert v1.value == v2.value
v2 = Variable(uhash=v1, uhash_nonce=1, **variable_kwargs)
v2 = Variable(uhash=v1, uhash_nonce=1, varinfo=varinfo)
assert v1.uhash != v2.uhash
assert v1 != v2
assert v1.value != v2.value
v2 = Variable(uhash=v1.uhash, uhash_nonce=1, **variable_kwargs)
v2 = Variable(uhash=v1.uhash, uhash_nonce=1, varinfo=varinfo)
assert v1.uhash != v2.uhash
assert v1 != v2
assert v1.value != v2.value
def test_variable_compare(variable_kwargs):
def test_variable_compare(varinfo):
for value in VALUES:
v1 = Variable(value, **variable_kwargs)
v2 = Variable(value, **variable_kwargs)
v1 = Variable(value, varinfo=varinfo)
v2 = Variable(value, varinfo=varinfo)
assert v1 == v2
assert v1 == value
assert v2 == value
......@@ -73,10 +73,10 @@ def test_variable_compare(variable_kwargs):
assert v2 == value
def test_variable_uri(variable_kwargs):
def test_variable_uri(varinfo):
for value in VALUES:
v1 = Variable(value, **variable_kwargs)
v2 = Variable(value, **variable_kwargs)
v1 = Variable(value, varinfo=varinfo)
v2 = Variable(value, varinfo=varinfo)
assert v1.uri is not None
assert v1.uri == v2.uri
v1.value = 99999
......@@ -84,9 +84,9 @@ def test_variable_uri(variable_kwargs):
assert v1.uri != v2.uri
def test_variable_chain(variable_kwargs):
v1 = Variable(9999, **variable_kwargs)
v2 = Variable(uhash=v1, **variable_kwargs)
def test_variable_chain(varinfo):
v1 = Variable(9999, varinfo=varinfo)
v2 = Variable(uhash=v1, varinfo=varinfo)
assert v1 == v1
v1.value += 1
assert v1 == v2
......@@ -100,12 +100,12 @@ def test_variable_chain(variable_kwargs):
v2.validate()
def test_variable_persistency(variable_kwargs):
def test_variable_persistency(varinfo):
for value in VALUES:
v1 = Variable(value, **variable_kwargs)
v2 = Variable(value, **variable_kwargs)
v3 = Variable(uhash=v1.uhash, **variable_kwargs)
v4 = Variable(uhash=v2, **variable_kwargs)
v1 = Variable(value, varinfo=varinfo)
v2 = Variable(value, varinfo=varinfo)
v3 = Variable(uhash=v1.uhash, varinfo=varinfo)
v4 = Variable(uhash=v2, varinfo=varinfo)
assert not v1.exists
assert not v2.exists
......@@ -131,12 +131,12 @@ def test_variable_persistency(variable_kwargs):
v4.validate()
def test_variable_container_uhash(variable_kwargs):
def test_variable_container_uhash(varinfo):
values = {f"var{i}": value for i, value in enumerate(VALUES, 1)}
v1 = MutableVariableContainer(value=values, **variable_kwargs)
v2 = MutableVariableContainer(value=v1, **variable_kwargs)
v3 = MutableVariableContainer(uhash=v1, **variable_kwargs)
v4 = MutableVariableContainer(uhash=v1.uhash, **variable_kwargs)
v1 = MutableVariableContainer(value=values, varinfo=varinfo)
v2 = MutableVariableContainer(value=v1, varinfo=varinfo)
v3 = MutableVariableContainer(uhash=v1, varinfo=varinfo)
v4 = MutableVariableContainer(uhash=v1.uhash, varinfo=varinfo)
v1[next(iter(v1))].value = 9999
assert v1.uhash == v2.uhash
......@@ -144,12 +144,12 @@ def test_variable_container_uhash(variable_kwargs):
assert v1.uhash != v4.uhash
def test_variable_container_compare(tmpdir, variable_kwargs):
def test_variable_container_compare(tmpdir, varinfo):
values = {f"var{i}": value for i, value in enumerate(VALUES, 1)}
v1 = MutableVariableContainer(value=values, **variable_kwargs)
v2 = MutableVariableContainer(value=v1, **variable_kwargs)
v3 = MutableVariableContainer(uhash=v1, **variable_kwargs)
v4 = MutableVariableContainer(uhash=v1.uhash, **variable_kwargs)
v1 = MutableVariableContainer(value=values, varinfo=varinfo)
v2 = MutableVariableContainer(value=v1, varinfo=varinfo)
v3 = MutableVariableContainer(uhash=v1, varinfo=varinfo)
v4 = MutableVariableContainer(uhash=v1.uhash, varinfo=varinfo)
v1.dump()
v1[next(iter(v1))].value = 9999
......@@ -166,12 +166,12 @@ def test_variable_container_compare(tmpdir, variable_kwargs):
assert len(tmpdir.listdir()) == nfiles + 2
def test_variable_container_persistency(tmpdir, variable_kwargs):
def test_variable_container_persistency(tmpdir, varinfo):
values = {f"var{i}": value for i, value in enumerate(VALUES, 1)}
v1 = MutableVariableContainer(value=values, **variable_kwargs)
v2 = MutableVariableContainer(value=v1, **variable_kwargs)
v3 = MutableVariableContainer(uhash=v1, **variable_kwargs)
v4 = MutableVariableContainer(uhash=v1.uhash, **variable_kwargs)
v1 = MutableVariableContainer(value=values, varinfo=varinfo)
v2 = MutableVariableContainer(value=v1, varinfo=varinfo)
v3 = MutableVariableContainer(uhash=v1, varinfo=varinfo)
v4 = MutableVariableContainer(uhash=v1.uhash, varinfo=varinfo)
assert v1.keys() == v2.keys()
for v in v1.values():
......
This diff is collapsed.
......@@ -3,7 +3,6 @@ from pprint import pprint
import matplotlib.pyplot as plt
from esrftaskgraph import load_graph
from esrftaskgraph import Variable
from esrftaskgraph.variable import VARINFO
def assert_taskgraph_result(taskgraph, expected, varinfo):
......@@ -27,10 +26,9 @@ def assert_taskgraph_result(taskgraph, expected, varinfo):
def assert_taskgraph_result_output(result, expected, varinfo):
kw = {VARINFO: varinfo}
for k, v in expected.items():
uhash = result[k]
var = Variable(uhash=uhash, **kw)
var = Variable(uhash=uhash, varinfo=varinfo)
assert var.value == v
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment