Skip to content
Snippets Groups Projects
Commit 5d71e054 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Merge branch 'ewokscore_graph_api_change' into 'main'

ewokscore API change MR !100

See merge request !48
parents 99feb238 764c0eeb
No related branches found
Tags v0.0.3a4
1 merge request!48ewokscore API change MR !100
Pipeline #66385 passed
from .bindings import execute_graph # noqa: F401
__version__ = "0.0.3-alpha.3"
__version__ = "0.0.3-alpha.4"
......@@ -20,6 +20,8 @@ from ewokscore.inittask import task_executable
from ewokscore.inittask import get_varinfo
from ewokscore.inittask import task_executable_info
from ewokscore.graph import TaskGraph
from ewokscore.graph import analysis
from ewokscore.graph import graph_io
from ewokscore.node import get_node_label
from ewokscore.node import NodeIdType
......@@ -309,7 +311,9 @@ class EwoksWorkflow(Workflow):
script=ppfrunscript.__name__ + ".dummy",
**self._actor_arguments,
)
if not taskgraph.has_successors(node_id, link_has_on_error=True):
if not analysis.has_successors(
taskgraph.graph, node_id, link_has_on_error=True
):
self._connect_actors(actor, error_actor)
taskactors[node_id] = actor
self.addActorRef(actor)
......@@ -368,7 +372,7 @@ class EwoksWorkflow(Workflow):
source_actor = taskactors[source_id]
if conditions:
conditions = {c["source_output"]: c["value"] for c in conditions}
all_conditions = taskgraph.node_condition_values(source_id)
all_conditions = analysis.node_condition_values(taskgraph.graph, source_id)
conditions_else_value = taskgraph.graph.nodes[source_id].get(
"conditions_else_value", None
)
......@@ -418,7 +422,7 @@ class EwoksWorkflow(Workflow):
item["target_input"]: item["source_output"] for item in data_mapping
}
on_error = link_attrs.get("on_error", False)
required = taskgraph.link_is_required(source_id, target_id)
required = analysis.link_is_required(taskgraph.graph, source_id, target_id)
source_attrs = taskgraph.graph.nodes[source_id]
target_attrs = taskgraph.graph.nodes[target_id]
source_label = get_node_label(source_attrs, node_id=source_id)
......@@ -446,7 +450,7 @@ class EwoksWorkflow(Workflow):
# task_name -> EwoksPythonActor
taskactors = self._taskactors
for target_id in taskgraph.graph.nodes:
predecessors = list(taskgraph.predecessors(target_id))
predecessors = list(analysis.predecessors(taskgraph.graph, target_id))
npredecessors = len(predecessors)
if npredecessors == 0:
targetactor = None
......@@ -475,7 +479,7 @@ class EwoksWorkflow(Workflow):
# target_id -> EwoksPythonActor or InputMergeActor
targetactors = self._targetactors
start_actor = self._start_actor
for target_id in taskgraph.start_nodes():
for target_id in analysis.start_nodes(taskgraph.graph):
target_actor = targetactors.get(target_id)
if target_actor is None:
target_actor = taskactors[target_id]
......@@ -485,7 +489,7 @@ class EwoksWorkflow(Workflow):
# task_name -> EwoksPythonActor
taskactors = self._taskactors
stop_actor = self._stop_actor
for source_id in taskgraph.end_nodes():
for source_id in analysis.end_nodes(taskgraph.graph):
source_actor = taskactors[source_id]
self._connect_actors(source_actor, stop_actor)
......@@ -496,9 +500,9 @@ class EwoksWorkflow(Workflow):
results_of_all_nodes: Optional[bool] = False,
outputs: Optional[List[dict]] = None,
timeout: Optional[float] = None,
shared_pool: bool = False,
**pool_options,
):
with self._run_context(shared_pool=shared_pool):
with self._run_context(**pool_options):
startindata = dict(self.startargs)
if startargs:
startindata.update(startargs)
......@@ -541,6 +545,6 @@ def execute_graph(
load_options = dict()
ewoksgraph = load_graph(source=graph, **load_options)
if inputs:
ewoksgraph.update_default_inputs(inputs)
graph_io.update_default_inputs(ewoksgraph.graph, inputs)
ppfgraph = EwoksWorkflow(ewoksgraph, varinfo=varinfo)
return ppfgraph.run(startargs=startargs, timeout=timeout, **execute_options)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment