Commit b4c9f16e authored by payno's avatar payno

[fix] to work with the first version of representation and imstanciation

parent 7f2b5c38
from .scheme.scheme import Scheme
from .scheme.node import Node
from .scheme.link import Link
\ No newline at end of file
......@@ -18,9 +18,19 @@ class MomlParser(Parser):
"""
@staticmethod
def scheme_load(_file):
desc = MomlParser.parse_moml_stream(_file)
return Scheme.from_desc(desc)
def scheme_load(file_, load_handlers=True):
"""
:param file_: file containing the scheme definition
:param bool load_handlers: try to load the handlers of each node. Used
to make sure the process won't fail
:return: :class:`Scheme`
"""
desc = MomlParser.parse_moml_stream(file_)
scheme = Scheme.from_desc(desc)
if load_handlers is True:
scheme.load_handlers()
return scheme
@staticmethod
def parse_moml_stream(stream):
......@@ -39,12 +49,17 @@ class MomlParser(Parser):
links = {}
# convert nx (.omlm) liks and relations to orange links
for nx_link in nx_links:
node_id, input_output = nx_link.port.split('.', -1)
link = getLink(nx_link.relation)
if input_output.lower() in ('output', 'other', 'true'):
link["source_node_id"] = node_id
# TODO: for now some port / link type are not managed
if nx_link.port in ('In', 'Out', 'No mesh defined'):
logger.warning(nx_link.port + ' not managed yet')
continue
else:
link["sink_node_id"] = node_id
node_id, input_output = nx_link.port.split('.', -1)
if input_output.lower() in ('output', 'other', 'true'):
link["source_node_id"] = node_id
else:
link["sink_node_id"] = node_id
for relation in nx_relations:
assert relation.id in links
......@@ -54,6 +69,10 @@ class MomlParser(Parser):
orangeLinks = []
for linkid, link in links.items():
# TODO: this condition is due from the case that some link are
# not managed yet.
if 'source_node_id' not in link or 'sink_node_id' not in link:
continue
l = _link(id=link['id'],
source_node_id=link["source_node_id"],
sink_node_id=link["sink_node_id"],
......
......@@ -48,10 +48,6 @@ def get_next_node_free_id():
return _id
_callback_info = namedtuple("_callback_info",
["callback", "handler", "need_instanciation"])
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
......@@ -73,12 +69,12 @@ def trace_unhandled_exceptions(func):
class Node(object):
"""
Node in the `.Scheme`. Will be associated to a tomwer process.
Node in the `.Scheme`. Will be associated to a core process.
:param callback: pointer to a class or a function or str defining the
callback. If the callback is a class then the handler
should be defined or the class should have a default
'process' function that will be called by default.
:param processing_pt: pointer to a class or a function or str defining the
callback. If the callback is a class then the handler
should be defined or the class should have a default
'process' function that will be called by default.
:param int id: unique id of the node.
:param dict properties: properties of the node
:param str luigi_task: luigi task associate to this node
......@@ -87,7 +83,7 @@ class Node(object):
need_stop_join = False
"""flag to stop the node only when receive the 'stop' signal"""
def __init__(self, callback, id=None, properties=None,
def __init__(self, processing_pt, id=None, properties=None,
error_handler=None):
self.id = id or get_next_node_free_id()
"""int of the node id"""
......@@ -99,7 +95,7 @@ class Node(object):
"""Set of downstream nodes"""
self.__process_instance = None
""""""
self._process_pt = callback
self._process_pt = processing_pt
"""process instance"""
self._handlers = {}
"""handlers with link name as key and callback as value.
......@@ -112,14 +108,8 @@ class Node(object):
return self._handlers
@property
def _process_pt(self):
return self.__callback
@_process_pt.setter
def _process_pt(self, callback):
need_instanciation = type(callback) is str or inspect.isclass(callback)
self.__callback = _callback_info(callback=callback, handler=None,
need_instanciation=need_instanciation)
def process_pt(self):
return self._process_pt
def isfinal(self):
return len(self.downstream_nodes) is 0
......@@ -145,14 +135,16 @@ class Node(object):
module_name = '.'.join(sname)
if module_name.endswith('.py'):
# warning: in this case the file should not have any relative
# import
module = SourceFileLoader(module_name,
module_name).load_module()
else:
module = importlib.import_module(module_name)
class_ = getattr(module, class_name)
self.__process_instance = class_()
class_or_fct = getattr(module, class_name)
if inspect.isclass(class_or_fct):
self.__process_instance = class_or_fct()
else:
self.__process_instance = class_or_fct
if callable(self.__process_instance):
self._handlers[None] = self.__process_instance
# manage the case where a class has several input handler
......@@ -165,8 +157,9 @@ class Node(object):
raise ValueError('Fail to init handlers, none defined for ' + str(self._process_pt))
@staticmethod
def exec_process(process_pt, properties, input_name, input_data):
node = Node(callback=process_pt, properties=properties)
def execute(process_pt, properties, input_name, input_data):
node = Node(processing_pt=process_pt, properties=properties)
node.load_handlers()
if input_name in node.handlers:
out = node.handlers[input_name](input_data)
elif None in node.handlers:
......
......@@ -20,14 +20,23 @@ class OwsParser(Parser):
Parser managing the .ows (orange) files
"""
@staticmethod
def scheme_load(_file):
desc = OwsParser.parse_ows_stream(_file)
return Scheme.from_desc(desc)
def scheme_load(file_, load_handlers=True):
"""
:param file_: file containing the scheme definition
:param bool load_handlers: try to load the handlers of each node. Used
to make sure the process won't fail
:return: :class:`Scheme`
"""
desc = OwsParser.parse_ows_stream(file_)
scheme = Scheme.from_desc(desc)
if load_handlers is True:
scheme.load_handlers()
return scheme
@staticmethod
def parse_ows_etree_v_2_0(tree):
aliases = Parser.get_aliases()
print('aliases are: ', aliases)
scheme = tree.getroot()
nodes, links, annotations = [], [], []
......@@ -47,7 +56,7 @@ class OwsParser(Parser):
node_id = node.get("id")
qualified_name = node.get("qualified_name")
if qualified_name in aliases:
logger.info('replace', qualified_name, 'by', aliases[qualified_name])
logger.info('replace' + str(qualified_name) + 'by' + str(aliases[qualified_name]))
qualified_name = aliases[qualified_name]
node = _node(
......
......@@ -36,10 +36,12 @@ _logger = logging.getLogger(__name__)
class Parser(object):
@staticmethod
def scheme_load(file_):
def scheme_load(file_, load_handlers=True):
"""
:param file_: file containig the scheme definition
:param file_: file containing the scheme definition
:param bool load_handlers: try to load the handlers of each node. Used
to make sure the process won't fail
:return: :class:`Scheme`
"""
raise NotImplementedError()
......@@ -66,6 +68,7 @@ class Parser(object):
raise TypeError('aliases should be an instance of dict')
else:
aliases.update(new_aliases)
return aliases
__main__ = scheme_load
......
......@@ -39,7 +39,6 @@ import logging
from .node import Node
from ast import literal_eval
import importlib
from ..workflow import ActorFactory, StartActor, StopActor, JoinUntilStopSignal
_logger = logging.getLogger(__name__)
......@@ -69,39 +68,7 @@ class Scheme(object):
if links is not None:
self._update_nodes_from_links()
self._actor_factory = {}
for node in self.nodes:
self._actor_factory[node] = ActorFactory(node)
# deal with connect
for node in self.nodes:
actor_factory = self._actor_factory[node]
for downstream_node in node.downstream_nodes:
downstream_actor_factory = self._actor_factory[downstream_node]
actor_factory.connect(downstream_actor_factory)
# add start actor
self._start_actor = StartActor()
for node in self.start_nodes():
actor_factory = self._actor_factory[node]
self._start_actor.connect(actor_factory)
def connect_finals_nodes(actor):
# add end actor
for node in self.finalsNodes():
actor_factory = self._actor_factory[node]
actor_factory.connect(actor)
self._end_actor = StopActor()
if self.has_final_join():
self._join_actor = JoinUntilStopSignal('stop join')
connect_finals_nodes(self._join_actor)
self._join_actor.connect(self._end_actor)
else:
connect_finals_nodes(self._end_actor)
def finalsNodes(self):
def final_nodes(self):
"""
:return: list of final nodes (with no output) and which hasn't any
......@@ -249,8 +216,7 @@ class Scheme(object):
nodes_dict = {}
for node_d in desc.nodes:
callback = _get_callback(node_d)
node = Node(id=node_d.id, callback=callback)
node = Node(id=node_d.id, processing_pt=node_d.qualified_name)
nodes.append(node)
nodes_dict[node.id] = node
data = node_d.data
......@@ -286,22 +252,6 @@ def contains_control_nodes(nodes_list):
return False
def _get_callback(node):
name = node.qualified_name
sname = name.rsplit('.')
assert (len(sname) > 1)
class_name = sname[-1]
del sname[-1]
module_name = '.'.join(sname)
m = importlib.import_module(module_name)
class_ = getattr(m, class_name)
if hasattr(class_, 'process_function'):
return class_.process_function
else:
raise ValueError('No process function defined in the class core')
def loads(string, format):
if format == "literal":
return literal_eval(string)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment