Skip to content
Snippets Groups Projects
Commit 3091897a authored by Wout De Nolf's avatar Wout De Nolf
Browse files

ewokscore API change MR !100

parent 99feb238
No related branches found
No related tags found
1 merge request!48ewokscore API change MR !100
Pipeline #66381 passed
......@@ -20,6 +20,8 @@ from ewokscore.inittask import task_executable
from ewokscore.inittask import get_varinfo
from ewokscore.inittask import task_executable_info
from ewokscore.graph import TaskGraph
from ewokscore.graph import analysis
from ewokscore.graph import graph_io
from ewokscore.node import get_node_label
from ewokscore.node import NodeIdType
......@@ -309,7 +311,9 @@ class EwoksWorkflow(Workflow):
script=ppfrunscript.__name__ + ".dummy",
**self._actor_arguments,
)
if not taskgraph.has_successors(node_id, link_has_on_error=True):
if not analysis.has_successors(
taskgraph.graph, node_id, link_has_on_error=True
):
self._connect_actors(actor, error_actor)
taskactors[node_id] = actor
self.addActorRef(actor)
......@@ -368,7 +372,7 @@ class EwoksWorkflow(Workflow):
source_actor = taskactors[source_id]
if conditions:
conditions = {c["source_output"]: c["value"] for c in conditions}
all_conditions = taskgraph.node_condition_values(source_id)
all_conditions = analysis.node_condition_values(taskgraph.graph, source_id)
conditions_else_value = taskgraph.graph.nodes[source_id].get(
"conditions_else_value", None
)
......@@ -418,7 +422,7 @@ class EwoksWorkflow(Workflow):
item["target_input"]: item["source_output"] for item in data_mapping
}
on_error = link_attrs.get("on_error", False)
required = taskgraph.link_is_required(source_id, target_id)
required = analysis.link_is_required(taskgraph.graph, source_id, target_id)
source_attrs = taskgraph.graph.nodes[source_id]
target_attrs = taskgraph.graph.nodes[target_id]
source_label = get_node_label(source_attrs, node_id=source_id)
......@@ -446,7 +450,7 @@ class EwoksWorkflow(Workflow):
# task_name -> EwoksPythonActor
taskactors = self._taskactors
for target_id in taskgraph.graph.nodes:
predecessors = list(taskgraph.predecessors(target_id))
predecessors = list(analysis.predecessors(taskgraph.graph, target_id))
npredecessors = len(predecessors)
if npredecessors == 0:
targetactor = None
......@@ -475,7 +479,7 @@ class EwoksWorkflow(Workflow):
# target_id -> EwoksPythonActor or InputMergeActor
targetactors = self._targetactors
start_actor = self._start_actor
for target_id in taskgraph.start_nodes():
for target_id in analysis.start_nodes(taskgraph.graph):
target_actor = targetactors.get(target_id)
if target_actor is None:
target_actor = taskactors[target_id]
......@@ -485,7 +489,7 @@ class EwoksWorkflow(Workflow):
# task_name -> EwoksPythonActor
taskactors = self._taskactors
stop_actor = self._stop_actor
for source_id in taskgraph.end_nodes():
for source_id in analysis.end_nodes(taskgraph.graph):
source_actor = taskactors[source_id]
self._connect_actors(source_actor, stop_actor)
......@@ -496,9 +500,9 @@ class EwoksWorkflow(Workflow):
results_of_all_nodes: Optional[bool] = False,
outputs: Optional[List[dict]] = None,
timeout: Optional[float] = None,
shared_pool: bool = False,
**pool_options,
):
with self._run_context(shared_pool=shared_pool):
with self._run_context(**pool_options):
startindata = dict(self.startargs)
if startargs:
startindata.update(startargs)
......@@ -541,6 +545,6 @@ def execute_graph(
load_options = dict()
ewoksgraph = load_graph(source=graph, **load_options)
if inputs:
ewoksgraph.update_default_inputs(inputs)
graph_io.update_default_inputs(ewoksgraph.graph, inputs)
ppfgraph = EwoksWorkflow(ewoksgraph, varinfo=varinfo)
return ppfgraph.run(startargs=startargs, timeout=timeout, **execute_options)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment