Self-triggering workflow issues
Reported by @edgar
The example below exposes three issues:
- the "loop" node is not recognized as a start node (issue with ewokscore.graph.analysis.start_nodes), see ewokscore!236 (merged)
- we sometimes get "the pool is closed" errors (issue in pypushflow?)
- "save" cannot have required parameters because it fails when triggered by "config" (ewoksppf input merge actor issue?)
from ewokscore import Task
from ewoks import execute_graph
class ConfigTask(Task, output_names=["config"]):
    """Task that announces itself and emits an empty configuration.

    Outputs:
        config: always an empty dictionary.
    """

    def run(self):
        # Trace the execution so the triggering order is visible on stdout.
        print("config")
        self.outputs.config = dict()
class LoopTask(
    Task,
    input_names=["i", "n"],
    optional_input_names=["config"],
    output_names=["i", "keep_looping"],
):
    """Counter task that increments ``i`` by one on every execution.

    Inputs:
        i: current counter value.
        n: upper bound used to decide whether to keep looping.
        config: optional; declared but not read by ``run``.

    Outputs:
        i: the incremented counter.
        keep_looping: whether the incremented counter is still below ``n``.
    """

    def run(self):
        incremented = self.inputs.i + 1
        self.outputs.i = incremented
        self.outputs.keep_looping = incremented < self.inputs.n
        # Trace each iteration on stdout.
        print("loop", incremented)
class SaveTask(
    Task,
    input_names=[],
    optional_input_names=["i", "config"],
    output_names=["result"],
):
    """Task that records the counter value ``i`` when it is available.

    Both inputs are optional, so the task can run without ``i``
    (NOTE(review): presumably the case when only "config" triggers it --
    confirm against the engine's behavior).

    Outputs:
        result: ``True`` when ``i`` was appended to the module-level ``FILE``
            list, ``False`` when the save was skipped.
    """

    def run(self):
        # Without a counter value there is nothing to save.
        if self.missing_inputs.i:
            print("skip save")
            self.outputs.result = False
            return

        value = self.inputs.i
        print("save", value)
        FILE.append(value)
        self.outputs.result = True
# Workflow nodes: one per task class defined in this script.
nodes = [
    dict(
        id="config",
        task_type="class",
        task_identifier="__main__.ConfigTask",
    ),
    dict(
        id="loop",
        task_type="class",
        task_identifier="__main__.LoopTask",
        # Initial values of the "i" and "n" inputs of the loop node.
        default_inputs=[
            dict(name="i", value=0),
            dict(name="n", value=10),
        ],
    ),
    dict(
        id="save",
        task_type="class",
        task_identifier="__main__.SaveTask",
    ),
]
# Workflow links between the nodes declared above.
links = [
    # Self-link: "loop" feeds its own "i" input for as long as its
    # "keep_looping" output equals True.
    dict(
        source="loop",
        target="loop",
        data_mapping=[dict(source_output="i", target_input="i")],
        conditions=[dict(source_output="keep_looping", value=True)],
    ),
    # Unconditional link: "loop" passes its "i" output to "save".
    dict(
        source="loop",
        target="save",
        data_mapping=[dict(source_output="i", target_input="i")],
    ),
    # "config" passes its "config" output to "save".
    dict(
        source="config",
        target="save",
        data_mapping=[dict(source_output="config", target_input="config")],
    ),
]
# Toggle for the workaround link; set the condition to False to run the
# workflow without it.
if True:
    # Because loop cannot be a start node (see ewokscore graph analysis)
    links += [
        dict(
            source="config",
            target="loop",
            data_mapping=[dict(source_output="config", target_input="config")],
        )
    ]
# Assemble the graph description from the nodes and links defined above.
workflow = dict(
    graph=dict(id="testworkflow"),
    nodes=nodes,
    links=links,
)

import logging

# Uncomment to get debug logging while the workflow runs:
# logging.basicConfig(level=logging.DEBUG)

# In-memory stand-in for a file: SaveTask appends every saved value here.
FILE = []

result = execute_graph(
    workflow,
    engine="ppf",
    scaling_workers=False,
    pool_type="thread",
)

# Show which values were saved during the run.
print(FILE)
Edited by Loic Huder