Commit 7cc4f442 authored by payno's avatar payno
Browse files

one nore step for connecting it to tomwer

parent aa9f581e
......@@ -26,10 +26,8 @@ __date__ = "28/05/2019"
import os
import pprint
import logging
import datetime
import traceback
import functools
import importlib
import multiprocessing.pool
from pypushflow.AbstractActor import AbstractActor
from pypushflow.representation.scheme.node import Node
......@@ -148,12 +146,12 @@ class ActorWrapper(object):
@trace_unhandled_exceptions
def run(self, in_data):
logger.debug('In actor wrapper for {node}'.format(node=self.node))
output_channel_type, out_data = self.node.execute(in_data)
output_channel_name, out_data = self.node.execute(in_data)
if isinstance(out_data, WorkflowException):
return output_channel_type, out_data
return output_channel_name, out_data
else:
in_data.update(out_data)
return output_channel_type, out_data
return output_channel_name, out_data
class PythonActor(AbstractActor):
......@@ -198,9 +196,11 @@ class PythonActor(AbstractActor):
self.out_data = None
self.async_factory = None
def get_channel_name(self, type_):
print('******************')
return self.actor_wrapper.node.get_channel_name(type_)
def get_input_channel_name(self, type_):
return self.actor_wrapper.node.get_input_channel_name(type_)
def get_output_channel_name(self, type_):
return self.actor_wrapper.node.get_output_channel_name(type_)
def connectOnError(self, errorHandler):
self.list_error_handler.append(errorHandler)
......@@ -256,12 +256,15 @@ class PythonActor(AbstractActor):
self.error_handler.triggerOnError(inData)
def triggerDownStreamActor(self, output_last_processing=(None, {})):
logging.warning('---------------------')
logging.warning(output_last_processing)
logging.warning('---------------------')
try:
output_type, inData = output_last_processing
output_channel, inData = output_last_processing
except TypeError:
output_type, inData = None, output_last_processing
logger.debug('In triggerDownStreamActor for {0}, Output type is {1}, '
'inData is {2}'.format(self.name, output_type, inData))
output_channel, inData = None, output_last_processing
logger.info('In triggerDownStreamActor for {0}, Output channel is {1}, '
'inData is {2}'.format(self.name, output_channel, inData))
if isinstance(inData, WorkflowException):
logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor]))
workflowException = inData
......@@ -277,7 +280,6 @@ class PythonActor(AbstractActor):
logger.error('Trigger on error on errorHandler "{0}"'.format(self.error_handler.name))
self.error_handler.triggerOnError(inData=(None, oldInData))
else:
logger.info('--------------------------')
out_data = {}
for key, value in inData.items():
if key in self.in_data:
......@@ -285,12 +287,6 @@ class PythonActor(AbstractActor):
out_data[key] = value
else:
out_data[key] = value
logger.info('+++++++++++++++++++++++++++')
for downStreamActor in self.listDownStreamActor:
print('aaaaaaaaaaaaaaaaa')
channel_name = downStreamActor.get_channel_name(output_type)
print('bbbbbbbbbbbbbbbbbbbb')
logger.debug('In trigger {0}, triggering actor {1}, channel {2} ; inData={3}'.format(self.name, downStreamActor.name, channel_name, inData))
print('cccccccccccccccccccc')
downStreamActor.trigger((channel_name, inData))
downStreamActor.trigger((output_channel, inData))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment