Commit 7b582358 authored by payno's avatar payno
Browse files

[node] reword node to access and use channels names

parent d378f770
......@@ -89,7 +89,7 @@ class Node(object):
_JSON_ERROR_HANDLER = 'error_handler'
def __init__(self, processing_pt, id: Union[None, int] = None,
properties:Union[None, dict] = None,
properties: Union[None, dict] = None,
error_handler=None): = get_next_node_free_id() if id is None else id
"""int of the node id"""
......@@ -106,9 +106,25 @@ class Node(object):
self._handlers = {}
"""handlers with link name as key and callback as value.
The default handler is store under the 'None' value"""
self._input_type_to_name = {}
"""link input type to a signal name"""
self._output_type_to_name = {}
"""link output type to a signal name"""
self._error_handler = error_handler
self.outData = None
def get_input_channel_name(self, data_object):
for dtype, channel_name in self._input_type_to_name.items():
if isinstance(data_object, dtype):
return channel_name
return None
def get_output_channel_name(self, data_object):
for dtype, channel_name in self._output_type_to_name.items():
if isinstance(data_object, dtype):
return channel_name
return None
def handlers(self) -> dict:
return self._handlers
......@@ -144,6 +160,8 @@ class Node(object):
assert self._process_pt is not None
if callable(self._process_pt):
self.__process_instance = self._process_pt
......@@ -154,7 +172,7 @@ class Node(object):
sname = self._process_pt.rsplit('.')
if not (len(sname) > 1):
raise ValueError('Invalid name')
raise ValueError(self._process_pt + ' is not recognized as a valid name')
class_name = sname[-1]
del sname[-1]
module_name = '.'.join(sname)
......@@ -167,6 +185,7 @@ class Node(object):
class_or_fct = getattr(module, class_name)
if inspect.isclass(class_or_fct):
_logger.debug('instanciate ' + str(class_or_fct))
self.__process_instance = class_or_fct()
self.__process_instance = class_or_fct
......@@ -175,15 +194,33 @@ class Node(object):
# manage the case where a class has several input handler
if hasattr(self.__process_instance, 'inputs'):
for input_ in self.__process_instance.inputs:
input_name, input_type, input_handler = input_
input_name, input_type, input_handler = input_[:3]
_logger.debug('[node: %s] add input_name: %s, '
'input_type: %s, input_handler: %s' % (self._process_pt, input_name, input_type, input_handler))
if str(input_type) in self._input_type_to_name:
raise ValueError('Several input name found for the '
'same input type. This case is not managed.')
self._input_type_to_name[input_type] = input_name
self._handlers[input_name] = input_handler
if hasattr(self.__process_instance, 'outputs'):
for output_ in self.__process_instance.outputs:
output_name, output_type, output_handler = output_[:3]
_logger.debug('[node: %s] add output_name: %s, '
'output_type: %s, output_handler: %s' % (
self._process_pt, input_name, input_type,
if output_type in self._output_type_to_name:
raise ValueError(
'Several output name found for the '
'same output type. This case is not managed.')
self._output_type_to_name[output_type] = output_name
if len(self._handlers) == 0:
raise ValueError('Fail to init handlers, none defined for ' + str(self._process_pt))
def execute(process_pt, properties: dict, input_name: str,
input_data: object):
input_data: object) -> tuple:
Create an instance of a node with 'process_pt' and execute it with the
given input_name, properties and input_data.
......@@ -194,20 +231,34 @@ class Node(object):
:param str input_name: name of the input data
:param input_data: input data :warning: Should be serializable
:return: output data. :warning: Should be serializable
:return: (output data type, output data)
:warning: Should be serializable
node = Node(processing_pt=process_pt, properties=properties)
node.load_handlers()'start execution of {0} with {1} through channel {2}'
''.format(str(process_pt), input_data, input_name))
if input_name in node.handlers:
out = node.handlers[input_name](input_data)
out = getattr(node.__process_instance, node.handlers[input_name])(input_data)
elif None in node.handlers:
out = node.handlers[None](input_data)
out = getattr(node.__process_instance, node.handlers[None])(input_data)
raise KeyError(input_name, 'is not managed by', str(node._process_pt))
err = '"{0}" channel is not managed by {1}'.format(input_name, node._process_pt)
raise KeyError(err)
# retrieve output channel
if out is None:
output_channel = None
output_channel = node.get_output_channel_name(out)
_logger.warning('--- output channel from {0} is {1}'.format(process_pt, output_channel))
_logger.warning('--- type(out) is {0}'.format(type(out)))
if hasattr(out, 'to_dict'):
return out.to_dict()
return output_channel, out.to_dict()
return out
return output_channel, out
def to_json(self) -> dict:
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment