Commit 99dbcfa8 authored by payno's avatar payno
Browse files

add another workflow test - diamond conditionnal connection

parent 320b0788
def run_incrementation(a=None, increment_value=None, **kwargs):
if a is None:
raise RuntimeError("Missing argument 'a'!")
if increment_value is None:
raise RuntimeError("Missing argument 'increment_value'!")
return {
"a": a + increment_value,
"b": increment_value
}
def no_processing(a=None, b=None, c=None, d=None, **kwargs):
print("receive a: {}, b: {} c: {}, d: {}".format(a, b, c, d))
return {
"a": d,
"b": b,
"c": c,
"d": d,
}
def pass_a(a=None, **kwargs):
if a is None:
raise RuntimeError("Missing argument 'value'!")
return {
"a": a,
}
def move_d_to_a(a=None, b=None, c=None, d=None):
if a is None:
raise RuntimeError("Missing argument 'a'!")
if b is None:
raise RuntimeError("Missing argument 'b'!")
if c is None:
raise RuntimeError("Missing argument 'c'!")
if d is None:
raise RuntimeError("Missing argument 'd'!")
return {
"a": d,
"b": b,
"c": c,
}
\ No newline at end of file
from esrf2pypushflow import job
def workflow19():
sum3 = "tests.test_ppf_actors.pythonActorAddABC2D.run"
incrementation = "tests.test_ppf_actors.pythonActorDiamondTest.run_incrementation"
no_processing = "tests.test_ppf_actors.pythonActorDiamondTest.no_processing"
pass_a = "tests.test_ppf_actors.pythonActorDiamondTest.pass_a"
move_d_to_a = "tests.test_ppf_actors.pythonActorDiamondTest.move_d_to_a"
nodes = [
{"id": "task1", "ppfmethod": sum3, "inputs": {"a": 1, "b": 2, "c": 4}},
{"id": "task2", "ppfmethod": move_d_to_a},
{"id": "task3", "ppfmethod": incrementation, "inputs": {"increment_value": 1,}},
{"id": "task4", "ppfmethod": no_processing},
{"id": "task5", "ppfmethod": no_processing},
]
links = [
{"source": "task1", "target": "task2", "all_arguments": True},
{
"source": "task2",
"target": "task3",
"all_arguments": True,
"conditions": {"d": 7},
},
{
"source": "task2",
"target": "task4",
"all_arguments": True,
"conditions": {"d": 8},
},
{
"source": "task2",
"target": "task5",
"all_arguments": True,
"conditions": {"d": "other"},
},
{"source": "task3", "target": "task1", "all_arguments": True},
{"source": "task4", "target": "task1", "all_arguments": True},
]
graph = {
"graph": {"name": "workflow19"},
"links": links,
"nodes": nodes,
}
expected_result = {"ppfdict": {"a": 19, "b": 2, "c": 4}}
expected_result = {"ppfdict": {"a": 8, "b": 1, "c": 4, "d": 7, "increment_value": 1}}
return graph, expected_result
def test_workflow19(ppf_logging):
"""Test 2 unconditional upstream tasks, one coming from a feedback loop"""
graph, expected = workflow19()
result = job(graph)
# print("result is", result["ppfdict"])
for k, v in expected.items():
assert result[k] == v, k
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment