GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit 093e9092 authored by Operator for ID30's avatar Operator for ID30

Added wait for all actors to have finished before end of workflow

parent d93df363
......@@ -39,6 +39,10 @@ class AbstractActor(object):
self.listDownStreamActor = []
self.parent = parent
self.actorId = None
if self.parent is not None:
self.parent.addActorRef(self)
self.started = False
self.finished = False
def connect(self, actor):
logger.debug('Connecting actor "{0}" to actor "{1}"'.format(
......@@ -47,6 +51,8 @@ class AbstractActor(object):
self.listDownStreamActor.append(actor)
def trigger(self, inData):
self.setStarted()
self.setFinished()
for actor in self.listDownStreamActor:
logger.debug('In actor "{0}", triggering actor "{1}"'.format(
self.name, actor.name
......@@ -76,6 +82,19 @@ class AbstractActor(object):
if self.parent.mongoId is not None:
UtilsMongoDb.setMongoAttribute(self.parent.mongoId, attribute, value)
def getActorPath(self):
return self.parent.getActorPath()
def hasStarted(self):
return self.started
def setStarted(self):
logger.info("Setting finished of {0} to True".format(self.name))
self.started = True
def hasFinished(self):
return self.finished
def setFinished(self):
logger.info("Setting finished of {0} to True".format(self.name))
self.finished = True
......@@ -37,6 +37,8 @@ class JoinActor(AbstractActor):
self.numberOfThreads += 1
def trigger(self, inData):
self.setStarted()
self.setFinished()
self.listInData.append(inData)
if len(self.listInData) == self.numberOfThreads:
newInData = {}
......
......@@ -97,6 +97,7 @@ class AsyncFactory:
self.callback = callback
self.errorCallback = errorCallback
self.pool = Edna2Pool(1)
self.hasFinished = False
def call(self, *args, **kwargs):
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
......@@ -149,6 +150,7 @@ class PythonActor(AbstractActor):
self.listErrorHandler.append(errorHandler)
def trigger(self, inData):
self.setStarted()
self.inData = inData
self.uploadDataToMongo(actorData={'inData': inData}, script=self.script)
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(inData)))
......@@ -170,6 +172,7 @@ class PythonActor(AbstractActor):
def errorHandler(self, exception):
logger.error('Error when running actor {0}!'.format(self.name))
self.setFinished()
workflowException = WorkflowException(
errorMessage=exception,
traceBack=None,
......@@ -184,6 +187,7 @@ class PythonActor(AbstractActor):
def triggerDownStreamActor(self, inData={}):
logger.debug('In triggerDownStreamActor for {0}'.format(self.name))
self.setFinished()
if isinstance(inData, WorkflowException):
logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor]))
workflowException = inData
......
......@@ -49,6 +49,8 @@ class RouterActor(AbstractActor):
def trigger(self, inData):
logger.debug('In router actor "{0}"'.format(self.name))
self.setStarted()
self.setFinished()
listActor = None
if self.itemName in inData:
logger.debug('In router actor "{0}", itemName {1} in inData'.format(self.name, self.itemName))
......
......@@ -23,6 +23,7 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import time
import logging
import multiprocessing
......@@ -46,23 +47,37 @@ class StopActor(object):
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
# Check if parent is workflow
if self.parent is not None and not isinstance(self.parent, Submodel.Submodel):
# First wait for any actors still executing
listActorRef = self.parent.getListActorRef()
allStartedActorsFinished = False
while not allStartedActorsFinished:
allStartedActorsFinished = True
for actorRef in listActorRef:
if actorRef.hasStarted() and not actorRef.hasFinished():
logger.info("{0}: Started {1} finished {2}".format(actorRef.name, actorRef.hasStarted(),
actorRef.hasFinished()))
allStartedActorsFinished = False
if not allStartedActorsFinished:
time.sleep(1)
UtilsMongoDb.closeMongo(self.parent.mongoId, status='finished')
self.outData = inData
logger.debug('In {0}, parent {1}, before lock.release'.format(self.name, self.parent.name))
self.lock.release()
logger.debug('In {0}, parent {1}, after lock.release'.format(self.name, self.parent.name))
else:
elif self.errorHandler is not None:
self.errorHandler.errorHandler.stopActor.trigger(inData)
def join(self, timeout=7200):
if timeout is None:
timeout = 7200
logger.debug('In {0}, parent {1}, before lock.acquire'.format(self.name, self.parent.name))
if self.parent is not None:
logger.debug('In {0}, parent {1}, before lock.acquire'.format(self.name, self.parent.name))
success = self.lock.acquire(timeout=timeout)
logger.debug('In {0}, parent {1}, after lock.acquire'.format(self.name, self.parent.name))
if self.parent is not None:
logger.debug('In {0}, parent {1}, after lock.acquire'.format(self.name, self.parent.name))
if not success:
logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
if self.parent is not None:
logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
UtilsMongoDb.closeMongo(self.parent.mongoId, status='timeout')
......@@ -86,4 +86,8 @@ class Submodel(object):
logger.debug("In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(self.name, onErrorActor.name, args[0]))
onErrorActor.trigger(*args, **kwargs)
if self.errorHandler is not None:
self.errorHandler.triggerOnError(*args, **kwargs)
\ No newline at end of file
self.errorHandler.triggerOnError(*args, **kwargs)
def addActorRef(self, actorRef):
if self.parent is not None:
self.parent.addActorRef(actorRef)
\ No newline at end of file
......@@ -39,6 +39,7 @@ class Workflow(object):
self.name = name
self.listOnErrorActor = []
self.mongoId = UtilsMongoDb.initMongo(name=name)
self.listActorRef = []
def connectOnError(self, actor):
logger.debug("In Workflow '{0}' connectOnError, actor name {1}".format(self.name, actor.name))
......@@ -53,3 +54,10 @@ class Workflow(object):
def getActorPath(self):
return '/' + self.name
def addActorRef(self, actorRef):
logger.info("Adding actor ref: {0}".format(actorRef.name))
self.listActorRef.append(actorRef)
def getListActorRef(self):
return self.listActorRef
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment