Commit ce288ea1 authored by payno's avatar payno
Browse files

add representation to be used in pypushflow.

Had to remove the dson and MongoDB as it fails on my computer.
Work in progress.
parent caa7591f
......@@ -25,13 +25,15 @@ __date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
#from pypushflow import UtilsMongoDb
logger = logging.getLogger('pypushflow')
class AbstractActor(object):
"""
TODO
"""
def __init__(self, parent=None, name=None):
if name is None:
raise RuntimeError('Actor name is None!')
......@@ -53,23 +55,23 @@ class AbstractActor(object):
))
actor.trigger(inData)
def uploadDataToMongo(self, actorData={}, script=None):
if self.parent is not None:
if self.parent.mongoId is not None:
if self.actorId is None:
actorPath = self.getActorPath() + '/' + self.name
self.actorId = UtilsMongoDb.initActor(
workflowId=self.parent.mongoId,
name=actorPath,
actorData=actorData,
script=script
)
else:
UtilsMongoDb.addDataToActor(
workflowId=self.parent.mongoId,
actorId=self.actorId,
actorData=actorData
)
# def uploadDataToMongo(self, actorData={}, script=None):
# if self.parent is not None:
# if self.parent.mongoId is not None:
# if self.actorId is None:
# actorPath = self.getActorPath() + '/' + self.name
# self.actorId = UtilsMongoDb.initActor(
# workflowId=self.parent.mongoId,
# name=actorPath,
# actorData=actorData,
# script=script
# )
# else:
# UtilsMongoDb.addDataToActor(
# workflowId=self.parent.mongoId,
# actorId=self.actorId,
# actorData=actorData
# )
def getActorPath(self):
return self.parent.getActorPath()
......@@ -23,7 +23,7 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow import UtilsMongoDb
#from pypushflow import UtilsMongoDb
from pypushflow.AbstractActor import AbstractActor
......@@ -34,6 +34,6 @@ class ErrorHandler(AbstractActor):
AbstractActor.__init__(self, parent=parent, name=name)
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, 'mongoId'):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, 'error')
# if self.parent is not None and hasattr(self.parent, 'mongoId'):
# UtilsMongoDb.setMongoStatus(self.parent.mongoId, 'error')
AbstractActor.trigger(self, inData=inData)
......@@ -43,4 +43,32 @@ class JoinActor(AbstractActor):
for data in self.listInData:
newInData.update(data)
for actor in self.listDownStreamActor:
actor.trigger(newInData)
\ No newline at end of file
actor.trigger(newInData)
class JoinUntilStopSignal(AbstractActor):
def __init__(self, name):
self.name = name
self.listInData = []
self.listDownStreamActor = []
self._nprocess_received = 0
self._nprocess_waited = 0
self._can_stop = False
def connect(self, actor):
self.listDownStreamActor.append(actor)
def trigger(self, inData):
if type(inData) is dict and 'sig_type' in inData and inData['sig_type'] == 'stop':
self._can_stop = True
self._nprocess_waited = inData['n_process']
else:
self._nprocess_received += 1
self.listInData.append(inData)
if self._can_stop and self._nprocess_waited <= self._nprocess_received:
newInData = {}
for data in self.listInData:
newInData.update(data)
for actor in self.listDownStreamActor:
actor.trigger(newInData)
......@@ -30,10 +30,9 @@ import datetime
import traceback
import functools
import importlib
import multiprocessing
import multiprocessing.pool
from pypushflow.AbstractActor import AbstractActor
from pypushflow.representation.scheme.node import Node
logger = logging.getLogger('pypushflow')
......@@ -91,23 +90,49 @@ class Edna2Pool(multiprocessing.pool.Pool):
#
#############################################################################
def _exec_node(name: str, input_: dict, properties: dict):
"""
Execute a node from the name of the process, input of the process and
properties of the process
:param str name: full name of the process to execute
:param input_: process input
:param properties: process properties / settings
:return: result of Node.execute
"""
logger.debug(' '.join(('processing', str(name), 'with input',
str(input_), 'and', str(properties), 'as properties')))
if type(input_) is tuple:
data_name, data = input_
else:
data_name = None
data = input_
return Node.execute(process_pt=name, properties=properties,
input_data=data, input_name=data_name)
class AsyncFactory:
def __init__(self, func, callback=None, errorCallback=None):
self.func = func
"""
TODO
"""
def __init__(self, node, callback=None, errorCallback=None):
self.node = node
self.callback = callback
self.errorCallback = errorCallback
self.pool = Edna2Pool(1)
# self.pool = Edna2Pool(1)
self.pool = multiprocessing.Pool()
def call(self, *args, **kwargs):
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
self.func, self.callback, self.errorCallback))
self.node, self.callback, self.errorCallback))
logger.debug('args={0}, kwargs={1}'.format(args, kwargs))
if self.callback is None:
self.pool.apply_async(self.func, args, kwargs)
self.pool.apply_async(_exec_node, args, kwargs)
elif self.errorCallback is None:
self.pool.apply_async(self.func, args, kwargs, self.callback)
self.pool.apply_async(_exec_node, args, kwargs, self.callback)
else:
self.pool.apply_async(self.func, args, kwargs, self.callback, self.errorCallback)
self.pool.apply_async(_exec_node, args, kwargs, self.callback, self.errorCallback)
logger.debug('After apply_async')
def wait(self):
......@@ -116,46 +141,62 @@ class AsyncFactory:
class ActorWrapper(object):
def __init__(self, name, method):
self.name = name
self.method = method
"""
TODO
"""
def __init__(self, node):
self.node = node
@trace_unhandled_exceptions
def run(self, *args, **kwargs):
logger.debug('In actor wrapper for {0}'.format(self.name))
logger.debug('args={0}, kwargs={1}, method={2}'.format(args, kwargs, self.method))
inData = args[0]
outData = self.method(**inData)
inData.update(outData)
return inData
def run(self, in_data):
logger.debug('In actor wrapper for {node}'.format(node=self.node))
out_data = self.node.execute(in_data)
if isinstance(out_data, WorkflowException):
return out_data
else:
in_data.update(out_data)
return out_data
class PythonActor(AbstractActor):
"""
TODO
"""
def __init__(self, parent=None, name='Python Actor', errorHandler=None, script=None):
AbstractActor.__init__(self, parent=parent, name=name)
self.errorHandler = errorHandler
self.listErrorHandler = []
self.error_handler = errorHandler
self.list_error_handler = []
# Import script
self.script = script
module = importlib.import_module(os.path.splitext(script)[0])
self.actorWrapper = ActorWrapper(name, module.run)
self.inData = None
self.outData = None
self.af = None
self.actor_wrapper = ActorWrapper(name, module.run)
self.in_data = None
self.out_data = None
self.async_factory = None
# TODO: could or should the async factory be a borg idiom ?
def connectOnError(self, errorHandler):
self.listErrorHandler.append(errorHandler)
self.list_error_handler.append(errorHandler)
def trigger(self, inData):
self.inData = inData
self.uploadDataToMongo(actorData={'inData': inData}, script=self.script)
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(inData)))
if isinstance(inData, WorkflowException):
def trigger(self, in_data):
"""
'callback' function when this function is triggered.
:param data: input data
"""
# cast data to dict if necessary
if hasattr(in_data, 'to_dict'):
_in_data = in_data.to_dict()
else:
_in_data = in_data
self.in_data = _in_data
self.uploadDataToMongo(actorData={'inData': in_data}, script=self.script)
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(in_data)))
if isinstance(in_data, WorkflowException):
logger.error('Error from previous actor! Not running actor {0}'.format(self.name))
if self.errorHandler is not None:
workflowException = inData
if self.error_handler is not None:
workflowException = in_data
oldInData = workflowException.data
exceptionDict = {
'errorMessage': workflowException.errorMessage,
......@@ -163,10 +204,11 @@ class PythonActor(AbstractActor):
}
oldInData['WorkflowException'] = exceptionDict
self.triggerOnError(oldInData)
self.af = AsyncFactory(self.actorWrapper.run,
callback=self.triggerDownStreamActor,
errorCallback=self.errorHandler)
self.af.call(inData)
self.async_factory = AsyncFactory(self.actor_wrapper.run,
callback=self.triggerDownStreamActor,
errorCallback=self.error_handler)
self.async_factory.call(in_data)
def errorHandler(self, exception):
logger.error('Error when running actor {0}!'.format(self.name))
......@@ -177,10 +219,10 @@ class PythonActor(AbstractActor):
)
inData = {'WorkflowException': workflowException}
logger.error(exception)
for errorHandler in self.listErrorHandler:
for errorHandler in self.list_error_handler:
errorHandler.trigger(inData)
if self.errorHandler is not None:
self.errorHandler.triggerOnError(inData)
if self.error_handler is not None:
self.error_handler.triggerOnError(inData)
def triggerDownStreamActor(self, inData={}):
logger.debug('In triggerDownStreamActor for {0}'.format(self.name))
......@@ -198,16 +240,16 @@ class PythonActor(AbstractActor):
'status': 'error',
'outData': exceptionDict
})
for errorHandler in self.listErrorHandler:
for errorHandler in self.list_error_handler:
errorHandler.trigger(oldInData)
if self.errorHandler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.errorHandler.name))
self.errorHandler.triggerOnError(inData=oldInData)
if self.error_handler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.error_handler.name))
self.error_handler.triggerOnError(inData=oldInData)
else:
outData = {}
for key, value in inData.items():
if key in self.inData:
if self.inData[key] != value:
if key in self.in_data:
if self.in_data[key] != value:
outData[key] = value
else:
outData[key] = value
......@@ -219,5 +261,3 @@ class PythonActor(AbstractActor):
for downStreamActor in self.listDownStreamActor:
logger.debug('In trigger {0}, triggering actor {1}, inData={2}'.format(self.name, downStreamActor.name, inData))
downStreamActor.trigger(inData)
......@@ -27,6 +27,8 @@ from pypushflow.AbstractActor import AbstractActor
class StartActor(AbstractActor):
"""
TODO
"""
def __init__(self, parent=None, name='Start actor'):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -26,7 +26,7 @@ __date__ = "28/05/2019"
import logging
import multiprocessing
from pypushflow import UtilsMongoDb
# from pypushflow import UtilsMongoDb
logger = logging.getLogger('pypushflow')
......@@ -43,8 +43,8 @@ class StopActor(object):
def trigger(self, inData):
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
if self.parent is not None and hasattr(self.parent, 'mongoId'):
UtilsMongoDb.closeMongo(self.parent.mongoId)
# if self.parent is not None and hasattr(self.parent, 'mongoId'):
# UtilsMongoDb.closeMongo(self.parent.mongoId)
if self.errorHandler is not None:
self.errorHandler.errorHandler.stopActor.trigger(inData)
else:
......
......@@ -24,21 +24,25 @@ __license__ = "MIT"
__date__ = "28/05/2019"
import time
import pprint
import logging
from pypushflow.representation import Scheme
from pypushflow.PythonActor import PythonActor as ActorFactory
from pypushflow.StartActor import StartActor
from pypushflow.StopActor import StopActor
from pypushflow.JoinActor import JoinUntilStopSignal
from pypushflow import UtilsMongoDb
logger = logging.getLogger('pypushflow')
class Workflow(object):
#from pypushflow import UtilsMongoDb
def __init__(self, name):
self.name = name
self.listOnErrorActor = []
self.mongoId = UtilsMongoDb.initMongo(name=name)
#self.mongoId = UtilsMongoDb.initMongo(name=name)
def connectOnError(self, actor):
logger.debug("In connectOnError in subModule {0}, actor name {1}".format(self.name, actor.name))
......@@ -52,3 +56,52 @@ class Workflow(object):
def getActorPath(self):
return '/' + self.name
class ProcessableWorkflow:
def __init__(self, scheme):
assert isinstance(scheme, Scheme)
self._representation = scheme
# first load node handlers if any
scheme.load_handlers()
self._actor_factory = {}
for node in self._representation.nodes:
self._actor_factory[node] = ActorFactory(node)
# deal with connect
for node in self._representation.nodes:
actor_factory = self._actor_factory[node]
for downstream_node in node.downstream_nodes:
downstream_actor_factory = self._actor_factory[downstream_node]
actor_factory.connect(downstream_actor_factory)
# add start actor
self._start_actor = StartActor()
for node in self._representation.start_nodes():
actor_factory = self._actor_factory[node]
self._start_actor.connect(actor_factory)
def connect_finals_nodes(actor):
# add end actor
for node in self._representation.final_nodes():
actor_factory = self._actor_factory[node]
actor_factory.connect(actor)
self._end_actor = StopActor()
if self.has_final_join():
self._join_actor = JoinUntilStopSignal('stop join')
connect_finals_nodes(self._join_actor)
self._join_actor.connect(self._end_actor)
else:
connect_finals_nodes(self._end_actor)
def has_final_join(self):
"""True if we need to send a 'end' signal before closing the workflow
This is needed for DataList and DataWatcher
"""
for node in self._representation.nodes:
if node.need_stop_join:
return True
return False
......@@ -28,6 +28,7 @@ from pypushflow.Submodel import Submodel
from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
import unittest
class Workflow1(Workflow):
......@@ -153,3 +154,14 @@ class Workflow4(Workflow):
self.startActor.trigger(inData)
self.stopActor.join(timeout=5)
return self.stopActor.outData
def suite():
test_suite = unittest.TestSuite()
for ui in ():
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == '__main__':
unittest.main(defaultTest="suite")
......@@ -19,7 +19,20 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__authors__ = ["O. Svensson", "H.Payno"]
__license__ = "MIT"
__date__ = "28/05/2019"
__date__ = "02/04/2020"
import unittest
from .test_representation import suite as test_representation_suite
#from .test_Workflows import suite as test_workflows_suite
def suite(loader=None):
test_suite = unittest.TestSuite()
test_suite.addTest(test_representation_suite)
return test_suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')
......@@ -26,9 +26,7 @@ __date__ = "28/05/2019"
import os
import logging
import unittest
from pypushflow import UtilsMongoDb
# from pypushflow import UtilsMongoDb
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('testPythonActor')
......@@ -42,25 +40,25 @@ class TestUtilsMongoDb(unittest.TestCase):
def tes_initMongo(self):
name = 'test_initMongo'
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
# workflowId = UtilsMongoDb.initMongo(name=name)
# self.assertIsNotNone(workflowId)
def tes_initActor(self):
name = 'test_initMongo'
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
# def tes_initActor(self):
# name = 'test_initMongo'
# workflowId = UtilsMongoDb.initMongo(name=name)
# self.assertIsNotNone(workflowId)
actorName = 'TestActor'
actorId = UtilsMongoDb.initActor(name=actorName, workflowId=workflowId)
self.assertIsNotNone(actorId)
# actorId = UtilsMongoDb.initActor(name=actorName, workflowId=workflowId)
# self.assertIsNotNone(actorId)
def test_addDataToActor(self):
# def test_addDataToActor(self):
name = 'test_initMongo'
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
actorName1 = 'TestActor1'
actorId1 = UtilsMongoDb.initActor(name=actorName1, workflowId=workflowId)
self.assertIsNotNone(actorId1)
actorName2 = 'TestActor2'
actorId2 = UtilsMongoDb.initActor(name=actorName2, workflowId=workflowId)
inData = {'a': 1}
UtilsMongoDb.addDataToActor(workflowId=workflowId, actorId=actorId1, actorData={'inData': inData})
\ No newline at end of file
# workflowId = UtilsMongoDb.initMongo(name=name)
# self.assertIsNotNone(workflowId)
# actorName1 = 'TestActor1'
# actorId1 = UtilsMongoDb.initActor(name=actorName1, workflowId=workflowId)
# self.assertIsNotNone(actorId1)
# actorName2 = 'TestActor2'
# actorId2 = UtilsMongoDb.initActor(name=actorName2, workflowId=workflowId)
# inData = {'a': 1}
# UtilsMongoDb.addDataToActor(workflowId=workflowId, actorId=actorId1, actorData={'inData': inData})
\ No newline at end of file
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2015-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""test representation is well taken into account"""
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "02/04/2020"
import unittest
from pypushflow.representation import Scheme, Node, Link
from pypushflow.Workflow import ProcessableWorkflow
def exec_(scheme, input_=None):
"""
Simple execution procedure of a workflow.
:param Scheme scheme:
:param input_: workflow input if any
"""
assert isinstance(scheme, ProcessableWorkflow)
scheme._start_actor.trigger(input_)
scheme._end_actor.join()
return scheme._end_actor.outData
class TestScheme(unittest.TestCase):
"""Test the Scheme class"""
def setUp(self):
self.node1 = Node(processing_pt='pypushflow.test.utils.test_function')
self.node2 = Node(processing_pt='pypushflow.test.utils.ProcessingClass1')
self.node3 = Node(processing_pt='pypushflow.test.utils.ProcessingClass2')
self.link1 = Link(self.node1, self.node2, 'data', 'data')
self.link2 = Link(self.node2, self.node3, 'data', 'data')
self.scheme = Scheme(nodes=(self.node1, self.node2, self.node3),
links=(self.link1, self.link2))
def testSchemeDefinition(self):
"""simple test of scheme definition"""
self.assertEqual(len(self.scheme.nodes), 3)