Commits (7)
......@@ -35,7 +35,7 @@ import importlib
from typing import Union
from importlib.machinery import SourceFileLoader
_logger = logging.getLogger(__file__)
_logger = logging.getLogger(__name__)
global next_node_free_idF
next_node_free_id = 0
......@@ -89,7 +89,7 @@ class Node(object):
_JSON_ERROR_HANDLER = 'error_handler'
def __init__(self, processing_pt, id: Union[None, int] = None,
properties:Union[None, dict] = None,
properties: Union[None, dict] = None,
error_handler=None): = get_next_node_free_id() if id is None else id
"""int of the node id"""
......@@ -106,9 +106,25 @@ class Node(object):
self._handlers = {}
"""handlers with link name as key and callback as value.
The default handler is store under the 'None' value"""
self._input_type_to_name = {}
"""link input type to a signal name"""
self._output_type_to_name = {}
"""link output type to a signal name"""
self._error_handler = error_handler
self.out_data = None
def get_input_channel_name(self, data_object):
for dtype, channel_name in self._input_type_to_name.items():
if isinstance(data_object, dtype):
return channel_name
return None
def get_output_channel_name(self, data_object):
for dtype, channel_name in self._output_type_to_name.items():
if isinstance(data_object, dtype):
return channel_name
return None
def handlers(self) -> dict:
return self._handlers
......@@ -148,6 +164,9 @@ class Node(object):
assert self._process_pt is not None
if callable(self._process_pt):
self.__process_instance = self._process_pt
......@@ -158,7 +177,7 @@ class Node(object):
sname = self._process_pt.rsplit('.')
if not (len(sname) > 1):
raise ValueError('Invalid name')
raise ValueError(self._process_pt + ' is not recognized as a valid name')
class_name = sname[-1]
del sname[-1]
module_name = '.'.join(sname)
......@@ -171,6 +190,7 @@ class Node(object):
class_or_fct = getattr(module, class_name)
if inspect.isclass(class_or_fct):
_logger.debug('instanciate ' + str(class_or_fct))
self.__process_instance = class_or_fct()
self.__process_instance = class_or_fct
......@@ -179,15 +199,34 @@ class Node(object):
# manage the case where a class has several input handler
if hasattr(self.__process_instance, 'inputs'):
for input_ in self.__process_instance.inputs:
input_name, input_type, input_handler = input_
self._handlers[input_name] = getattr(self.__process_instance, input_handler)
input_name, input_type, input_handler = input_[:3]
_logger.debug('[node: %s] add input_name: %s, '
'input_type: %s, input_handler: %s' % (self._process_pt, input_name, input_type, input_handler))
if str(input_type) in self._input_type_to_name:
raise ValueError('Several input name found for the '
'same input type. This case is not managed.')
self._input_type_to_name[input_type] = input_name
self._handlers[input_name] = input_handler
# self._handlers[input_name] = getattr(self.__process_instance, input_handler)
if hasattr(self.__process_instance, 'outputs'):
for output_ in self.__process_instance.outputs:
output_name, output_type, output_handler = output_[:3]
_logger.debug('[node: %s] add output_name: %s, '
'output_type: %s, output_handler: %s' % (
self._process_pt, input_name, input_type,
if output_type in self._output_type_to_name:
raise ValueError(
'Several output name found for the '
'same output type. This case is not managed.')
self._output_type_to_name[output_type] = output_name
if len(self._handlers) == 0:
raise ValueError('Fail to init handlers, none defined for ' + str(self._process_pt))
def execute(process_pt, properties: dict, input_name: str,
input_data: object):
input_data: object) -> tuple:
Create an instance of a node with 'process_pt' and execute it with the
given input_name, properties and input_data.
......@@ -198,20 +237,40 @@ class Node(object):
:param str input_name: name of the input data
:param input_data: input data :warning: Should be serializable
:return: output data. :warning: Should be serializable
:return: (output data type, output data)
:warning: Should be serializable
node = Node(processing_pt=process_pt, properties=properties)
node.load_handlers()'start execution of {0} with {1} through channel {2}'
''.format(str(process_pt), input_data, input_name))
if hasattr(node.__process_instance, 'set_properties'):
if input_name in node.handlers:
out = node.handlers[input_name](input_data)
if type(node.handlers[input_name]) is str:
out = getattr(node.__process_instance, node.handlers[input_name])(input_data)
out = node.handlers[input_name](input_data)
elif None in node.handlers:
out = node.handlers[None](input_data)
if type(node.handlers[None]) is str:
out = getattr(node.__process_instance, node.handlers[None])(input_data)
out = node.__process_instance(input_data)
err = '"{0}" channel is not managed by {1}'.format(input_name, node._process_pt)
raise KeyError(err)
# retrieve output channel
if out is None:
output_channel = None
raise KeyError(input_name, 'is not managed by', str(node._process_pt))
output_channel = node.get_output_channel_name(out)
if hasattr(out, 'to_dict'):
return out.to_dict()
return output_channel, out.to_dict()
return out
return output_channel, out
def to_json(self) -> dict:
......@@ -56,14 +56,14 @@ class Parser(object):
aliases = {}
import pypushflowaddon
import ppfaddon
except ImportError:
return aliases
import pkgutil
for importer, modname, ispkg in pkgutil.iter_modules(pypushflowaddon.__path__):
for importer, modname, ispkg in pkgutil.iter_modules(ppfaddon.__path__):
mod_name = '.'.join((pypushflowaddon.__name__, modname, 'aliases'))
mod_name = '.'.join((ppfaddon.__name__, modname, 'aliases'))
module = importlib.import_module(mod_name)
except ImportError:
_logger.warning(modname + ' does not fit the add-on design, skip it')