Commit aa9f581e authored by payno's avatar payno
Browse files

first modifications for tomwer

parent 811bb1f8
...@@ -92,25 +92,21 @@ class Edna2Pool(multiprocessing.pool.Pool): ...@@ -92,25 +92,21 @@ class Edna2Pool(multiprocessing.pool.Pool):
@trace_unhandled_exceptions @trace_unhandled_exceptions
def _exec_node(name: str, input_: dict, properties: dict): def _exec_node(name: str, channel_name: str, data: dict, properties: dict):
""" """
Execute a node from the name of the process, input of the process and Execute a node from the name of the process, input of the process and
properties of the process properties of the process
:param str name: full name of the process to execute :param str name: full name of the process to execute
:param input_: process input :param data: data to process
:param properties: process properties / settings :param properties: process properties / settings
:return: result of Node.execute :return: result of Node.execute
""" """
logger.debug(' '.join(('processing', str(name), 'with input', logger.debug('processing {0} on channel {1} with input {2} and {3} as '
str(input_), 'and', str(properties), 'as properties'))) 'properties'.format(str(name), str(channel_name),
if type(input_) is tuple: str(data), str(properties)))
data_name, data = input_
else:
data_name = None
data = input_
return Node.execute(process_pt=name, properties=properties, return Node.execute(process_pt=name, properties=properties,
input_data=data, input_name=data_name) input_data=data, input_name=channel_name)
class AsyncFactory: class AsyncFactory:
...@@ -152,12 +148,12 @@ class ActorWrapper(object): ...@@ -152,12 +148,12 @@ class ActorWrapper(object):
@trace_unhandled_exceptions @trace_unhandled_exceptions
def run(self, in_data): def run(self, in_data):
logger.debug('In actor wrapper for {node}'.format(node=self.node)) logger.debug('In actor wrapper for {node}'.format(node=self.node))
out_data = self.node.execute(in_data) output_channel_type, out_data = self.node.execute(in_data)
if isinstance(out_data, WorkflowException): if isinstance(out_data, WorkflowException):
return out_data return output_channel_type, out_data
else: else:
in_data.update(out_data) in_data.update(out_data)
return out_data return output_channel_type, out_data
class PythonActor(AbstractActor): class PythonActor(AbstractActor):
...@@ -201,7 +197,10 @@ class PythonActor(AbstractActor): ...@@ -201,7 +197,10 @@ class PythonActor(AbstractActor):
self.in_data = None self.in_data = None
self.out_data = None self.out_data = None
self.async_factory = None self.async_factory = None
# TODO: could or should the async factory be a borg idiom ?
def get_channel_name(self, type_):
print('******************')
return self.actor_wrapper.node.get_channel_name(type_)
def connectOnError(self, errorHandler): def connectOnError(self, errorHandler):
self.list_error_handler.append(errorHandler) self.list_error_handler.append(errorHandler)
...@@ -212,6 +211,8 @@ class PythonActor(AbstractActor): ...@@ -212,6 +211,8 @@ class PythonActor(AbstractActor):
:param data: input data :param data: input data
""" """
channel, in_data = in_data
logging.info(' on trigger channel is', channel)
# cast data to dict if necessary # cast data to dict if necessary
if hasattr(in_data, 'to_dict'): if hasattr(in_data, 'to_dict'):
_in_data = in_data.to_dict() _in_data = in_data.to_dict()
...@@ -219,7 +220,6 @@ class PythonActor(AbstractActor): ...@@ -219,7 +220,6 @@ class PythonActor(AbstractActor):
_in_data = in_data _in_data = in_data
self.in_data = _in_data self.in_data = _in_data
# self.uploadDataToMongo(actorData={'inData': in_data}, script=self.script)
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(in_data))) logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(in_data)))
if isinstance(in_data, WorkflowException): if isinstance(in_data, WorkflowException):
logger.error('Error from previous actor! Not running actor {0}'.format(self.name)) logger.error('Error from previous actor! Not running actor {0}'.format(self.name))
...@@ -237,6 +237,7 @@ class PythonActor(AbstractActor): ...@@ -237,6 +237,7 @@ class PythonActor(AbstractActor):
callback=self.triggerDownStreamActor, callback=self.triggerDownStreamActor,
errorCallback=self.error_handler) errorCallback=self.error_handler)
self.async_factory.call(self.actor_wrapper.node.process_pt, self.async_factory.call(self.actor_wrapper.node.process_pt,
channel,
in_data, in_data,
self.actor_wrapper.node.properties) self.actor_wrapper.node.properties)
...@@ -254,8 +255,13 @@ class PythonActor(AbstractActor): ...@@ -254,8 +255,13 @@ class PythonActor(AbstractActor):
if self.error_handler is not None: if self.error_handler is not None:
self.error_handler.triggerOnError(inData) self.error_handler.triggerOnError(inData)
def triggerDownStreamActor(self, inData={}): def triggerDownStreamActor(self, output_last_processing=(None, {})):
logger.debug('In triggerDownStreamActor for {0}'.format(self.name)) try:
output_type, inData = output_last_processing
except TypeError:
output_type, inData = None, output_last_processing
logger.debug('In triggerDownStreamActor for {0}, Output type is {1}, '
'inData is {2}'.format(self.name, output_type, inData))
if isinstance(inData, WorkflowException): if isinstance(inData, WorkflowException):
logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor])) logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor]))
workflowException = inData workflowException = inData
...@@ -265,18 +271,13 @@ class PythonActor(AbstractActor): ...@@ -265,18 +271,13 @@ class PythonActor(AbstractActor):
'traceBack': workflowException.traceBack.split('\n'), 'traceBack': workflowException.traceBack.split('\n'),
} }
oldInData['WorkflowException'] = exceptionDict oldInData['WorkflowException'] = exceptionDict
# self.uploadDataToMongo(actorData={
# 'stopTime': datetime.datetime.now(),
# 'status': 'error',
# 'outData': exceptionDict
# })
for errorHandler in self.list_error_handler: for errorHandler in self.list_error_handler:
errorHandler.trigger(oldInData) errorHandler.trigger((None, oldInData))
if self.error_handler is not None: if self.error_handler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.error_handler.name)) logger.error('Trigger on error on errorHandler "{0}"'.format(self.error_handler.name))
self.error_handler.triggerOnError(inData=oldInData) self.error_handler.triggerOnError(inData=(None, oldInData))
else: else:
# TODO: what can be inData ? a list ? namedtuple ? logger.info('--------------------------')
out_data = {} out_data = {}
for key, value in inData.items(): for key, value in inData.items():
if key in self.in_data: if key in self.in_data:
...@@ -284,12 +285,12 @@ class PythonActor(AbstractActor): ...@@ -284,12 +285,12 @@ class PythonActor(AbstractActor):
out_data[key] = value out_data[key] = value
else: else:
out_data[key] = value out_data[key] = value
# self.uploadDataToMongo(actorData={ logger.info('+++++++++++++++++++++++++++')
# 'stopTime': datetime.datetime.now(),
# 'status': 'finished',
# 'outData': outData
# })
for downStreamActor in self.listDownStreamActor: for downStreamActor in self.listDownStreamActor:
logger.debug('In trigger {0}, triggering actor {1}, inData={2}'.format(self.name, downStreamActor.name, inData)) print('aaaaaaaaaaaaaaaaa')
downStreamActor.trigger(inData) channel_name = downStreamActor.get_channel_name(output_type)
print('bbbbbbbbbbbbbbbbbbbb')
logger.debug('In trigger {0}, triggering actor {1}, channel {2} ; inData={3}'.format(self.name, downStreamActor.name, channel_name, inData))
print('cccccccccccccccccccc')
downStreamActor.trigger((channel_name, inData))
...@@ -53,4 +53,3 @@ class StopActor(object): ...@@ -53,4 +53,3 @@ class StopActor(object):
def join(self, timeout=7200): def join(self, timeout=7200):
self.lock.acquire(timeout=timeout) self.lock.acquire(timeout=timeout)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment