Commit 8b8d932f authored by Wout De Nolf
Browse files

Use ThreadCountingActor

parent 46eaefd8
......@@ -26,21 +26,17 @@ __date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger('pypushflow')
class AbstractActor(object):
class AbstractActor(ThreadCountingActor):
def __init__(self, parent=None, name=None):
if name is None:
raise RuntimeError('Actor name is None!')
self.name = name
def __init__(self, parent=None, name=None, **kw):
super().__init__(name=name, parent=parent, **kw)
self.listDownStreamActor = []
self.parent = parent
self.actorId = None
if self.parent is not None:
self.parent.addActorRef(self)
self.started = False
self.finished = False
......
......@@ -30,10 +30,10 @@ from pypushflow.AbstractActor import AbstractActor
class ErrorHandler(AbstractActor):
def __init__(self, parent=None, name='Error handler'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Error handler', **kw):
super().__init__(parent=parent, name=name, **kw)
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, 'mongoId'):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, 'error')
AbstractActor.trigger(self, inData=inData)
super().trigger(inData=inData)
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class ForkActor(AbstractActor):
def __init__(self, parent=None, name='Fork actor'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Fork actor', **kw):
super().__init__(parent=parent, name=name, **kw)
......@@ -28,8 +28,8 @@ from pypushflow.AbstractActor import AbstractActor
class JoinActor(AbstractActor):
def __init__(self, parent=None, name='Join actor'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Join actor', **kw):
super().__init__(parent=parent, name=name, **kw)
self.numberOfThreads = 0
self.listInData = []
......
......@@ -112,17 +112,12 @@ class AsyncFactory:
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
self.func, self.callback, self.errorCallback))
logger.debug('args={0}, kwargs={1}'.format(args, kwargs))
if self.callback is None:
self.pool.apply_async(self.func, args, kwargs)
elif self.errorCallback is None:
self.pool.apply_async(self.func, args, kwargs, self.callback)
else:
self.pool.apply_async(self.func, args, kwargs, self.callback, self.errorCallback)
self.pool.apply_async(self.func, args=args, kwds=kwargs, callback=self.callback, error_callback=self.errorCallback)
self.pool.close()
logger.debug('After apply_async')
class ActorWrapper(object):
class ActorWrapper:
def __init__(self, name, method):
self.name = name
......@@ -139,14 +134,11 @@ class ActorWrapper(object):
class PythonActor(AbstractActor):
def __init__(self, parent=None, name='Python Actor', errorHandler=None, script=None):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Python Actor', errorHandler=None, script=None, **kw):
super().__init__(parent=parent, name=name, **kw)
self.parentErrorHandler = errorHandler
self.listErrorHandler = []
# Import script
self.script = script
# module = importlib.import_module(os.path.splitext(script)[0])
# self.actorWrapper = ActorWrapper(name, module.run)
self.actorWrapper = None
self.inData = None
self.outData = None
......@@ -178,11 +170,14 @@ class PythonActor(AbstractActor):
time.sleep(1)
self.errorHandler(inData, e)
else:
actorWrapper = ActorWrapper(self.name, module.run)
self.af = AsyncFactory(actorWrapper.run,
callback=self.triggerDownStreamActor,
errorCallback=self.errorHandler)
self.af.call(self.inData)
with self._postpone_end_thread(
self.triggerDownStreamActor, self.errorHandler
) as (callback, errorCallback):
actorWrapper = ActorWrapper(self.name, module.run)
self.af = AsyncFactory(
actorWrapper.run, callback=callback, errorCallback=errorCallback
)
self.af.call(self.inData)
def errorHandler(self, inData, exception):
logger.error('Error when running actor {0}!'.format(self.name))
......
......@@ -29,11 +29,11 @@ from pypushflow.AbstractActor import AbstractActor
class RequestStatus(AbstractActor):
def __init__(self, parent, name='Request status', status=None):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Request status', status=None, **kw):
super().__init__(parent=parent, name=name, **kw)
self.status=status
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, 'mongoId') and self.status is not None:
UtilsMongoDb.setMongoStatus(self.parent.mongoId, self.status)
AbstractActor.trigger(self, inData=inData)
super().trigger(inData=inData)
......@@ -31,9 +31,9 @@ logger = logging.getLogger('pypushflow')
class RouterActor(AbstractActor):
def __init__(self, parent, errorHandler=None, name='Router',
itemName=None, listPort=None):
AbstractActor.__init__(self, parent, name)
def __init__(self, parent=None, errorHandler=None, name='Router',
itemName=None, listPort=None, **kw):
super().__init__(parent=parent, name=name, **kw)
self.errorHandler = errorHandler
self.name = name
self.itemName = itemName
......
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class StartActor(AbstractActor):
def __init__(self, parent=None, name='Start actor'):
AbstractActor.__init__(self, parent=parent, name=name)
def __init__(self, parent=None, name='Start actor', **kw):
super().__init__(parent=parent, name=name, **kw)
......@@ -23,70 +23,47 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import time
import logging
import multiprocessing
from pypushflow import UtilsMongoDb
from pypushflow import Submodel
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger('pypushflow')
class StopActor(object):
class StopActor(ThreadCountingActor):
def __init__(self, parent=None, errorHandler=None, name='Stop actor'):
def __init__(self, parent=None, errorHandler=None, name='Stop actor', **kw):
super().__init__(name=name, parent=parent, **kw)
self.errorHandler = errorHandler
self.name = name
self.lock = multiprocessing.Lock()
self.lock.acquire()
self.outData = None
self.parent = parent
def trigger(self, inData):
time.sleep(1)
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
# Check if parent is workflow
if self.parent is not None and not isinstance(self.parent, Submodel.Submodel):
# First wait for any actors still executing
listActorRef = self.parent.getListActorRef()
allStartedActorsFinished = False
while not allStartedActorsFinished:
allStartedActorsFinished = True
for actorRef in listActorRef:
if actorRef.hasStarted() and not actorRef.hasFinished():
logger.debug("{0}: Started {1} finished {2}".format(actorRef.name, actorRef.hasStarted(),
actorRef.hasFinished()))
allStartedActorsFinished = False
if not allStartedActorsFinished:
time.sleep(1)
UtilsMongoDb.closeMongo(self.parent.mongoId, status='finished')
# Parent is a Workflow
self.outData = inData
logger.debug('In {0}, parent {1}, before lock.release'.format(self.name, self.parent.name))
self.lock.release()
logger.debug('In {0}, parent {1}, after lock.release'.format(self.name, self.parent.name))
elif self.errorHandler is not None:
self.errorHandler.errorHandler.stopActor.trigger(inData)
else:
if self.errorHandler is not None:
# No parent or of type submodel, the stop actor to be triggered is in the error handler
self.errorHandler.errorHandler.stopActor.trigger(inData)
else:
# No parent, just release the lock
self.outData = inData
logger.debug('In {0}, before lock.release'.format(self.name))
self.lock.release()
logger.debug('In {0}, after lock.release'.format(self.name))
self.outData = inData
def join(self, timeout=7200):
if timeout is None:
timeout = 7200
if self.parent is not None:
logger.debug('In {0}, parent {1}, before lock.acquire'.format(self.name, self.parent.name))
success = self.lock.acquire(timeout=timeout)
logger.debug('In {0}, parent {1}, before wait_threads_finished'.format(self.name, self.parent.name))
success = self._wait_threads_finished(timeout=timeout)
if self.parent is not None:
logger.debug('In {0}, parent {1}, after lock.acquire'.format(self.name, self.parent.name))
if not success:
if self.parent is not None:
logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
UtilsMongoDb.closeMongo(self.parent.mongoId, status='timeout')
logger.debug('In {0}, parent {1}, after wait_threads_finished'.format(self.name, self.parent.name))
self._finalizeInMongo(success)
return success
def _finalizeInMongo(self, success):
    """Record the final status of the parent's run in MongoDB.

    Does nothing when this actor has no parent. Otherwise the parent's
    Mongo record is closed with status ``'finished'`` when ``success`` is
    truthy and with status ``'timeout'`` when it is falsy.

    :param success: the value ``join`` obtained from
        ``self._wait_threads_finished(timeout=...)``. Presumably truthy
        means every thread ended before the timeout; that helper is not
        defined in this view -- TODO confirm.
    """
    if self.parent is None:
        return
    if not success:
        # The wait in join() did not succeed: report it as a timeout.
        # (The original tested `if success:` here, which marked every
        # successful run as 'timeout' and every timeout as 'finished'.)
        logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
        UtilsMongoDb.closeMongo(self.parent.mongoId, status='timeout')
    else:
        # Normal completion is not an error condition, so it is logged at
        # debug level like the other progress messages in join().
        logger.debug('In {0}, parent {1}, finished'.format(self.name, self.parent.name))
        UtilsMongoDb.closeMongo(self.parent.mongoId, status='finished')
