Skip to content
Snippets Groups Projects
Commit 084a7c24 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Merge branch 'ppf_logging_api_change' into 'main'

Ppf logging api change

See merge request !39
parents 3e85994b f5366fb9
No related branches found
No related tags found
1 merge request!39Ppf logging api change
Pipeline #61767 passed
......@@ -15,3 +15,6 @@ build_doc:
pages:
extends: .pages
assets:
extends: .assets
import sys
import pprint
from typing import Optional
from pypushflow.Workflow import Workflow
......@@ -77,6 +78,7 @@ class EwoksPythonActor(PythonActor):
"""
:param dict inData: output from the previous task
"""
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
infokey = ppfrunscript.INFOKEY
inData[infokey] = dict(inData[infokey])
inData[infokey]["node_id"] = self.node_id
......@@ -115,6 +117,7 @@ class DecodeRouterActor(RouterActor):
return CONDITIONS_ELSE_VALUE
def trigger(self, inData):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
self.setFinished()
value = self._extractValue(inData)
......@@ -149,6 +152,7 @@ class NameMapperActor(AbstractActor):
actor._required_actor(self)
def trigger(self, inData):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
is_error = "WorkflowException" in inData
if is_error and not self.trigger_on_error:
return
......@@ -182,34 +186,42 @@ class InputMergeActor(AbstractActor):
def __init__(self, parent=None, name="Input merger", **kw):
super().__init__(parent=parent, name=name, **kw)
self.requiredDownStreamActor = list()
self.startInData = list()
self.requiredInData = list()
self.requiredInData = dict()
self.nonrequiredInData = dict()
def _required_actor(self, actor):
if actor.required:
self.requiredDownStreamActor.append(actor)
self.requiredInData.append(None)
self.requiredInData[actor] = None
def trigger(self, inData, source=None):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
self.setFinished()
if source is None:
self.startInData.append(inData)
else:
try:
i = self.requiredDownStreamActor.index(source)
except ValueError:
self.nonrequiredInData = inData
if source in self.requiredInData:
self.requiredInData[source] = inData
else:
self.requiredInData[i] = inData
if None in self.requiredInData:
self.nonrequiredInData = inData
missing = {k: v for k, v in self.requiredInData.items() if v is None}
if missing:
self.logger.info(
"not triggering downstream actors (missing inputs %s)",
[actor.name for actor in missing],
)
return
self.logger.info(
"triggering downstream actors (%d start inputs, %d required inputs, %d optional inputs)",
len(self.startInData),
len(self.requiredInData),
int(bool(self.nonrequiredInData)),
)
newInData = dict()
for data in self.startInData:
newInData.update(data)
for data in self.requiredInData:
for data in self.requiredInData.values():
newInData.update(data)
newInData.update(self.nonrequiredInData)
for actor in self.listDownStreamActor:
......@@ -242,7 +254,7 @@ class EwoksWorkflow(Workflow):
# target_id -> EwoksPythonActor or InputMergeActor
self._targetactors = dict()
self._threadcounter = ThreadCounter()
self._threadcounter = ThreadCounter(parent=self)
self._start_actor = StartActor(name="Start", **self._actor_arguments)
self._stop_actor = StopActor(name="Stop", **self._actor_arguments)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment