GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit 60e8dc89 authored by Operator for ID30's avatar Operator for ID30

Added mongodb persistence

parent e91023d8
......@@ -24,16 +24,21 @@ __license__ = "MIT"
__date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
logger = logging.getLogger('pypushflow')
class AbstractActor(object):
def __init__(self, name=None):
def __init__(self, parent=None, name=None):
if name is None:
raise RuntimeError('Actor name is None!')
self.name = name
self.listDownStreamActor = []
self.parent = parent
self.actorId = None
def connect(self, actor):
logger.debug('Connecting actor "{0}" to actor "{1}"'.format(
......@@ -47,3 +52,11 @@ class AbstractActor(object):
self.name, actor.name
))
actor.trigger(inData)
def uploadDataToMongo(self, actorData={}):
if self.parent is not None:
if self.parent.mongoId is not None:
if self.actorId is None:
self.actorId = UtilsMongoDb.initActor(workflowId=self.parent.mongoId, name=self.name, actorData=actorData)
else:
UtilsMongoDb.addDataToActor(workflowId=self.parent.mongoId, actorId=self.actorId, actorData=actorData)
\ No newline at end of file
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class ErrorHandler(AbstractActor):
def __init__(self, name='Error handler'):
AbstractActor.__init__(self, name)
def __init__(self, parent, name='Error handler'):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class ForkActor(AbstractActor):
def __init__(self, name='Fork actor'):
AbstractActor.__init__(self, name)
def __init__(self, parent, name='Fork actor'):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -28,8 +28,8 @@ from pypushflow.AbstractActor import AbstractActor
class JoinActor(AbstractActor):
def __init__(self, name='Join actor'):
AbstractActor.__init__(self, name)
def __init__(self, parent, name='Join actor'):
AbstractActor.__init__(self, parent=parent, name=name)
self.numberOfThreads = 0
self.listInData = []
......
......@@ -24,9 +24,9 @@ __license__ = "MIT"
__date__ = "28/05/2019"
import os
import time
import pprint
import logging
import datetime
import traceback
import functools
import importlib
......@@ -133,8 +133,8 @@ class ActorWrapper(object):
class PythonActor(AbstractActor):
def __init__(self, name='Python Actor', errorHandler=None, script=None):
AbstractActor.__init__(self, name)
def __init__(self, parent=None, name='Python Actor', errorHandler=None, script=None):
AbstractActor.__init__(self, parent=parent, name=name)
self.errorHandler = errorHandler
self.listErrorHandler = []
# Import script
......@@ -147,6 +147,7 @@ class PythonActor(AbstractActor):
self.listErrorHandler.append(errorHandler)
def trigger(self, inData):
self.uploadDataToMongo(actorData={'inData': inData})
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(inData)))
if isinstance(inData, WorkflowException):
logger.error('Error from previous actor! Not running actor {0}'.format(self.name))
......@@ -199,12 +200,22 @@ class PythonActor(AbstractActor):
'traceBack': workflowException.traceBack,
}
oldInData['WorkflowException'] = exceptionDict
self.uploadDataToMongo(actorData={
'stopTime': datetime.datetime.utcnow(),
'status': 'error',
'outData': exceptionDict
})
for errorHandler in self.listErrorHandler:
errorHandler.trigger(oldInData)
if self.errorHandler is not None:
logger.error('Trigger on error on errorHandler {0}'.format(self.errorHandler.name))
self.errorHandler.triggerOnError(inData=oldInData)
else:
self.uploadDataToMongo(actorData={
'stopTime': datetime.datetime.utcnow(),
'status': 'finished',
'outData': inData
})
for downStreamActor in self.listDownStreamActor:
logger.debug('In trigger {0}, triggering actor {1}, inData={2}'.format(self.name, downStreamActor.name, inData))
downStreamActor.trigger(inData)
......
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class RequestStatus(AbstractActor):
def __init__(self, name='Request status'):
AbstractActor.__init__(self, name)
def __init__(self, parent, name='Request status'):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -31,9 +31,9 @@ logger = logging.getLogger('pypushflow')
class RouterActor(AbstractActor):
def __init__(self, errorHandler=None, name='Router', itemName=None, listPort=[]):
AbstractActor.__init__(self, name)
self.errorHandler=errorHandler
def __init__(self, parent, errorHandler=None, name='Router', itemName=None, listPort=[]):
AbstractActor.__init__(self, parent, name)
self.errorHandler = errorHandler
self.name = name
self.itemName = itemName
self.listPort = listPort
......
......@@ -28,5 +28,5 @@ from pypushflow.AbstractActor import AbstractActor
class StartActor(AbstractActor):
def __init__(self, name='Start actor'):
AbstractActor.__init__(self, name)
def __init__(self, parent=None, name='Start actor'):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -26,20 +26,24 @@ __date__ = "28/05/2019"
import logging
import multiprocessing
from pypushflow import UtilsMongoDb
logger = logging.getLogger('pypushflow')
class StopActor(object):
def __init__(self, errorHandler=None, name='Stop actor'):
self.errorHandler=errorHandler
def __init__(self, parent=None, errorHandler=None, name='Stop actor'):
self.errorHandler = errorHandler
self.name = name
self.lock = multiprocessing.Lock()
self.lock.acquire()
self.outData = None
self.parent = parent
def trigger(self, inData):
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
UtilsMongoDb.closeMongo(self.parent.mongoId)
if self.errorHandler is not None:
self.errorHandler.errorHandler.stopActor.trigger(inData)
else:
......
......@@ -56,7 +56,9 @@ class Port(object):
class Submodel(object):
def __init__(self, errorHandler=None, name=None, portNames=['In', 'Out']):
def __init__(self, parent, errorHandler=None, name=None, portNames=['In', 'Out']):
self.parent = parent
self.mongoId = self.parent.mongoId
self.name = name
self.errorHandler = errorHandler
self.dictPort = {}
......
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "16/10/2019"
import os
import datetime
import pymongo
from bson.objectid import ObjectId
def getDateTimeString():
return datetime.datetime.utcnow()
def getMongoUrl():
return os.environ.get('PYPUSHFLOW_MONGOURL', None)
def initMongo(name):
workflowId = None
mongoUrl = getMongoUrl()
if mongoUrl is not None:
creator = os.environ.get('PYPUSHFLOW_CREATOR', 'Unknown')
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
workflowData = {
'startTime': getDateTimeString(),
'creator': creator,
'name': name,
'status': 'started',
'actors': []
}
insertOneResult = collection.insert_one(workflowData)
workflowId = insertOneResult.inserted_id
return workflowId
def closeMongo(workflowId):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
dictWorkflow['status'] = 'finished'
dictWorkflow['stopTime'] = getDateTimeString()
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
def initActor(name, workflowId, actorData={}):
actorId = None
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
actorId = ObjectId()
actor = {
'_id': actorId,
'startTime': getDateTimeString(),
'name': name,
'status': 'started',
}
actor.update(actorData)
collection.update({'_id': workflowId}, {'$push': {'actors': actor}})
return actorId
def addDataToActor(workflowId, actorId, actorData={}):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
for actor in dictWorkflow['actors']:
if actor['_id'] == actorId:
actor.update(actorData)
break
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
......@@ -24,8 +24,12 @@ __license__ = "MIT"
__date__ = "28/05/2019"
import time
import pprint
import logging
from pypushflow import UtilsMongoDb
logger = logging.getLogger('pypushflow')
......@@ -34,6 +38,7 @@ class Workflow(object):
def __init__(self, name):
self.name = name
self.listOnErrorActor = []
self.mongoId = UtilsMongoDb.initMongo(name=name)
def connectOnError(self, actor):
logger.debug("In connectOnError in subModule {0}, actor name {1}".format(self.name, actor.name))
......
......@@ -38,12 +38,13 @@ class Workflow1(Workflow):
def __init__(self, name):
Workflow.__init__(self, name)
self.startActor = StartActor()
self.startActor = StartActor(parent=self)
self.pythonActor = PythonActor(
parent=self,
script='pypushflow.test.pythonActorTest.py',
name='Python Actor Test'
)
self.stopActor = StopActor()
self.stopActor = StopActor(parent=self)
self.startActor.connect(self.pythonActor)
self.pythonActor.connect(self.stopActor)
......@@ -63,13 +64,14 @@ class Workflow2(Workflow):
def __init__(self, name):
Workflow.__init__(self, name)
self.startActor = StartActor()
self.startActor = StartActor(parent=self)
self.pythonActor = PythonActor(
parent=self,
script='pypushflow.test.pythonErrorHandlerTest.py',
name='Python Error Handler Test',
errorHandler=self
)
self.stopActor = StopActor()
self.stopActor = StopActor(parent=self)
self.startActor.connect(self.pythonActor)
self.pythonActor.connect(self.stopActor)
self.connectOnError(self.stopActor)
......@@ -85,9 +87,10 @@ class Submodel1(Submodel):
Submodel containing one python actor.
"""
def __init__(self, name):
Submodel.__init__(self, name=name)
def __init__(self, parent, name):
Submodel.__init__(self, parent, name=name)
self.pythonActor = PythonActor(
parent=self,
script='pypushflow.test.pythonActorTest.py',
name='Python Actor Test',
)
......@@ -103,9 +106,9 @@ class Workflow3(Workflow):
def __init__(self, name):
Workflow.__init__(self, name)
self.startActor = StartActor()
self.submodel1 = Submodel1('Submodel 1')
self.stopActor = StopActor()
self.startActor = StartActor(parent=self)
self.submodel1 = Submodel1(parent=self, name='Submodel 1')
self.stopActor = StopActor(parent=self)
self.startActor.connect(self.submodel1.getPort('In'))
self.submodel1.getPort('Out').connect(self.stopActor)
......@@ -120,9 +123,10 @@ class Submodel2(Submodel):
Submodel containing one python actor which throws an exception.
"""
def __init__(self, name):
Submodel.__init__(self, name=name)
def __init__(self, parent, name):
Submodel.__init__(self, parent, name=name)
self.pythonActor = PythonActor(
parent=self,
script='pypushflow.test.pythonErrorHandlerTest.py',
name='Python Error Handler Test',
errorHandler=self
......
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import os
import logging
import unittest
from pypushflow import UtilsMongoDb
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('testPythonActor')
class TestUtilsMongoDb(unittest.TestCase):
def setUp(self):
os.environ['PYPUSHFLOW_MONGOURL'] = 'mongodb://pybes:pybes@linsvensson.esrf.fr:27017/pybes'
os.environ['PYPUSHFLOW_CREATOR'] = 'TestUtilsMongoDb'
def tes_initMongo(self):
name = 'test_initMongo'
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
def tes_initActor(self):
name = 'test_initMongo'
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
actorName = 'TestActor'
actorId = UtilsMongoDb.initActor(name=actorName, workflowId=workflowId)
self.assertIsNotNone(actorId)
def test_addDataToActor(self):
name = 'test_initMongo'
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
actorName1 = 'TestActor1'
actorId1 = UtilsMongoDb.initActor(name=actorName1, workflowId=workflowId)
self.assertIsNotNone(actorId1)
actorName2 = 'TestActor2'
actorId2 = UtilsMongoDb.initActor(name=actorName2, workflowId=workflowId)
inData = {'a': 1}
UtilsMongoDb.addDataToActor(workflowId=workflowId, actorId=actorId1, key='inData', value=inData)
\ No newline at end of file
......@@ -23,6 +23,7 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import os
import logging
import unittest
......@@ -37,6 +38,10 @@ logger = logging.getLogger('testWorkflow')
class TestWorkflows(unittest.TestCase):
def setUp(self):
os.environ['PYPUSHFLOW_MONGOURL'] = 'mongodb://pybes:pybes@linsvensson.esrf.fr:27017/pybes'
os.environ['PYPUSHFLOW_CREATOR'] = 'TestWorkflows'
def test_Workflow1(self):
testWorkflow1 = Workflow1('Test workflow 1')
inData = {'name': 'Jerry'}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment