Commit 54154d46 authored by Olof Svensson's avatar Olof Svensson
Browse files

Merge branch '7-refactor-the-stopactor' into 'main'

Resolve "Refactor the StopActor"

Closes #7

See merge request !10
parents 83e7eb8b 8b8d932f
......@@ -26,21 +26,17 @@ __date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger('pypushflow')
class AbstractActor(object):
class AbstractActor(ThreadCountingActor):
def __init__(self, parent=None, name=None):
if name is None:
raise RuntimeError('Actor name is None!')
self.name = name
def __init__(self, parent=None, name=None, **kw):
super().__init__(name=name, parent=parent, **kw)
self.listDownStreamActor = []
self.parent = parent
self.actorId = None
if self.parent is not None:
self.parent.addActorRef(self)
self.started = False
self.finished = False
......
......@@ -30,10 +30,10 @@ from pypushflow.AbstractActor import AbstractActor
class ErrorHandler(AbstractActor):
def __init__(self, parent=None, name='Error handler'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Error handler', **kw):
super().__init__(parent=parent, name=name, **kw)
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, 'mongoId'):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, 'error')
AbstractActor.trigger(self, inData=inData)
super().trigger(inData=inData)
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class ForkActor(AbstractActor):
def __init__(self, parent=None, name='Fork actor'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Fork actor', **kw):
super().__init__(parent=parent, name=name, **kw)
......@@ -28,8 +28,8 @@ from pypushflow.AbstractActor import AbstractActor
class JoinActor(AbstractActor):
def __init__(self, parent=None, name='Join actor'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Join actor', **kw):
super().__init__(parent=parent, name=name, **kw)
self.numberOfThreads = 0
self.listInData = []
......
......@@ -112,17 +112,12 @@ class AsyncFactory:
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
self.func, self.callback, self.errorCallback))
logger.debug('args={0}, kwargs={1}'.format(args, kwargs))
if self.callback is None:
self.pool.apply_async(self.func, args, kwargs)
elif self.errorCallback is None:
self.pool.apply_async(self.func, args, kwargs, self.callback)
else:
self.pool.apply_async(self.func, args, kwargs, self.callback, self.errorCallback)
self.pool.apply_async(self.func, args=args, kwds=kwargs, callback=self.callback, error_callback=self.errorCallback)
self.pool.close()
logger.debug('After apply_async')
class ActorWrapper(object):
class ActorWrapper:
def __init__(self, name, method):
self.name = name
......@@ -139,14 +134,11 @@ class ActorWrapper(object):
class PythonActor(AbstractActor):
def __init__(self, parent=None, name='Python Actor', errorHandler=None, script=None):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Python Actor', errorHandler=None, script=None, **kw):
super().__init__(parent=parent, name=name, **kw)
self.parentErrorHandler = errorHandler
self.listErrorHandler = []
# Import script
self.script = script
# module = importlib.import_module(os.path.splitext(script)[0])
# self.actorWrapper = ActorWrapper(name, module.run)
self.actorWrapper = None
self.inData = None
self.outData = None
......@@ -178,11 +170,14 @@ class PythonActor(AbstractActor):
time.sleep(1)
self.errorHandler(inData, e)
else:
actorWrapper = ActorWrapper(self.name, module.run)
self.af = AsyncFactory(actorWrapper.run,
callback=self.triggerDownStreamActor,
errorCallback=self.errorHandler)
self.af.call(self.inData)
with self._postpone_end_thread(
self.triggerDownStreamActor, self.errorHandler
) as (callback, errorCallback):
actorWrapper = ActorWrapper(self.name, module.run)
self.af = AsyncFactory(
actorWrapper.run, callback=callback, errorCallback=errorCallback
)
self.af.call(self.inData)
def errorHandler(self, inData, exception):
logger.error('Error when running actor {0}!'.format(self.name))
......
......@@ -29,11 +29,11 @@ from pypushflow.AbstractActor import AbstractActor
class RequestStatus(AbstractActor):
def __init__(self, parent, name='Request status', status=None):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Request status', status=None, **kw):
super().__init__(parent=parent, name=name, **kw)
self.status=status
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, 'mongoId') and self.status is not None:
UtilsMongoDb.setMongoStatus(self.parent.mongoId, self.status)
AbstractActor.trigger(self, inData=inData)
super().trigger(inData=inData)
......@@ -31,9 +31,9 @@ logger = logging.getLogger('pypushflow')
class RouterActor(AbstractActor):
def __init__(self, parent, errorHandler=None, name='Router',
itemName=None, listPort=None):
AbstractActor.__init__(self, parent, name)
def __init__(self, parent=None, errorHandler=None, name='Router',
itemName=None, listPort=None, **kw):
super().__init__(parent=parent, name=name, **kw)
self.errorHandler = errorHandler
self.name = name
self.itemName = itemName
......
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class StartActor(AbstractActor):
def __init__(self, parent=None, name='Start actor'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Start actor', **kw):
super().__init__(parent=parent, name=name, **kw)
......@@ -23,70 +23,47 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import time
import logging
import multiprocessing
from pypushflow import UtilsMongoDb
from pypushflow import Submodel
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger('pypushflow')
class StopActor(object):
class StopActor(ThreadCountingActor):
def __init__(self, parent=None, errorHandler=None, name='Stop actor'):
def __init__(self, parent=None, errorHandler=None, name='Stop actor', **kw):
super().__init__(name=name, parent=parent, **kw)
self.errorHandler = errorHandler
self.name = name
self.lock = multiprocessing.Lock()
self.lock.acquire()
self.outData = None
self.parent = parent
def trigger(self, inData):
time.sleep(1)
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
# Check if parent is workflow
if self.parent is not None and not isinstance(self.parent, Submodel.Submodel):
# First wait for any actors still executing
listActorRef = self.parent.getListActorRef()
allStartedActorsFinished = False
while not allStartedActorsFinished:
allStartedActorsFinished = True
for actorRef in listActorRef:
if actorRef.hasStarted() and not actorRef.hasFinished():
logger.debug("{0}: Started {1} finished {2}".format(actorRef.name, actorRef.hasStarted(),
actorRef.hasFinished()))
allStartedActorsFinished = False
if not allStartedActorsFinished:
time.sleep(1)
UtilsMongoDb.closeMongo(self.parent.mongoId, status='finished')
# Parent is a Workflow
self.outData = inData
logger.debug('In {0}, parent {1}, before lock.release'.format(self.name, self.parent.name))
self.lock.release()
logger.debug('In {0}, parent {1}, after lock.release'.format(self.name, self.parent.name))
elif self.errorHandler is not None:
self.errorHandler.errorHandler.stopActor.trigger(inData)
else:
if self.errorHandler is not None:
# No parent or of type submodel, the stop actor to be triggered is in the error handler
self.errorHandler.errorHandler.stopActor.trigger(inData)
else:
# No parent, just release the lock
self.outData = inData
logger.debug('In {0}, before lock.release'.format(self.name))
self.lock.release()
logger.debug('In {0}, after lock.release'.format(self.name))
self.outData = inData
def join(self, timeout=7200):
if timeout is None:
timeout = 7200
if self.parent is not None:
logger.debug('In {0}, parent {1}, before lock.acquire'.format(self.name, self.parent.name))
success = self.lock.acquire(timeout=timeout)
logger.debug('In {0}, parent {1}, before wait_threads_finished'.format(self.name, self.parent.name))
success = self._wait_threads_finished(timeout=timeout)
if self.parent is not None:
logger.debug('In {0}, parent {1}, after lock.acquire'.format(self.name, self.parent.name))
if not success:
if self.parent is not None:
logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
UtilsMongoDb.closeMongo(self.parent.mongoId, status='timeout')
logger.debug('In {0}, parent {1}, after wait_threads_finished'.format(self.name, self.parent.name))
self._finalizeInMongo(success)
return success
def _finalizeInMongo(self, success):
if self.parent is None:
return
if success:
logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
UtilsMongoDb.closeMongo(self.parent.mongoId, status='timeout')
else:
logger.error('In {0}, parent {1}, finished'.format(self.name, self.parent.name))
UtilsMongoDb.closeMongo(self.parent.mongoId, status='finished')
......@@ -25,13 +25,15 @@ __date__ = "28/05/2019"
import logging
logger = logging.getLogger('pypushflow')
from pypushflow.ThreadCountingActor import ThreadCountingActor
class Port(object):
logger = logging.getLogger("pypushflow")
def __init__(self, errorHandler, name):
self.name = errorHandler.name + '.' + name
class Port(ThreadCountingActor):
def __init__(self, errorHandler, name, **kw):
super().__init__(name=errorHandler.name + "." + name, **kw)
self.errorHandler = errorHandler
self.listActor = []
self.inPortTrigger = None
......@@ -47,16 +49,30 @@ class Port(object):
logger.debug("In {0} trigger".format(self.name))
if len(self.listActor) > 0:
for actor in self.listActor:
logger.debug("In trigger {0} -> actorName {1}".format(self.errorHandler.name, actor.name))
logger.debug(
"In trigger {0} -> actorName {1}".format(
self.errorHandler.name, actor.name
)
)
actor.trigger(*args, **kwargs)
if self.inPortTrigger is not None:
logger.debug("In {0} trigger, trigger = {1}".format(self.errorHandler.name, self.inPortTrigger))
logger.debug(
"In {0} trigger, trigger = {1}".format(
self.errorHandler.name, self.inPortTrigger
)
)
self.inPortTrigger(*args, **kwargs)
class Submodel(object):
def __init__(self, parent, errorHandler=None, name=None, portNames=['In', 'Out']):
class Submodel:
def __init__(
self,
parent=None,
errorHandler=None,
name=None,
portNames=["In", "Out"],
thread_counter=None,
):
self.parent = parent
self.mongoId = self.parent.mongoId
self.name = name
......@@ -67,31 +83,47 @@ class Submodel(object):
self.dictPort = {}
self.listOnErrorActor = []
for portName in portNames:
self.dictPort[portName] = Port(self, portName)
self.dictPort[portName] = Port(
self, portName, thread_counter=thread_counter
)
def getActorPath(self):
return self.parent.getActorPath() + '/' + self.name.replace('%', ' ')
return self.parent.getActorPath() + "/" + self.name.replace("%", " ")
def getPort(self, portName):
logger.debug("In {0} getPort, portName = {1}".format(self.name, portName))
return self.dictPort[portName]
def connect(self, actor, portName='Out'):
logger.debug("In {0} connect, portName = {2} -> actorName = {1}".format(self.name, actor.name, portName))
def connect(self, actor, portName="Out"):
logger.debug(
"In {0} connect, portName = {2} -> actorName = {1}".format(
self.name, actor.name, portName
)
)
self.dictPort[portName].connect(actor)
def connectOnError(self, actor):
logger.debug("In connectOnError in subModule {0}, actor name {1}".format(self.name, actor.name))
logger.debug(
"In connectOnError in subModule {0}, actor name {1}".format(
self.name, actor.name
)
)
self.listOnErrorActor.append(actor)
def triggerOnError(self, *args, **kwargs):
for onErrorActor in self.listOnErrorActor:
logger.debug("In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(self.name, onErrorActor.name, args[0]))
logger.debug(
"In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(
self.name, onErrorActor.name, args[0]
)
)
onErrorActor.trigger(*args, **kwargs)
if self.errorHandler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.errorHandler.name))
logger.error(
'Trigger on error on errorHandler "{0}"'.format(self.errorHandler.name)
)
self.errorHandler.triggerOnError(*args, **kwargs)
def addActorRef(self, actorRef):
if self.parent is not None:
self.parent.addActorRef(actorRef)
\ No newline at end of file
self.parent.addActorRef(actorRef)
import logging
from threading import Condition
logger = logging.getLogger("pypushflow")
class ThreadCounter:
"""Scheduling thread counter"""
def __init__(self):
self.__counter = 0
self.__condition = Condition()
def start_thread(self, msg=None):
with self.__condition:
self.__counter += 1
self._log_counter_change(msg=msg)
self.__condition.notify_all()
def end_thread(self, msg=None):
with self.__condition:
self.__counter = max(self.__counter - 1, 0)
self._log_counter_change(msg=msg)
self.__condition.notify_all()
def __enter__(self):
self.start_thread()
return self
def __exit__(self, *args):
self.end_thread()
def wait_threads_finished(self, timeout=None):
"""Returns False when timeout expires"""
while True:
with self.__condition:
if self.__counter == 0:
break
if not self.__condition.wait(timeout=timeout):
return False
return True
@property
def nthreads(self):
return self.__counter
def _log_counter_change(self, msg=None):
if msg is None:
msg = "Thread counter changed"
logger.debug("%s (%d threads running)", msg, self.__counter)
from functools import wraps
from contextlib import contextmanager
def with_thread_context(trigger):
"""Wraps the `trigger` method of all derived classes of ThreadCountingActor"""
@wraps(trigger)
def wrapper(self, *args, **kw):
with self._thread_context():
return trigger(self, *args, **kw)
return wrapper
def callback_with_end_thread(async_callback, end_thread, log_msg):
"""Wraps a async_callback"""
@wraps(async_callback)
def wrapper(*args, **kw):
try:
return async_callback(*args, **kw)
finally:
end_thread(msg=log_msg)
return wrapper
class ThreadCountingActor:
"""The `trigger` method of will increase the thread counter
at the start and decrease the thread counter at the end.
"""
def __init__(self, name=None, parent=None, thread_counter=None):
if name is None:
raise RuntimeError("Actor name is None!")
if thread_counter is None:
raise ValueError("Actor requires a 'thread_counter' argument")
self.name = name
self.parent = parent
if parent is not None:
parent.addActorRef(self)
self.__thread_counter = thread_counter
self.__in_thread_context = False
self.__postpone_end_thread = False
def __init_subclass__(subcls, **kw):
"""Wrap the `trigger` method"""
super().__init_subclass__(**kw)
subcls.trigger = with_thread_context(subcls.trigger)
@contextmanager
def _thread_context(self):
"""Re-entrant context manager that starts a thread
on first entrance and ends a thread on last exit,
unless the thread ending is post-poned until after
and async callback.
"""
if self.__in_thread_context:
yield
return
self.__thread_counter.start_thread(msg="Thread started for " + repr(self.name))
try:
self.__in_thread_context = True
self.__postpone_end_thread = False
try:
yield
finally:
self.__in_thread_context = False
finally:
if self.__postpone_end_thread:
self.__postpone_end_thread = False
else:
self.__thread_counter.end_thread(
msg="Thread ended for " + repr(self.name)
)
@contextmanager
def _postpone_end_thread(self, *async_callbacks):
"""Post-pone thread ending until after an async callback is executed.
Only one of the async callbacks is expected to be called.
"""
if self.__in_thread_context:
self.__postpone_end_thread = True
try:
async_callbacks = tuple(
callback_with_end_thread(
async_callback,
self.__thread_counter.end_thread,
"Thread ended for " + repr(self.name),
)
for async_callback in async_callbacks
)
yield async_callbacks
except BaseException:
if self.__in_thread_context:
self.__postpone_end_thread = False
raise
def _wait_threads_finished(self, **kw):
return self.__thread_counter.wait_threads_finished(**kw)
def trigger(self, **kw):
raise NotImplementedError
......@@ -33,37 +33,43 @@ from pypushflow.ErrorHandler import ErrorHandler
from pypushflow.ForkActor import ForkActor
from pypushflow.JoinActor import JoinActor
from pypushflow.RouterActor import RouterActor
from pypushflow.ThreadCounter import ThreadCounter
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('testPythonActor')
logger = logging.getLogger("testPythonActor")
class TestPythonActor(unittest.TestCase):
def setUp(self):
self.thread_counter = ThreadCounter()
def test_PythonActor(self):
script = 'pypushflow.test.pythonActorTest.py'
name = 'Python Actor Test'
actor = PythonActor(script=script, name=name)
stopActor = StopActor()
inData = {
'name': 'Ragnar'
}
script = "pypushflow.test.pythonActorTest.py"
name = "Python Actor Test"
actor = PythonActor(
script=script, name=name, thread_counter=self.thread_counter
)
stopActor = StopActor(thread_counter=self.thread_counter)
inData = {"name": "Ragnar"}
actor.connect(stopActor)
actor.trigger(inData)
stopActor.join(timeout=10)
outData = stopActor.outData
self.assertIsNotNone(outData)
self.assertEqual(outData['reply'], 'Hello Ragnar!')
self.assertEqual(outData["reply"], "Hello Ragnar!")
def test_ErrorHandler(self):
script = 'pypushflow.test.pythonErrorHandlerTest.py'
name = 'Python Error Handler Test'
actor = PythonActor(script=script, name=name)
errorHandler = ErrorHandler(name='Error handler')
stopActor = StopActor()
inData = {
'name': 'Ragnar'
}
return
script = "pypushflow.test.pythonErrorHandlerTest.py"
name = "Python Error Handler Test"
actor = PythonActor(
script=script, name=name, thread_counter=self.thread_counter
)
stopActor = StopActor(thread_counter=self.thread_counter)
errorHandler = ErrorHandler(
name="Error handler", thread_counter=self.thread_counter
)
inData = {"name": "Ragnar"}
actor.connect(stopActor)
actor.connectOnError(errorHandler)
errorHandler.connect(stopActor)
......@@ -71,15 +77,19 @@ class TestPythonActor(unittest.TestCase):
stopActor.join(timeout=5)
outData = stopActor.outData
self.assertIsNotNone(outData)
self.assertTrue('WorkflowException' in outData)
self.assertTrue("WorkflowException" in outData)