Skip to content
Snippets Groups Projects

Draft: Add test of execution of workflow with self-triggering task

Open Loic Huder requested to merge self-trigger-test into main
1 file
+ 142
0
Compare changes
  • Side-by-side
  • Inline
+ 142
0
 
import pytest
 
import os.path
 
from ewokscore import Task
 
from ewoksppf import execute_graph
 
 
 
qualname = __name__
 
 
 
def test_explicit_start_node_on_self_triggering_node(tmpdir):
 
"""
 
Workflow:
 
- LOOP depends on itself (self-triggering task).
 
- SAVE depends on LOOP and CONFIG
 
- CONFIG does not depend on anything
 
 
/|
 
/ |
 
LOOP -- SAVE
 
/
 
/
 
/
 
CONFIG
 
 
This test tests that, while graph analysis considers that the only
 
start node is CONFIG, it is possible to set `start_node` for LOOP
 
and make it a start node so that SAVE can get inputs from LOOP and
 
CONFIG.
 
"""
 
 
class ConfigTask(Task, input_names=["filename"], output_names=["config"]):
 
def run(self):
 
self.outputs.config = {"filename": self.inputs.filename}
 
 
class LoopTask(
 
Task,
 
input_names=["i", "n"],
 
output_names=["i", "keep_looping"],
 
):
 
def run(self):
 
self.outputs.i = self.inputs.i + 1
 
self.outputs.keep_looping = self.outputs.i < self.inputs.n
 
 
class SaveTask(
 
Task,
 
input_names=["config"],
 
optional_input_names=["i"],
 
output_names=["result"],
 
):
 
def run(self):
 
if self.missing_inputs.i:
 
raise RuntimeError("LOOP not executed!")
 
config = self.inputs.config
 
with open(config["filename"], "a") as out_file:
 
out_file.write(f"LOOP executed: {self.inputs.i}")
 
 
workflow = {
 
"graph": {"id": "testworkflow"},
 
"nodes": [
 
{
 
"id": "CONFIG",
 
"task_type": "class",
 
"task_identifier": f"{qualname}.ConfigTask",
 
},
 
{
 
"id": "LOOP",
 
"task_type": "class",
 
"task_identifier": f"{qualname}.LoopTask",
 
},
 
{
 
"id": "SAVE",
 
"task_type": "class",
 
"task_identifier": f"{qualname}.SaveTask",
 
},
 
],
 
"links": [
 
{
 
"source": "LOOP",
 
"target": "LOOP",
 
"data_mapping": [{"source_output": "i", "target_input": "i"}],
 
"conditions": [{"source_output": "keep_looping", "value": True}],
 
},
 
{
 
"source": "LOOP",
 
"target": "SAVE",
 
"data_mapping": [{"source_output": "i", "target_input": "i"}],
 
},
 
{
 
"source": "CONFIG",
 
"target": "SAVE",
 
"data_mapping": [{"source_output": "config", "target_input": "config"}],
 
},
 
],
 
}
 
 
filename = str(tmpdir / "test.txt")
 
max_iterations = 10
 
inputs = [
 
{
 
"task_identifier": f"{qualname}.ConfigTask",
 
"name": "filename",
 
"value": filename,
 
},
 
{
 
"task_identifier": f"{qualname}.LoopTask",
 
"name": "i",
 
"value": 0,
 
},
 
{
 
"task_identifier": f"{qualname}.LoopTask",
 
"name": "n",
 
"value": max_iterations,
 
},
 
]
 
 
# Execute without setting `force_start_node` in the LOOP node: the SAVE task fails.
 
with pytest.raises(RuntimeError) as e:
 
execute_graph(
 
workflow,
 
scaling_workers=False,
 
pool_type="thread",
 
inputs=inputs,
 
)
 
assert str(e) == "RuntimeError: Task 'SAVE' failed"
 
assert not os.path.exists(filename)
 
 
# Execute with `force_start_node` in the LOOP node; the SAVE task runs and the file exists.
 
loop_node = workflow["nodes"][1]
 
assert loop_node["id"] == "LOOP"
 
loop_node["force_start_node"] = True
 
 
execute_graph(
 
workflow,
 
scaling_workers=False,
 
pool_type="thread",
 
inputs=inputs,
 
)
 
 
with open(filename) as _file:
 
txt = _file.read()
 
for i in range(max_iterations):
 
assert str(i) in txt
Loading