Commits (8)
@@ -35,7 +35,7 @@ import importlib
 from typing import Union
 from importlib.machinery import SourceFileLoader
 
-_logger = logging.getLogger(__file__)
+_logger = logging.getLogger(__name__)
 
 global next_node_free_id
 next_node_free_id = 0
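`get_next_node_free_id()` (used in `Node.__init__` below) is presumably the accessor for this module-level counter. A minimal sketch of such an accessor, not necessarily the project's exact definition:

    def get_next_node_free_id() -> int:
        # return the current free id and advance the module-level counter
        global next_node_free_id
        free_id = next_node_free_id
        next_node_free_id += 1
        return free_id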
@@ -52,7 +52,7 @@ def trace_unhandled_exceptions(func):
     @functools.wraps(func)
     def wrapped_func(*args, **kwargs):
         try:
-            outData = func(*args, **kwargs)
+            out_data = func(*args, **kwargs)
         except Exception as e:
             _logger.exception(e)
             errorMessage = '{0}'.format(e)
@@ -62,7 +62,7 @@ def trace_unhandled_exceptions(func):
                 traceBack=traceBack,
                 data=args[1]
             )
-        return outData
+        return out_data
     return wrapped_func
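Given `data=args[1]` and the call to `args[0].processing_error`, the decorator is meant to wrap bound methods whose owner exposes a `processing_error` callback and whose first positional argument is the processed data. A hedged usage sketch (class and method names are illustrative, not from the project; the full `processing_error` signature is elided above, so the sketch accepts keyword arguments generically):

    class DummyProcess:
        def processing_error(self, **kwargs):
            # receives at least traceBack=... and data=... from the decorator
            _logger.error('processing failed on %s', kwargs.get('data'))

        @trace_unhandled_exceptions
        def process(self, data):
            # self becomes args[0] and data becomes args[1] in wrapped_func
            return data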
@@ -89,7 +89,7 @@ class Node(object):
     _JSON_ERROR_HANDLER = 'error_handler'
 
     def __init__(self, processing_pt, id: Union[None, int] = None,
-                 properties:Union[None, dict] = None,
+                 properties: Union[None, dict] = None,
                  error_handler=None):
         self.id = get_next_node_free_id() if id is None else id
         """int of the node id"""
@@ -106,8 +106,24 @@ class Node(object):
         self._handlers = {}
         """handlers with link name as key and callback as value.
         The default handler is stored under the 'None' value"""
+        self._input_type_to_name = {}
+        """link input type to a signal name"""
+        self._output_type_to_name = {}
+        """link output type to a signal name"""
         self._error_handler = error_handler
-        self.outData = None
+        self.out_data = None
+
+    def get_input_channel_name(self, data_object):
+        for dtype, channel_name in self._input_type_to_name.items():
+            if isinstance(data_object, dtype):
+                return channel_name
+        return None
+
+    def get_output_channel_name(self, data_object):
+        for dtype, channel_name in self._output_type_to_name.items():
+            if isinstance(data_object, dtype):
+                return channel_name
+        return None
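Both lookups resolve a channel name from the runtime type of a data object via `isinstance`, so instances of subclasses match as well, and the first matching entry wins. A toy illustration (the type-to-name mapping is normally filled by `load_handlers`, shown further down):

    node = Node(processing_pt=lambda data: data)
    node._input_type_to_name = {dict: 'data', str: 'filename'}
    assert node.get_input_channel_name({'scan': 1}) == 'data'
    assert node.get_input_channel_name('scan.h5') == 'filename'
    assert node.get_input_channel_name(42) is None  # no registered type matches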
 
     @property
     def handlers(self) -> dict:
@@ -117,6 +133,10 @@ class Node(object):
     def process_pt(self):
         return self._process_pt
 
+    @property
+    def class_instance(self):
+        return self.__process_instance
+
     def isfinal(self) -> bool:
         """
......@@ -144,6 +164,8 @@ class Node(object):
definition
"""
self._handlers.clear()
self._input_type_to_name.clear()
self._output_type_to_name.clear()
assert self._process_pt is not None
if callable(self._process_pt):
self.__process_instance = self._process_pt
......@@ -154,7 +176,7 @@ class Node(object):
else:
sname = self._process_pt.rsplit('.')
if not (len(sname) > 1):
raise ValueError('Invalid name')
raise ValueError(self._process_pt + ' is not recognized as a valid name')
class_name = sname[-1]
del sname[-1]
module_name = '.'.join(sname)
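For illustration, how the lines above split a dotted path into class and module names (the path itself is hypothetical):

    sname = 'mypackage.mymodule.MyClass'.rsplit('.')  # ['mypackage', 'mymodule', 'MyClass']
    class_name = sname[-1]         # 'MyClass'
    del sname[-1]
    module_name = '.'.join(sname)  # 'mypackage.mymodule'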
@@ -167,6 +189,7 @@ class Node(object):
             class_or_fct = getattr(module, class_name)
             if inspect.isclass(class_or_fct):
+                _logger.debug('instantiate ' + str(class_or_fct))
                 self.__process_instance = class_or_fct()
             else:
                 self.__process_instance = class_or_fct
@@ -175,15 +198,34 @@ class Node(object):
         # manage the case where a class has several input handlers
         if hasattr(self.__process_instance, 'inputs'):
             for input_ in self.__process_instance.inputs:
-                input_name, input_type, input_handler = input_
+                input_name, input_type, input_handler = input_[:3]
                 _logger.debug('[node: %s] add input_name: %s, '
                               'input_type: %s, input_handler: %s'
                               % (self._process_pt, input_name, input_type,
                                  input_handler))
+                if input_type in self._input_type_to_name:
+                    raise ValueError('Several input names found for the '
+                                     'same input type. This case is not managed.')
+                self._input_type_to_name[input_type] = input_name
+                self._handlers[input_name] = input_handler
+                # self._handlers[input_name] = getattr(self.__process_instance, input_handler)
+        if hasattr(self.__process_instance, 'outputs'):
+            for output_ in self.__process_instance.outputs:
+                output_name, output_type, output_handler = output_[:3]
+                _logger.debug('[node: %s] add output_name: %s, '
+                              'output_type: %s, output_handler: %s'
+                              % (self._process_pt, output_name, output_type,
+                                 output_handler))
+                if output_type in self._output_type_to_name:
+                    raise ValueError('Several output names found for the '
+                                     'same output type. This case is not managed.')
+                self._output_type_to_name[output_type] = output_name
         if len(self._handlers) == 0:
+            raise ValueError('Failed to init handlers, none defined for ' + str(self._process_pt))
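`load_handlers` therefore expects `inputs` and `outputs` to be iterables of (at least) 3-tuples of (channel name, data type, handler method name); handlers are stored by name and resolved with `getattr` at execution time (see `execute` below). A hedged sketch of a processing class satisfying this contract (all names are illustrative, not from the project):

    class ImageStack:
        inputs = [
            ('image', dict, 'process_image'),
            ('stack', list, 'process_stack'),
        ]
        outputs = [
            ('stack', list, 'process_image'),
        ]

        def set_properties(self, properties):
            # required by Node.execute (see below)
            self._properties = properties

        def process_image(self, image):
            return [image]

        def process_stack(self, stack):
            return stack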
 
     @staticmethod
     def execute(process_pt, properties: dict, input_name: str,
-                input_data: object):
+                input_data: object) -> tuple:
         """
         Create an instance of a node with 'process_pt' and execute it with the
         given input_name, properties and input_data.
@@ -194,20 +236,36 @@ class Node(object):
         :param str input_name: name of the input data
         :param input_data: input data :warning: Should be serializable
-        :return: output data. :warning: Should be serializable
+        :return: (output channel name, output data)
+                 :warning: Should be serializable
         """
         node = Node(processing_pt=process_pt, properties=properties)
         node.load_handlers()
+        _logger.info('start execution of {0} with {1} through channel {2}'
+                     ''.format(str(process_pt), input_data, input_name))
+        if hasattr(node.__process_instance, 'set_properties'):
+            node.__process_instance.set_properties(properties)
+        else:
+            raise ValueError('no set_properties function found')
         if input_name in node.handlers:
-            out = node.handlers[input_name](input_data)
+            out = getattr(node.__process_instance,
+                          node.handlers[input_name])(input_data)
         elif None in node.handlers:
-            out = node.handlers[None](input_data)
+            out = getattr(node.__process_instance,
+                          node.handlers[None])(input_data)
         else:
-            raise KeyError(input_name, 'is not managed by', str(node._process_pt))
+            err = '"{0}" channel is not managed by {1}'.format(input_name,
+                                                               node._process_pt)
+            raise KeyError(err)
+        # retrieve the output channel from the type of the result
+        if out is None:
+            output_channel = None
+        else:
+            output_channel = node.get_output_channel_name(out)
         if hasattr(out, 'to_dict'):
-            return out.to_dict()
+            return output_channel, out.to_dict()
         else:
-            return out
+            return output_channel, out
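With a class like the `ImageStack` sketch above, `execute` instantiates the node, routes `input_data` through the handler registered for `input_name`, and returns the pair (output channel, output data); a hedged usage sketch (the dotted path is hypothetical):

    channel, out_data = Node.execute(
        process_pt='mypackage.mymodule.ImageStack',
        properties={},
        input_name='image',
        input_data={'pixels': []},
    )
    # channel == 'stack': the returned list matches the (list -> 'stack')
    # entry registered from ImageStack.outputs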
 
     def to_json(self) -> dict:
         """
......
@@ -56,14 +56,14 @@ class Parser(object):
         """
         aliases = {}
         try:
-            import pypushflowaddon
+            import ppfaddon
         except ImportError:
             return aliases
         else:
             import pkgutil
-            for importer, modname, ispkg in pkgutil.iter_modules(pypushflowaddon.__path__):
+            for importer, modname, ispkg in pkgutil.iter_modules(ppfaddon.__path__):
                 try:
-                    mod_name = '.'.join((pypushflowaddon.__name__, modname, 'aliases'))
+                    mod_name = '.'.join((ppfaddon.__name__, modname, 'aliases'))
                     module = importlib.import_module(mod_name)
                 except ImportError:
                     _logger.warning(modname + ' does not fit the add-on design, skipping it')
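The loop above imports `ppfaddon.<sub-package>.aliases` for every sub-package it can find, so an add-on is presumably laid out as below; what the `aliases` module must expose besides being importable is not visible in this excerpt (the add-on name is hypothetical):

    # ppfaddon/
    #     myaddon/
    #         __init__.py
    #         aliases.py   <- imported as ppfaddon.myaddon.aliases
    #
    # Sub-packages without an importable 'aliases' module are skipped
    # with the warning above.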
......
@@ -154,7 +154,7 @@ class Scheme(object):
         """
         :return: list of nodes starting the workflow. These do not require
-                 any input_data
+                 any input_data.
         :rtype: list
         """
         res = []
@@ -302,9 +302,10 @@ class Scheme(object):
         :param str json_file_path: path to the json file containing the scheme
                                    description
         :return: Scheme fitting the json description contained in the file.
                  If the description is incomplete, return None.
-        :rtype:Union[Scheme,None]
+        :rtype: Union[Scheme, None]
         """
         try:
             with open(json_file_path, 'r') as json_file:
......@@ -378,7 +379,7 @@ class Scheme(object):
def scheme_to_etree(self, data_format:str = "literal",
pickle_fallback: bool = False):
"""
Return an `xml.etree.ElementTree` representation of the `scheme.
Return an 'xml.etree.ElementTree' representation of the scheme.
"""
builder = TreeBuilder(element_factory=Element)
builder.start("scheme", {"version": "2.0",
@@ -562,9 +563,10 @@ class SubScheme(Scheme, Node):
     def load_from_json(json_data: dict):
         """
-        :param json_data: scheme description
+        :param json_data: scheme description.
         :raise ValueError: if sink or source channel missing or if link id
-                           missing or if sink or source node missing
+                           missing or if sink or source node missing.
         """
         nodes, links, sub_schemes, title, description = Scheme.load_scheme_info_from_json(json_data)
         _id, _properties, _process_pt = Node.load_node_info_from_json(json_data)
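A hedged usage sketch: `load_from_json` takes the already-parsed JSON dictionary and raises `ValueError` on incomplete descriptions, so callers presumably guard it (the file name is hypothetical):

    import json

    with open('subscheme.json', 'r') as json_file:
        json_data = json.load(json_file)
    try:
        sub_scheme = SubScheme.load_from_json(json_data)
    except ValueError as e:
        _logger.error('incomplete sub-scheme description: %s', e)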
......