Commit 3cd1ff43 authored by payno

Merge branch 'integrate_tomwer' into 'master'

Integrate tomwer

See merge request !4
parents 9c48a9f0 25b2a077
Pipeline #30869 passed with stages in 1 minute and 14 seconds
@@ -36,14 +36,18 @@ class JoinActor(AbstractActor):
def increaseNumberOfThreads(self):
self.numberOfThreads += 1
def trigger(self, inData):
self.listInData.append(inData)
def trigger(self, in_data):
if in_data is None:
channel = data = None
else:
channel, data = in_data
self.listInData.append(data)
if len(self.listInData) == self.numberOfThreads:
newInData = {}
for data in self.listInData:
newInData.update(data)
for actor in self.listDownStreamActor:
actor.trigger(newInData)
actor.trigger((channel, data))
class JoinUntilStopSignal(AbstractActor):
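Note: a minimal sketch of the (channel, data) trigger convention this merge appears to introduce, where every trigger() call receives a 2-tuple (or None) instead of a bare dict. The EchoActor class below is illustrative only, not part of pypushflow:

class EchoActor:
    """Toy downstream actor that records whatever it is triggered with."""

    def __init__(self):
        self.received = []

    def trigger(self, in_data):
        # in_data is expected to be a (channel_name, data_dict) tuple,
        # or None when there is nothing to propagate.
        if in_data is None:
            channel = data = None
        else:
            channel, data = in_data
        self.received.append((channel, data))


actor = EchoActor()
actor.trigger(('default', {'data': 0}))
actor.trigger(None)
print(actor.received)  # [('default', {'data': 0}), (None, None)]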
@@ -26,16 +26,15 @@ __date__ = "28/05/2019"
import os
import pprint
import logging
import datetime
import traceback
import functools
import importlib
import multiprocessing.pool
import multiprocessing
from multiprocessing.pool import Pool as _Pool
from pypushflow.AbstractActor import AbstractActor
from pypushflow.representation.scheme.node import Node
logger = logging.getLogger('pypushflow')
logger = logging.getLogger(__name__)
class WorkflowException(Exception):
@@ -43,6 +42,8 @@ class WorkflowException(Exception):
def __init__(self, errorMessage="", traceBack="", data={}, msg=None):
super(WorkflowException, self).__init__(msg)
self.errorMessage = errorMessage
if data is None:
data = {}
self.data = data
self.traceBack = traceBack
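Note: the new None check above points toward the usual guard for a mutable default argument. A minimal sketch of that pattern, using a hypothetical SafeWorkflowException rather than the project's real class:

class SafeWorkflowException(Exception):
    """Sketch: avoid a shared mutable default by creating a dict per instance."""

    def __init__(self, errorMessage="", traceBack="", data=None, msg=None):
        super().__init__(msg)
        self.errorMessage = errorMessage
        # A fresh dict per instance instead of one default object shared
        # by every call that omits the argument.
        self.data = {} if data is None else data
        self.traceBack = traceBack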
@@ -83,7 +84,7 @@ class NoDaemonProcess(multiprocessing.Process):
# because the latter is only a wrapper function, not a proper class.
class Edna2Pool(multiprocessing.pool.Pool):
class Pool(_Pool):
Process = NoDaemonProcess
#
@@ -92,25 +93,21 @@ class Edna2Pool(multiprocessing.pool.Pool):
@trace_unhandled_exceptions
def _exec_node(name: str, input_: dict, properties: dict):
def _exec_node(name: str, channel_name: str, data: dict, properties: dict):
"""
Execute a node given the name of the process, the data to process and
the process properties
:param str name: full name of the process to execute
:param input_: process input
:param data: data to process
:param properties: process properties / settings
:return: result of Node.execute
"""
logger.debug(' '.join(('processing', str(name), 'with input',
str(input_), 'and', str(properties), 'as properties')))
if type(input_) is tuple:
data_name, data = input_
else:
data_name = None
data = input_
logger.debug('processing {0} on channel {1} with input {2} and {3} as '
'properties'.format(str(name), str(channel_name),
str(data), str(properties)))
return Node.execute(process_pt=name, properties=properties,
input_data=data, input_name=data_name)
input_data=data, input_name=channel_name)
class AsyncFactory:
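Note: for context, a rough sketch of the apply_async pattern that AsyncFactory builds on, with a worker returning the (channel, data) pair that the done-callback unpacks. Function names and arguments here are illustrative, not the pypushflow API:

import multiprocessing


def _work(name, channel_name, data, properties):
    # Stand-in for _exec_node: pretend to process and hand back the
    # (channel, data) pair that downstream callbacks expect.
    return channel_name, {**data, 'processed_by': name}


def _on_done(result):
    channel, out_data = result
    print('done on channel', channel, '->', out_data)


def _on_error(exc):
    print('failed:', exc)


if __name__ == '__main__':
    with multiprocessing.Pool(1) as pool:
        async_result = pool.apply_async(
            _work,
            args=('demo-node', None, {'data': 0}, {}),
            callback=_on_done,
            error_callback=_on_error,
        )
        async_result.wait()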
@@ -121,9 +118,7 @@ class AsyncFactory:
self.node = node
self.callback = callback
self.errorCallback = errorCallback
# self.pool = Edna2Pool(1)
self.pool = multiprocessing.Pool(1)
# TODO: this shouldn't be limited to 1
self.pool = Pool(1)
def call(self, *args, **kwargs):
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
@@ -152,12 +147,12 @@ class ActorWrapper(object):
@trace_unhandled_exceptions
def run(self, in_data):
logger.debug('In actor wrapper for {node}'.format(node=self.node))
out_data = self.node.execute(in_data)
output_channel_name, out_data = self.node.execute(in_data)
if isinstance(out_data, WorkflowException):
return out_data
return output_channel_name, out_data
else:
in_data.update(out_data)
return out_data
return output_channel_name, out_data
class PythonActor(AbstractActor):
@@ -201,7 +196,12 @@ class PythonActor(AbstractActor):
self.in_data = None
self.out_data = None
self.async_factory = None
# TODO: could or should the async factory be a borg idiom ?
def get_input_channel_name(self, type_):
return self.actor_wrapper.node.get_input_channel_name(type_)
def get_output_channel_name(self, type_):
return self.actor_wrapper.node.get_output_channel_name(type_)
def connectOnError(self, errorHandler):
self.list_error_handler.append(errorHandler)
@@ -212,14 +212,13 @@
:param data: input data
"""
channel, in_data = in_data
logging.info('On trigger channel is ' + str(channel))
# cast data to dict if necessary
if hasattr(in_data, 'to_dict'):
_in_data = in_data.to_dict()
else:
_in_data = in_data
in_data = in_data.to_dict()
self.in_data = _in_data
# self.uploadDataToMongo(actorData={'inData': in_data}, script=self.script)
self.in_data = in_data
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(in_data)))
if isinstance(in_data, WorkflowException):
logger.error('Error from previous actor! Not running actor {0}'.format(self.name))
@@ -237,6 +236,7 @@
callback=self.triggerDownStreamActor,
errorCallback=self.error_handler)
self.async_factory.call(self.actor_wrapper.node.process_pt,
channel,
in_data,
self.actor_wrapper.node.properties)
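Note: a small sketch of the duck-typed to_dict cast performed in trigger() above. ScanLike is a made-up stand-in for whatever object tomwer hands over, not an actual tomwer class:

class ScanLike:
    """Made-up stand-in for an object exposing to_dict()."""

    def __init__(self, path):
        self.path = path

    def to_dict(self):
        return {'scan': self.path}


def as_dict(obj):
    # Same duck-typed cast as in trigger(): convert only when possible.
    return obj.to_dict() if hasattr(obj, 'to_dict') else obj


print(as_dict(ScanLike('/tmp/scan_0001')))  # {'scan': '/tmp/scan_0001'}
print(as_dict({'already': 'a dict'}))       # {'already': 'a dict'}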
@@ -254,8 +254,16 @@
if self.error_handler is not None:
self.error_handler.triggerOnError(inData)
def triggerDownStreamActor(self, inData={}):
logger.debug('In triggerDownStreamActor for {0}'.format(self.name))
def triggerDownStreamActor(self, output_last_processing=(None, {})):
logging.warning('---------------------')
logging.warning(output_last_processing)
logging.warning('---------------------')
try:
output_channel, inData = output_last_processing
except TypeError:
output_channel, inData = None, output_last_processing
logger.info('In triggerDownStreamActor for {0}, Output channel is {1}, '
'inData is {2}'.format(self.name, output_channel, inData))
if isinstance(inData, WorkflowException):
logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor]))
workflowException = inData
@@ -264,32 +272,22 @@
'errorMessage': workflowException.errorMessage,
'traceBack': workflowException.traceBack.split('\n'),
}
logger.warning('oldInData type: {}, value: {}'.format(type(oldInData), oldInData))
oldInData['WorkflowException'] = exceptionDict
# self.uploadDataToMongo(actorData={
# 'stopTime': datetime.datetime.now(),
# 'status': 'error',
# 'outData': exceptionDict
# })
for errorHandler in self.list_error_handler:
errorHandler.trigger(oldInData)
errorHandler.trigger((None, oldInData))
if self.error_handler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.error_handler.name))
self.error_handler.triggerOnError(inData=oldInData)
self.error_handler.triggerOnError(inData=(None, oldInData))
else:
# TODO: what can be inData ? a list ? namedtuple ?
out_data = {}
if inData is not None:
for key, value in inData.items():
if key in self.in_data:
if self.in_data[key] != value:
out_data[key] = value
else:
out_data[key] = value
# self.uploadDataToMongo(actorData={
# 'stopTime': datetime.datetime.now(),
# 'status': 'finished',
# 'outData': outData
# })
for downStreamActor in self.listDownStreamActor:
logger.debug('In trigger {0}, triggering actor {1}, inData={2}'.format(self.name, downStreamActor.name, inData))
downStreamActor.trigger(inData)
downStreamActor.trigger((output_channel, inData))
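Note: a sketch of the defensive unpacking triggerDownStreamActor applies to its callback argument. This version also catches ValueError, because unpacking an iterable of the wrong length raises ValueError rather than TypeError; names are illustrative only:

def unpack_result(output_last_processing):
    # Accept either the new (channel, data) tuple or a bare result.
    try:
        output_channel, in_data = output_last_processing
    except (TypeError, ValueError):
        output_channel, in_data = None, output_last_processing
    return output_channel, in_data


print(unpack_result(('channel_a', {'data': 1})))  # ('channel_a', {'data': 1})
print(unpack_result({'data': 1}))                 # (None, {'data': 1})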
@@ -53,4 +53,3 @@ class StopActor(object):
def join(self, timeout=7200):
self.lock.acquire(timeout=timeout)
Subproject commit ac97541efc62b1e4e2640ec568d33af23c2f417f
Subproject commit fed57846a1acb8c3f901081a74cf20a7ce1f828b
@@ -49,9 +49,9 @@ class TestPythonActor(unittest.TestCase):
'name': 'Ragnar'
}
actor.connect(stopActor)
actor.trigger(inData)
actor.trigger((None, inData))
stopActor.join(timeout=10)
out_data = stopActor.out_data
out_channel, out_data = stopActor.out_data
self.assertIsNotNone(out_data)
self.assertEqual(out_data['reply'], 'Hello Ragnar!')
@@ -67,11 +67,12 @@ class TestPythonActor(unittest.TestCase):
actor.connect(stopActor)
actor.connectOnError(errorHandler)
errorHandler.connect(stopActor)
actor.trigger(inData)
actor.trigger((None, inData))
stopActor.join(timeout=5)
out_data = stopActor.out_data
out_channel, out_data = stopActor.out_data
self.assertIsNotNone(out_data)
self.assertTrue('WorkflowException' in out_data)
self.assertEqual(out_channel, None)
def test_ForkAndJoinActors(self):
start = StartActor()
@@ -91,9 +92,9 @@ class TestPythonActor(unittest.TestCase):
joinActor.connect(stop)
# Run
inData = {'a': 1}
start.trigger(inData)
start.trigger((None, inData))
stop.join(timeout=5)
out_data = stop.out_data
out_channel, out_data = stop.out_data
self.assertIsNotNone(out_data)
logger.info(out_data)
@@ -68,7 +68,7 @@ class TestScheme(unittest.TestCase):
def testProcessing(self):
processable_workflow = ProcessableWorkflow(scheme=self.scheme)
out_ = exec_(scheme=processable_workflow, input_={'data': 0,})
channel_out, out_ = exec_(scheme=processable_workflow, input_=(None, {'data': 0,}))
self.assertEqual(out_['data'], 7)
@@ -79,28 +79,28 @@ class TestNodeExecution(unittest.TestCase):
node = Node(processing_pt='pypushflow.test.utils.test_function')
res = node.execute(node.process_pt, properties={}, input_name='data',
input_data={'data': 0})
self.assertEqual(res, {'data': 2})
self.assertEqual(res, (None, {'data': 2}))
def testCase2(self):
'test that a callable class can be executed from its name'
node = Node(processing_pt='pypushflow.test.utils.ProcessingClass1')
res = node.execute(node.process_pt, properties={}, input_name='data',
input_data={'data': 0})
self.assertEqual(res, {'data': 4})
self.assertEqual(res, (None, {'data': 4}))
def testCase3(self):
"""Test that a class with handler can be executed"""
node = Node(processing_pt='pypushflow.test.utils.ProcessingClass2')
res = node.execute(node.process_pt, properties={}, input_name='data',
input_data={'data': 0})
self.assertEqual(res, {'data': 1})
self.assertEqual(res, (None, {'data': 1}))
def testCase4(self):
script = 'pypushflow.test.pythonActorTest.run'
node = Node(processing_pt=script)
res = node.execute(node.process_pt, properties={}, input_name='data',
input_data={'name': 'pythonActorTest'})
self.assertEqual(res, {'reply': 'Hello pythonActorTest!'})
self.assertEqual(res, (None, {'reply': 'Hello pythonActorTest!'}))
def suite():
@@ -332,8 +332,6 @@ if __name__ == "__main__": # Needed for multiprocessing support on Windows
help="Increase verbosity. Option -v prints additional " +
"INFO messages. Use -vv for full verbosity, " +
"including debug messages and test help strings.")
parser.add_argument("--qt-binding", dest="qt_binding", default=None,
help="Force using a Qt binding, from 'PyQt4', 'PyQt5', or 'PySide'")
default_test_name = "%s.test.suite" % PROJECT_NAME
parser.add_argument("test_name", nargs='*',
@@ -356,30 +354,6 @@ if __name__ == "__main__": # Needed for multiprocessing support on Windows
test_verbosity = 2
use_buffer = False
if options.qt_binding:
binding = options.qt_binding.lower()
if binding == "pyqt4":
logger.info("Force using PyQt4")
if sys.version < "3.0.0":
try:
import sip
sip.setapi("QString", 2)
sip.setapi("QVariant", 2)
except Exception:
logger.warning("Cannot set sip API")
import PyQt4.QtCore # noqa
elif binding == "pyqt5":
logger.info("Force using PyQt5")
import PyQt5.QtCore # noqa
elif binding == "pyside":
logger.info("Force using PySide")
import PySide.QtCore # noqa
elif binding == "pyside2":
logger.info("Force using PySide2")
import PySide2.QtCore # noqa
else:
raise ValueError("Qt binding '%s' is unknown" % options.qt_binding)
# Run the tests
runnerArgs = {}
runnerArgs["verbosity"] = test_verbosity