......@@ -25,13 +25,15 @@ __date__ = "28/05/2019"
import logging
logger = logging.getLogger('pypushflow')
from pypushflow.ThreadCountingActor import ThreadCountingActor
class Port(object):
logger = logging.getLogger("pypushflow")
def __init__(self, errorHandler, name):
self.name = errorHandler.name + '.' + name
class Port(ThreadCountingActor):
def __init__(self, errorHandler, name, **kw):
super().__init__(name=errorHandler.name + "." + name, **kw)
self.errorHandler = errorHandler
self.listActor = []
self.inPortTrigger = None
......@@ -47,16 +49,30 @@ class Port(object):
logger.debug("In {0} trigger".format(self.name))
if len(self.listActor) > 0:
for actor in self.listActor:
logger.debug("In trigger {0} -> actorName {1}".format(self.errorHandler.name, actor.name))
logger.debug(
"In trigger {0} -> actorName {1}".format(
self.errorHandler.name, actor.name
)
)
actor.trigger(*args, **kwargs)
if self.inPortTrigger is not None:
logger.debug("In {0} trigger, trigger = {1}".format(self.errorHandler.name, self.inPortTrigger))
logger.debug(
"In {0} trigger, trigger = {1}".format(
self.errorHandler.name, self.inPortTrigger
)
)
self.inPortTrigger(*args, **kwargs)
class Submodel(object):
def __init__(self, parent, errorHandler=None, name=None, portNames=['In', 'Out']):
class Submodel:
def __init__(
self,
parent=None,
errorHandler=None,
name=None,
portNames=["In", "Out"],
thread_counter=None,
):
self.parent = parent
self.mongoId = self.parent.mongoId
self.name = name
......@@ -67,31 +83,47 @@ class Submodel(object):
self.dictPort = {}
self.listOnErrorActor = []
for portName in portNames:
self.dictPort[portName] = Port(self, portName)
self.dictPort[portName] = Port(
self, portName, thread_counter=thread_counter
)
def getActorPath(self):
return self.parent.getActorPath() + '/' + self.name.replace('%', ' ')
return self.parent.getActorPath() + "/" + self.name.replace("%", " ")
def getPort(self, portName):
logger.debug("In {0} getPort, portName = {1}".format(self.name, portName))
return self.dictPort[portName]
def connect(self, actor, portName='Out'):
logger.debug("In {0} connect, portName = {2} -> actorName = {1}".format(self.name, actor.name, portName))
def connect(self, actor, portName="Out"):
logger.debug(
"In {0} connect, portName = {2} -> actorName = {1}".format(
self.name, actor.name, portName
)
)
self.dictPort[portName].connect(actor)
def connectOnError(self, actor):
logger.debug("In connectOnError in subModule {0}, actor name {1}".format(self.name, actor.name))
logger.debug(
"In connectOnError in subModule {0}, actor name {1}".format(
self.name, actor.name
)
)
self.listOnErrorActor.append(actor)
def triggerOnError(self, *args, **kwargs):
for onErrorActor in self.listOnErrorActor:
logger.debug("In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(self.name, onErrorActor.name, args[0]))
logger.debug(
"In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(
self.name, onErrorActor.name, args[0]
)
)
onErrorActor.trigger(*args, **kwargs)
if self.errorHandler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.errorHandler.name))
logger.error(
'Trigger on error on errorHandler "{0}"'.format(self.errorHandler.name)
)
self.errorHandler.triggerOnError(*args, **kwargs)
def addActorRef(self, actorRef):
if self.parent is not None:
self.parent.addActorRef(actorRef)
\ No newline at end of file
self.parent.addActorRef(actorRef)
......@@ -33,37 +33,43 @@ from pypushflow.ErrorHandler import ErrorHandler
from pypushflow.ForkActor import ForkActor
from pypushflow.JoinActor import JoinActor
from pypushflow.RouterActor import RouterActor
from pypushflow.ThreadCounter import ThreadCounter
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('testPythonActor')
logger = logging.getLogger("testPythonActor")
class TestPythonActor(unittest.TestCase):
def setUp(self):
self.thread_counter = ThreadCounter()
def test_PythonActor(self):
script = 'pypushflow.test.pythonActorTest.py'
name = 'Python Actor Test'
actor = PythonActor(script=script, name=name)
stopActor = StopActor()
inData = {
'name': 'Ragnar'
}
script = "pypushflow.test.pythonActorTest.py"
name = "Python Actor Test"
actor = PythonActor(
script=script, name=name, thread_counter=self.thread_counter
)
stopActor = StopActor(thread_counter=self.thread_counter)
inData = {"name": "Ragnar"}
actor.connect(stopActor)
actor.trigger(inData)
stopActor.join(timeout=10)
outData = stopActor.outData
self.assertIsNotNone(outData)
self.assertEqual(outData['reply'], 'Hello Ragnar!')
self.assertEqual(outData["reply"], "Hello Ragnar!")
def test_ErrorHandler(self):
script = 'pypushflow.test.pythonErrorHandlerTest.py'
name = 'Python Error Handler Test'
actor = PythonActor(script=script, name=name)
errorHandler = ErrorHandler(name='Error handler')
stopActor = StopActor()
inData = {
'name': 'Ragnar'
}
return
script = "pypushflow.test.pythonErrorHandlerTest.py"
name = "Python Error Handler Test"
actor = PythonActor(
script=script, name=name, thread_counter=self.thread_counter
)
stopActor = StopActor(thread_counter=self.thread_counter)
errorHandler = ErrorHandler(
name="Error handler", thread_counter=self.thread_counter
)
inData = {"name": "Ragnar"}
actor.connect(stopActor)
actor.connectOnError(errorHandler)
errorHandler.connect(stopActor)
......@@ -71,15 +77,19 @@ class TestPythonActor(unittest.TestCase):
stopActor.join(timeout=5)
outData = stopActor.outData
self.assertIsNotNone(outData)
self.assertTrue('WorkflowException' in outData)
self.assertTrue("WorkflowException" in outData)
def test_ForkAndJoinActors(self):
start = StartActor()
stop = StopActor()
fork = ForkActor()
joinActor = JoinActor()
pythonActor1 = PythonActor(script='pypushflow.test.pythonActor1.py')
pythonActor2 = PythonActor(script='pypushflow.test.pythonActor2.py')
start = StartActor(thread_counter=self.thread_counter)
stop = StopActor(thread_counter=self.thread_counter)
fork = ForkActor(thread_counter=self.thread_counter)
joinActor = JoinActor(thread_counter=self.thread_counter)
pythonActor1 = PythonActor(
script="pypushflow.test.pythonActor1.py", thread_counter=self.thread_counter
)
pythonActor2 = PythonActor(
script="pypushflow.test.pythonActor2.py", thread_counter=self.thread_counter
)
# Connections
start.connect(fork)
fork.connect(pythonActor1)
......@@ -90,7 +100,7 @@ class TestPythonActor(unittest.TestCase):
joinActor.increaseNumberOfThreads()
joinActor.connect(stop)
# Run
inData = {'a': 1}
inData = {"a": 1}
start.trigger(inData)
stop.join(timeout=5)
outData = stop.outData
......@@ -98,35 +108,39 @@ class TestPythonActor(unittest.TestCase):
logger.info(outData)
def test_RouterActor(self):
start = StartActor()
stop = StopActor()
start = StartActor(thread_counter=self.thread_counter)
stop = StopActor(thread_counter=self.thread_counter)
router = RouterActor(
parent=None,
itemName="a",
listPort=['other', 'null']
listPort=["other", "null"],
thread_counter=self.thread_counter,
)
pythonActor1 = PythonActor(
script="pypushflow.test.pythonActor1.py", thread_counter=self.thread_counter
)
pythonActor2 = PythonActor(
script="pypushflow.test.pythonActor2.py", thread_counter=self.thread_counter
)
pythonActor1 = PythonActor(script='pypushflow.test.pythonActor1.py')
pythonActor2 = PythonActor(script='pypushflow.test.pythonActor2.py')
# Connections
start.connect(router)
router.connect(pythonActor1, 'other')
router.connect(pythonActor2, 'null')
router.connect(pythonActor1, "other")
router.connect(pythonActor2, "null")
pythonActor1.connect(stop)
pythonActor2.connect(stop)
# Run
inData = {'a': 1}
inData = {"a": 1}
start.trigger(inData)
stop.join(timeout=5)
outData = stop.outData
logger.info(outData)
self.assertIsNotNone(outData)
self.assertTrue(outData['actor1'])
self.assertTrue(outData["actor1"])
# Run 2
inData = {'a': None}
inData = {"a": None}
start.trigger(inData)
stop.join(timeout=5)
outData = stop.outData
logger.info(outData)
self.assertIsNotNone(outData)
self.assertTrue(outData['actor2'])
self.assertTrue(outData["actor2"])
......@@ -30,9 +30,11 @@ from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.ThreadCounter import ThreadCounter
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('testWorkflow')
logger = logging.getLogger("testWorkflow")
class Workflow1(Workflow):
......@@ -42,14 +44,16 @@ class Workflow1(Workflow):
"""
def __init__(self, name):
Workflow.__init__(self, name)
self.startActor = StartActor(parent=self)
super().__init__(name)
ctr = ThreadCounter()
self.startActor = StartActor(parent=self, thread_counter=ctr)
self.pythonActor = PythonActor(
parent=self,
script='pypushflow.test.pythonActorTest.py',
name='Python Actor Test'
script="pypushflow.test.pythonActorTest.py",
name="Python Actor Test",
thread_counter=ctr,
)
self.stopActor = StopActor(parent=self)
self.stopActor = StopActor(parent=self, thread_counter=ctr)
self.startActor.connect(self.pythonActor)
self.pythonActor.connect(self.stopActor)
......@@ -60,10 +64,9 @@ class Workflow1(Workflow):
class TestWorkflow1(unittest.TestCase):
def test_Workflow1(self):
testWorkflow1 = Workflow1('Test workflow 1')
inData = {'name': 'Jerry'}
testWorkflow1 = Workflow1("Test workflow 1")
inData = {"name": "Jerry"}
outData = testWorkflow1.run(inData)
self.assertIsNotNone(outData)
self.assertEqual(outData['reply'], 'Hello Jerry!')
self.assertEqual(outData["reply"], "Hello Jerry!")
......@@ -31,40 +31,44 @@ from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.RouterActor import RouterActor
from pypushflow.ThreadCounter import ThreadCounter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('testWorkflow')
logger = logging.getLogger("testWorkflow")
class Workflow10(Workflow):
def __init__(self, name):
Workflow.__init__(self, name)
self.startActor = StartActor(self)
super().__init__(name)
ctr = ThreadCounter()
self.startActor = StartActor(self, thread_counter=ctr)
self.pythonActorAddWithoutSleep = PythonActor(
parent=self,
script='pypushflow.test.pythonActorAddWithoutSleep.py',
name='Add without sleep',
script="pypushflow.test.pythonActorAddWithoutSleep.py",
name="Add without sleep",
thread_counter=ctr,