Commit 2f041943 authored by payno's avatar payno
Browse files

add 'representation' as a submodule

'representation' is intended to be the common representation of an ESRF workflow.

Also add some unit tests checking compatibility and correct execution of the workflow.
parent 41265756
......@@ -121,12 +121,14 @@ class AsyncFactory:
self.callback = callback
self.errorCallback = errorCallback
# self.pool = Edna2Pool(1)
self.pool = multiprocessing.Pool()
self.pool = multiprocessing.Pool(1)
# TODO: this shouldn't be limited to 1
def call(self, *args, **kwargs):
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
self.node, self.callback, self.errorCallback))
logger.debug('args={0}, kwargs={1}'.format(args, kwargs))
print('args are', args)
if self.callback is None:
self.pool.apply_async(_exec_node, args, kwargs)
elif self.errorCallback is None:
......@@ -161,15 +163,39 @@ class ActorWrapper(object):
class PythonActor(AbstractActor):
"""
TODO
- find a way to avoid 'duplication' of input 'script/process_pt'
(should be done upstream)
- I don't think script should have a default value.
:param parent:
:param name:
:param errorHandler:
:param script: script originally used
:param node: Node from representation
"""
def __init__(self, parent=None, name='Python Actor', errorHandler=None,
             script=None, node=None):
    """Build a Python actor from either a script path or a Node.

    Exactly one of *script* / *node* must be provided.

    :param parent: parent actor, forwarded to AbstractActor
    :param str name: display name of the actor
    :param errorHandler: actor triggered when execution fails
    :param script: importable script path whose module exposes a ``run``
                   callable (legacy entry point)
    :param node: Node from the workflow representation
    :raises ValueError: if both or neither of *script* / *node* are given
    """
    if node is not None:
        if script is not None:
            raise ValueError("if a process pointer is provided, you "
                             "shouldn't provide a script")
    elif script is None:
        raise ValueError("no script provided to the python actor")
    AbstractActor.__init__(self, parent=parent, name=name)
    self.error_handler = errorHandler
    self.list_error_handler = []
    self.script = script
    if script is not None:
        # legacy path: import the script and wrap its 'run' callable
        # in a Node so both entry points converge on the same wrapper
        module = importlib.import_module(os.path.splitext(script)[0])
        node = Node('.'.join((module.__name__, 'run')))
    # both branches end up wrapping a Node
    self.actor_wrapper = ActorWrapper(node=node)
    self.in_data = None
    self.out_data = None
    self.async_factory = None
......@@ -184,6 +210,7 @@ class PythonActor(AbstractActor):
:param data: input data
"""
print('--- A; In data is {} ---'.format(in_data))
# cast data to dict if necessary
if hasattr(in_data, 'to_dict'):
_in_data = in_data.to_dict()
......@@ -191,7 +218,7 @@ class PythonActor(AbstractActor):
_in_data = in_data
self.in_data = _in_data
self.uploadDataToMongo(actorData={'inData': in_data}, script=self.script)
# self.uploadDataToMongo(actorData={'inData': in_data}, script=self.script)
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(in_data)))
if isinstance(in_data, WorkflowException):
logger.error('Error from previous actor! Not running actor {0}'.format(self.name))
......@@ -208,7 +235,10 @@ class PythonActor(AbstractActor):
self.async_factory = AsyncFactory(self.actor_wrapper.run,
callback=self.triggerDownStreamActor,
errorCallback=self.error_handler)
self.async_factory.call(in_data)
print('indata is', in_data)
self.async_factory.call(self.actor_wrapper.node.process_pt,
in_data,
self.actor_wrapper.node.properties)
def errorHandler(self, exception):
logger.error('Error when running actor {0}!'.format(self.name))
......@@ -225,8 +255,11 @@ class PythonActor(AbstractActor):
self.error_handler.triggerOnError(inData)
def triggerDownStreamActor(self, inData={}):
print('--- B ---')
logger.debug('In triggerDownStreamActor for {0}'.format(self.name))
if isinstance(inData, WorkflowException):
print('1')
logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor]))
workflowException = inData
oldInData = workflowException.data
......@@ -235,17 +268,19 @@ class PythonActor(AbstractActor):
'traceBack': workflowException.traceBack.split('\n'),
}
oldInData['WorkflowException'] = exceptionDict
self.uploadDataToMongo(actorData={
'stopTime': datetime.datetime.now(),
'status': 'error',
'outData': exceptionDict
})
# self.uploadDataToMongo(actorData={
# 'stopTime': datetime.datetime.now(),
# 'status': 'error',
# 'outData': exceptionDict
# })
for errorHandler in self.list_error_handler:
errorHandler.trigger(oldInData)
if self.error_handler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.error_handler.name))
self.error_handler.triggerOnError(inData=oldInData)
else:
print('2')
# TODO: what can be inData ? a list ? namedtuple ?
outData = {}
for key, value in inData.items():
if key in self.in_data:
......@@ -253,11 +288,12 @@ class PythonActor(AbstractActor):
outData[key] = value
else:
outData[key] = value
self.uploadDataToMongo(actorData={
'stopTime': datetime.datetime.now(),
'status': 'finished',
'outData': outData
})
# self.uploadDataToMongo(actorData={
# 'stopTime': datetime.datetime.now(),
# 'status': 'finished',
# 'outData': outData
# })
for downStreamActor in self.listDownStreamActor:
logger.debug('In trigger {0}, triggering actor {1}, inData={2}'.format(self.name, downStreamActor.name, inData))
downStreamActor.trigger(inData)
......@@ -67,7 +67,12 @@ class ProcessableWorkflow:
self._actor_factory = {}
for node in self._representation.nodes:
self._actor_factory[node] = ActorFactory(node)
name = '-'.join((str(node.id), node._process_pt))
print(node)
self._actor_factory[node] = ActorFactory(parent=None,
name=name,
node=node,
errorHandler=None)
# deal with connect
for node in self._representation.nodes:
......
......@@ -68,14 +68,34 @@ class TestScheme(unittest.TestCase):
def testProcessing(self):
    """Run the full workflow on {'data': 0} and check the final value.

    The chained test processors add 2, 4 and 1 respectively, so the
    expected output is 7.
    """
    processable_workflow = ProcessableWorkflow(scheme=self.scheme)
    out_ = exec_(scheme=processable_workflow, input_={'data': 0})
    self.assertEqual(out_['data'], 7)
class TestNodeExecution(unittest.TestCase):
    """Ensure Node.execute works"""

    def testCase1(self):
        # plain function target: test_function adds 2 to 'data'
        node = Node(processing_pt='pypushflow.test.utils.test_function')
        res = node.execute(node.process_pt, properties={}, input_name='data',
                           input_data={'data': 0})
        self.assertEqual(res, {'data': 2})

    def testCase2(self):
        # class target: ProcessingClass1 adds 4 to 'data'
        node = Node(processing_pt='pypushflow.test.utils.ProcessingClass1')
        res = node.execute(node.process_pt, properties={}, input_name='data',
                           input_data={'data': 0})
        self.assertEqual(res, {'data': 4})

    def testCase3(self):
        # class target: ProcessingClass2 adds its default value (1) to 'data'
        node = Node(processing_pt='pypushflow.test.utils.ProcessingClass2')
        res = node.execute(node.process_pt, properties={}, input_name='data',
                           input_data={'data': 0})
        self.assertEqual(res, {'data': 1})
def suite():
    """Collect this module's test cases into a single test suite.

    :return: unittest.TestSuite containing TestScheme and TestNodeExecution
    """
    test_suite = unittest.TestSuite()
    for ui in (TestScheme, TestNodeExecution):
        test_suite.addTest(
            unittest.defaultTestLoader.loadTestsFromTestCase(ui))
    return test_suite
......
......@@ -32,21 +32,34 @@ __date__ = "02/04/2020"
class ProcessingClass1:
    """Class to test the workflow. Adds 4 to the 'data' entry of the input.

    The input is mutated in place and returned.
    """

    def process(self, input):
        """Increment input['data'] by 4 and return the mapping.

        :param dict input: mapping holding a numeric 'data' entry
        :return: the same mapping, with 'data' incremented
        """
        input['data'] += 4
        return input

    # instances are also directly callable
    __call__ = process
class ProcessingClass2:
    """Class to test the workflow. Adds self.value to the 'data' entry.

    The increment defaults to 1 and can be overridden through
    :meth:`setConfiguration`. The input mapping is mutated in place.
    """

    # declared handlers: name, type, handler method
    inputs = [
        ['data', dict, 'process'],
    ]

    def __init__(self):
        # default increment, overridable via setConfiguration
        self.value = 1

    def process(self, input):
        """Increment input['data'] by self.value and return the mapping.

        :param dict input: mapping holding a numeric 'data' entry
        :return: the same mapping, with 'data' incremented
        """
        input['data'] += self.value
        return input

    def setConfiguration(self, config):
        """Set the increment from a configuration mapping.

        :param dict config: must contain a 'value' entry
        """
        self.value = config['value']

    # instances are also directly callable
    __call__ = process
def test_function(input, config):
def test_function(input):
"""Class to test the workflow. Simply add self.value to input"""
return input + config['value']
input['data'] += 2
return input
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment