Commits (8)
...@@ -35,7 +35,7 @@ import importlib ...@@ -35,7 +35,7 @@ import importlib
from typing import Union from typing import Union
from importlib.machinery import SourceFileLoader from importlib.machinery import SourceFileLoader
_logger = logging.getLogger(__file__) _logger = logging.getLogger(__name__)
global next_node_free_idF global next_node_free_idF
next_node_free_id = 0 next_node_free_id = 0
...@@ -52,7 +52,7 @@ def trace_unhandled_exceptions(func): ...@@ -52,7 +52,7 @@ def trace_unhandled_exceptions(func):
@functools.wraps(func) @functools.wraps(func)
def wrapped_func(*args, **kwargs): def wrapped_func(*args, **kwargs):
try: try:
outData = func(*args, **kwargs) out_data = func(*args, **kwargs)
except Exception as e: except Exception as e:
_logger.exception(e) _logger.exception(e)
errorMessage = '{0}'.format(e) errorMessage = '{0}'.format(e)
...@@ -62,7 +62,7 @@ def trace_unhandled_exceptions(func): ...@@ -62,7 +62,7 @@ def trace_unhandled_exceptions(func):
traceBack=traceBack, traceBack=traceBack,
data=args[1] data=args[1]
) )
return outData return out_data
return wrapped_func return wrapped_func
...@@ -89,7 +89,7 @@ class Node(object): ...@@ -89,7 +89,7 @@ class Node(object):
_JSON_ERROR_HANDLER = 'error_handler' _JSON_ERROR_HANDLER = 'error_handler'
def __init__(self, processing_pt, id: Union[None, int] = None, def __init__(self, processing_pt, id: Union[None, int] = None,
properties:Union[None, dict] = None, properties: Union[None, dict] = None,
error_handler=None): error_handler=None):
self.id = get_next_node_free_id() if id is None else id self.id = get_next_node_free_id() if id is None else id
"""int of the node id""" """int of the node id"""
...@@ -106,8 +106,24 @@ class Node(object): ...@@ -106,8 +106,24 @@ class Node(object):
self._handlers = {} self._handlers = {}
"""handlers with link name as key and callback as value. """handlers with link name as key and callback as value.
The default handler is store under the 'None' value""" The default handler is store under the 'None' value"""
self._input_type_to_name = {}
"""link input type to a signal name"""
self._output_type_to_name = {}
"""link output type to a signal name"""
self._error_handler = error_handler self._error_handler = error_handler
self.outData = None self.out_data = None
def get_input_channel_name(self, data_object):
for dtype, channel_name in self._input_type_to_name.items():
if isinstance(data_object, dtype):
return channel_name
return None
def get_output_channel_name(self, data_object):
for dtype, channel_name in self._output_type_to_name.items():
if isinstance(data_object, dtype):
return channel_name
return None
@property @property
def handlers(self) -> dict: def handlers(self) -> dict:
...@@ -117,6 +133,10 @@ class Node(object): ...@@ -117,6 +133,10 @@ class Node(object):
def process_pt(self): def process_pt(self):
return self._process_pt return self._process_pt
@property
def class_instance(self):
return self.__process_instance
def isfinal(self) -> bool: def isfinal(self) -> bool:
""" """
...@@ -144,6 +164,8 @@ class Node(object): ...@@ -144,6 +164,8 @@ class Node(object):
definition definition
""" """
self._handlers.clear() self._handlers.clear()
self._input_type_to_name.clear()
self._output_type_to_name.clear()
assert self._process_pt is not None assert self._process_pt is not None
if callable(self._process_pt): if callable(self._process_pt):
self.__process_instance = self._process_pt self.__process_instance = self._process_pt
...@@ -154,7 +176,7 @@ class Node(object): ...@@ -154,7 +176,7 @@ class Node(object):
else: else:
sname = self._process_pt.rsplit('.') sname = self._process_pt.rsplit('.')
if not (len(sname) > 1): if not (len(sname) > 1):
raise ValueError('Invalid name') raise ValueError(self._process_pt + ' is not recognized as a valid name')
class_name = sname[-1] class_name = sname[-1]
del sname[-1] del sname[-1]
module_name = '.'.join(sname) module_name = '.'.join(sname)
...@@ -167,6 +189,7 @@ class Node(object): ...@@ -167,6 +189,7 @@ class Node(object):
class_or_fct = getattr(module, class_name) class_or_fct = getattr(module, class_name)
if inspect.isclass(class_or_fct): if inspect.isclass(class_or_fct):
_logger.debug('instanciate ' + str(class_or_fct))
self.__process_instance = class_or_fct() self.__process_instance = class_or_fct()
else: else:
self.__process_instance = class_or_fct self.__process_instance = class_or_fct
...@@ -175,15 +198,34 @@ class Node(object): ...@@ -175,15 +198,34 @@ class Node(object):
# manage the case where a class has several input handler # manage the case where a class has several input handler
if hasattr(self.__process_instance, 'inputs'): if hasattr(self.__process_instance, 'inputs'):
for input_ in self.__process_instance.inputs: for input_ in self.__process_instance.inputs:
input_name, input_type, input_handler = input_ input_name, input_type, input_handler = input_[:3]
_logger.debug('[node: %s] add input_name: %s, '
'input_type: %s, input_handler: %s' % (self._process_pt, input_name, input_type, input_handler))
if str(input_type) in self._input_type_to_name:
raise ValueError('Several input name found for the '
'same input type. This case is not managed.')
self._input_type_to_name[input_type] = input_name
self._handlers[input_name] = input_handler self._handlers[input_name] = input_handler
# self._handlers[input_name] = getattr(self.__process_instance, input_handler)
if hasattr(self.__process_instance, 'outputs'):
for output_ in self.__process_instance.outputs:
output_name, output_type, output_handler = output_[:3]
_logger.debug('[node: %s] add output_name: %s, '
'output_type: %s, output_handler: %s' % (
self._process_pt, input_name, input_type,
input_handler))
if output_type in self._output_type_to_name:
raise ValueError(
'Several output name found for the '
'same output type. This case is not managed.')
self._output_type_to_name[output_type] = output_name
if len(self._handlers) == 0: if len(self._handlers) == 0:
raise ValueError('Fail to init handlers, none defined for ' + str(self._process_pt)) raise ValueError('Fail to init handlers, none defined for ' + str(self._process_pt))
@staticmethod @staticmethod
def execute(process_pt, properties: dict, input_name: str, def execute(process_pt, properties: dict, input_name: str,
input_data: object): input_data: object) -> tuple:
""" """
Create an instance of a node with 'process_pt' and execute it with the Create an instance of a node with 'process_pt' and execute it with the
given input_name, properties and input_data. given input_name, properties and input_data.
...@@ -194,20 +236,36 @@ class Node(object): ...@@ -194,20 +236,36 @@ class Node(object):
:param str input_name: name of the input data :param str input_name: name of the input data
:param input_data: input data :warning: Should be serializable :param input_data: input data :warning: Should be serializable
:return: output data. :warning: Should be serializable :return: (output data type, output data)
:warning: Should be serializable
""" """
node = Node(processing_pt=process_pt, properties=properties) node = Node(processing_pt=process_pt, properties=properties)
node.load_handlers() node.load_handlers()
logging.info('start execution of {0} with {1} through channel {2}'
''.format(str(process_pt), input_data, input_name))
if hasattr(node.__process_instance, 'set_properties'):
node.__process_instance.set_properties(properties)
else:
raise ValueError('no function set properties found')
if input_name in node.handlers: if input_name in node.handlers:
out = node.handlers[input_name](input_data) out = getattr(node.__process_instance, node.handlers[input_name])(input_data)
elif None in node.handlers: elif None in node.handlers:
out = node.handlers[None](input_data) out = getattr(node.__process_instance, node.handlers[None])(input_data)
else:
err = '"{0}" channel is not managed by {1}'.format(input_name, node._process_pt)
raise KeyError(err)
# retrieve output channel
if out is None:
output_channel = None
else: else:
raise KeyError(input_name, 'is not managed by', str(node._process_pt)) output_channel = node.get_output_channel_name(out)
if hasattr(out, 'to_dict'): if hasattr(out, 'to_dict'):
return out.to_dict() return output_channel, out.to_dict()
else: else:
return out return output_channel, out
def to_json(self) -> dict: def to_json(self) -> dict:
""" """
......
...@@ -56,14 +56,14 @@ class Parser(object): ...@@ -56,14 +56,14 @@ class Parser(object):
""" """
aliases = {} aliases = {}
try: try:
import pypushflowaddon import ppfaddon
except ImportError: except ImportError:
return aliases return aliases
else: else:
import pkgutil import pkgutil
for importer, modname, ispkg in pkgutil.iter_modules(pypushflowaddon.__path__): for importer, modname, ispkg in pkgutil.iter_modules(ppfaddon.__path__):
try: try:
mod_name = '.'.join((pypushflowaddon.__name__, modname, 'aliases')) mod_name = '.'.join((ppfaddon.__name__, modname, 'aliases'))
module = importlib.import_module(mod_name) module = importlib.import_module(mod_name)
except ImportError: except ImportError:
_logger.warning(modname + ' does not fit the add-on design, skip it') _logger.warning(modname + ' does not fit the add-on design, skip it')
......
...@@ -154,7 +154,7 @@ class Scheme(object): ...@@ -154,7 +154,7 @@ class Scheme(object):
""" """
:return: list of nodes starting the workflow. Those does not require :return: list of nodes starting the workflow. Those does not require
any input_data any input_data.
:rtype: list :rtype: list
""" """
res = [] res = []
...@@ -302,9 +302,10 @@ class Scheme(object): ...@@ -302,9 +302,10 @@ class Scheme(object):
:param str json_file_path: path to the json file containing the scheme :param str json_file_path: path to the json file containing the scheme
description description
:return: Scheme fitting the json description contains if the file. :return: Scheme fitting the json description contains if the file.
If description is incomplete, return None If description is incomplete, return None
:rtype:Union[Scheme,None] :rtype: Union[Scheme, None]
""" """
try: try:
with open(json_file_path, 'r') as json_file: with open(json_file_path, 'r') as json_file:
...@@ -378,7 +379,7 @@ class Scheme(object): ...@@ -378,7 +379,7 @@ class Scheme(object):
def scheme_to_etree(self, data_format:str = "literal", def scheme_to_etree(self, data_format:str = "literal",
pickle_fallback: bool = False): pickle_fallback: bool = False):
""" """
Return an `xml.etree.ElementTree` representation of the `scheme. Return an 'xml.etree.ElementTree' representation of the scheme.
""" """
builder = TreeBuilder(element_factory=Element) builder = TreeBuilder(element_factory=Element)
builder.start("scheme", {"version": "2.0", builder.start("scheme", {"version": "2.0",
...@@ -562,9 +563,10 @@ class SubScheme(Scheme, Node): ...@@ -562,9 +563,10 @@ class SubScheme(Scheme, Node):
def load_from_json(json_data: dict): def load_from_json(json_data: dict):
""" """
:param json_data: scheme description :param json_data: scheme description.
:raise ValueError: if sink or source channel missing or if link id :raise ValueError: if sink or source channel missing or if link id
missing or if sink or source node missing missing or if sink or source node missing.
""" """
nodes, links, sub_schemes, title, description = Scheme.load_scheme_info_from_json(json_data) nodes, links, sub_schemes, title, description = Scheme.load_scheme_info_from_json(json_data)
_id, _properties, _process_pt = Node.load_node_info_from_json(json_data) _id, _properties, _process_pt = Node.load_node_info_from_json(json_data)
......