Commit c83fff2a authored by payno's avatar payno

first commit: add description and first raw implementation

parents
# Default ignored files
/workspace.xml
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/scheme.iml" filepath="$PROJECT_DIR$/.idea/scheme.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>
\ No newline at end of file
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H.Payno"]
__license__ = "MIT"
__date__ = "29/05/2017"
global next_link_free_id
next_link_free_id = 0
def get_next_link_free_id():
global next_link_free_id
_id = next_link_free_id
next_link_free_id += 1
return _id
class Link(object):
"""
:param `.Node` source_node:
:param `.Node` sink_node:
:param str source_channel:
:param str sink_channel:
"""
def __init__(self, source_node, sink_node, source_channel, sink_channel,
id=None):
self.id = id or get_next_link_free_id()
if isinstance(source_node, int):
self.source_node_id = source_node
else:
self.source_node_id = source_node.id
if isinstance(sink_node, int):
self.sink_node_id = sink_node
else:
self.sink_node_id = sink_node.id
self.source_channel = source_channel
self.sink_channel = sink_channel
__authors__ = ["Bioinformatics Laboratory, University of Ljubljana", "H.Payno"]
__license__ = "[GNU GPL v3+]: https://www.gnu.org/licenses/gpl-3.0.en.html"
__date__ = "29/05/2017"
from xml.etree.ElementTree import parse
from .parser import _scheme, _node, _link, _nxNodeProperty, _nxRelation, _nxLink, _nxNode
import logging
from .scheme import Node, Scheme
from .parser import Parser
logger = logging.getLogger(__name__)
class MomlParser(Parser):
"""This is a dictionnary to convert an OrangeWidget into a core process.
"""
@staticmethod
def scheme_load(_file):
desc = MomlParser.parse_moml_stream(_file)
return Scheme.from_desc(desc)
@staticmethod
def parse_moml_stream(stream):
doc = parse(stream)
scheme = MomlParser.parse_moml_etree(doc)
return scheme
@staticmethod
def parse_moml_etree(tree):
def convertToOrangeLinks(nx_links, nx_relations):
def getLink(_id):
if _id not in links:
links[_id] = {"id": _id}
return links[_id]
links = {}
# convert nx (.omlm) liks and relations to orange links
for nx_link in nx_links:
node_id, input_output = nx_link.port.split('.', -1)
link = getLink(nx_link.relation)
if input_output.lower() in ('output', 'other', 'true'):
link["source_node_id"] = node_id
else:
link["sink_node_id"] = node_id
for relation in nx_relations:
assert relation.id in links
links[relation.id]["source_channel"] = relation.class_
links[relation.id]["sink_channel"] = relation.class_
links[relation.id]["properties"] = relation.properties
orangeLinks = []
for linkid, link in links.items():
l = _link(id=link['id'],
source_node_id=link["source_node_id"],
sink_node_id=link["sink_node_id"],
source_channel=link["source_channel"],
sink_channel=link["sink_channel"],
enabled=True)
orangeLinks.append(l)
return orangeLinks
nodes, nx_links, nx_relations = [], [], []
# Collect all nodes
for node in tree.findall(".//entity"):
node_id = node.get('name')
node_class = node.get('class')
node_properties = []
for property in node.findall("property"):
_property = _nxNodeProperty(name=property.get("name"),
class_=property.get("class"),
value=property.get("value"))
node_properties.append(_property)
nodes.append(_nxNode(id=node_id,
class_=node_class,
properties=node_properties,
data=None,
qualified_name=node_class))
# collect all nx links
for link in tree.findall("link"):
_my_link = _nxLink(port=link.get("port"),
relation=link.get("relation"))
nx_links.append(_my_link)
# collect all nx relations
for relation in tree.findall("relation"):
relation_properties = []
for property in relation.findall("property"):
_property = _nxNodeProperty(name=property.get("name"),
class_=property.get("class"),
value=property.get("value"))
relation_properties.append(_property)
_relation = _nxRelation(id=relation.get("name"),
class_=relation.get("class"),
properties=relation_properties)
nx_relations.append(_relation)
links = convertToOrangeLinks(nx_links=nx_links, nx_relations=nx_relations)
return _scheme(
version="nx 0.1",
title=("undefined"),
description=None,
nodes=nodes,
links=links,
annotations=None
)
\ No newline at end of file
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H.Payno"]
__license__ = "MIT"
__date__ = "29/05/2017"
import functools
import logging
import traceback
from collections import namedtuple
import inspect
_logger = logging.getLogger(__file__)
global next_node_free_idF
next_node_free_id = 0
def get_next_node_free_id():
global next_node_free_id
_id = next_node_free_id
next_node_free_id += 1
return _id
_callback_info = namedtuple("_callback_info",
["callback", "handler", "need_instanciation"])
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
outData = func(*args, **kwargs)
except Exception as e:
_logger.exception(e)
errorMessage = '{0}'.format(e)
traceBack = traceback.format_exc()
return WorkflowException(
msg=errorMessage,
traceBack=traceBack,
data=args[1]
)
return outData
return wrapped_func
class Node(object):
"""
Node in the `.Scheme`. Will be associated to a tomwer process.
:param callback: pointer to a class or a function or str defining the
callback. If the callback is a class then the handler
should be defined or the class should have a default
'process' function that will be called by default.
:param int id: unique id of the node.
:param dict properties: properties of the node
:param str luigi_task: luigi task associate to this node
"""
need_stop_join = False
"""flag to stop the node only when receive the 'stop' signal"""
def __init__(self, callback, id=None, properties=None,
error_handler=None):
self.id = id or get_next_node_free_id()
"""int of the node id"""
self.properties = properties or {}
"""dict of the node properties"""
self.upstream_nodes = set()
"""Set of upstream nodes"""
self.downstream_nodes = set()
"""Set of downstream nodes"""
self.__process_instance = None
""""""
self.callback = callback
"""process instance"""
self._error_handler = error_handler
self.outData = None
@property
def callback(self):
return self.__callback
@callback.setter
def callback(self, callback):
need_instanciation = type(callback) is str or inspect.isclass(callback)
self.__callback = _callback_info(callback=callback, handler=None,
need_instanciation=need_instanciation)
def isfinal(self):
return len(self.downstream_nodes) is 0
def isstart(self):
return len(self.upstream_nodes) is 0
class WorkflowException(Exception):
def __init__(self, traceBack="", data=None, msg=None):
if data is None:
data = {}
super(WorkflowException, self).__init__(msg)
self.errorMessage = msg
self.data = data
self.traceBack = traceBack
__authors__ = ["Bioinformatics Laboratory, University of Ljubljana", "H.Payno"]
__license__ = "[GNU GPL v3+]: https://www.gnu.org/licenses/gpl-3.0.en.html"
__date__ = "29/05/2017"
from xml.etree.ElementTree import parse
from .parser import _scheme, _node, _link
import ast
import logging
from .scheme import Scheme
from .parser import Parser
logger = logging.getLogger(__name__)
class OwsParser(Parser):
"""
Parser managing the .ows (orange) files
"""
@staticmethod
def scheme_load(_file):
desc = OwsParser.parse_ows_stream(_file)
return Scheme.from_desc(desc)
@staticmethod
def parse_ows_etree_v_2_0(tree):
scheme = tree.getroot()
nodes, links, annotations = [], [], []
# First collect all properties
properties = {}
for property in tree.findall("node_properties/properties"):
node_id = property.get("node_id")
format = property.get("format")
if "data" in property.attrib:
data = property.get("data")
else:
data = property.text
properties[node_id] = _data(format, data)
# Collect all nodes
for node in tree.findall("nodes/node"):
node_id = node.get("id")
node = _node(
id=node_id,
title=node.get("title"),
name=node.get("name"),
position=tuple_eval(node.get("position", None)),
project_name=node.get("project_name", None),
qualified_name=node.get("qualified_name"),
version=node.get("version", ""),
data=properties.get(node_id, None)
)
nodes.append(node)
for link in tree.findall("links/link"):
params = _link(
id=link.get("id"),
source_node_id=link.get("source_node_id"),
sink_node_id=link.get("sink_node_id"),
source_channel=link.get("source_channel"),
sink_channel=link.get("sink_channel"),
enabled=link.get("enabled") == "true",
)
links.append(params)
for annot in tree.findall("annotations/*"):
if annot.tag == "text":
rect = tuple_eval(annot.get("rect", "(0.0, 0.0, 20.0, 20.0)"))
font_family = annot.get("font-family", "").strip()
font_size = annot.get("font-size", "").strip()
font = {}
if font_family:
font["family"] = font_family
if font_size:
font["size"] = int(font_size)
annotation = _annotation(
id=annot.get("id"),
type="text",
params=_text_params(rect, annot.text or "", font),
)
elif annot.tag == "arrow":
start = tuple_eval(annot.get("start", "(0, 0)"))
end = tuple_eval(annot.get("end", "(0, 0)"))
color = annot.get("fill", "red")
annotation = _annotation(
id=annot.get("id"),
type="arrow",
params=_arrow_params((start, end), color)
)
annotations.append(annotation)
return _scheme(
version=scheme.get("version"),
title=scheme.get("title", ""),
description=scheme.get("description"),
nodes=nodes,
links=links,
annotations=annotations
)
@staticmethod
def parse_ows_stream(stream):
doc = parse(stream)
scheme_el = doc.getroot()
version = scheme_el.get("version", None)
if version is None:
# Fallback: check for "widgets" tag.
if scheme_el.find("widgets") is not None:
version = "1.0"
else:
log.warning("<scheme> tag does not have a 'version' attribute")
version = "2.0"
if version == "1.0":
raise ValueError('old .ows version are not managed')
elif version == "2.0":
return OwsParser.parse_ows_etree_v_2_0(doc)
else:
raise ValueError('unrecognize scheme definition version')
# ---- TAKE back from Orange3 ---------
def tuple_eval(source):
"""
Evaluate a python tuple literal `source` where the elements are
constrained to be int, float or string. Raise ValueError if not
a tuple literal.
>>> tuple_eval("(1, 2, "3")")
(1, 2, '3')
"""
if source is None:
return None
node = ast.parse(source, "<source>", mode="eval")
if not isinstance(node.body, ast.Tuple):
raise ValueError("%r is not a tuple literal" % source)
if not all(isinstance(el, (ast.Str, ast.Num)) or
# allow signed number literals in Python3 (i.e. -1|+1|-1.0)
(isinstance(el, ast.UnaryOp) and
isinstance(el.op, (ast.UAdd, ast.USub)) and
isinstance(el.operand, ast.Num))
for el in node.body.elts):
raise ValueError("Can only contain numbers or strings")
return literal_eval(source)
def resolve_replaced(scheme_desc, registry):
widgets = registry.widgets()
nodes_by_id = {} # type: Dict[str, _node]
replacements = {}
replacements_channels = {} # type: Dict[str, Tuple[dict, dict]]
# collect all the replacement mappings
for desc in widgets: # type: WidgetDescription
if desc.replaces:
for repl_qname in desc.replaces:
replacements[repl_qname] = desc.qualified_name
input_repl = {}
for idesc in desc.inputs or []: # type: InputSignal
for repl_qname in idesc.replaces or []: # type: str
input_repl[repl_qname] = idesc.name
output_repl = {}
for odesc in desc.outputs: # type: OutputSignal
for repl_qname in odesc.replaces or []: # type: str
output_repl[repl_qname] = odesc.name
replacements_channels[desc.qualified_name] = (input_repl, output_repl)
# replace the nodes
nodes = scheme_desc.nodes
for i, node in list(enumerate(nodes)):
if not registry.has_widget(node.qualified_name) and \
node.qualified_name in replacements:
qname = replacements[node.qualified_name]
desc = registry.widget(qname)
nodes[i] = node._replace(qualified_name=desc.qualified_name,
project_name=desc.project_name)
nodes_by_id[node.id] = nodes[i]
# replace links
links = scheme_desc.links
for i, link in list(enumerate(links)): # type: _link
nsource = nodes_by_id[link.source_node_id]
nsink = nodes_by_id[link.sink_node_id]
_, source_rep = replacements_channels.get(
nsource.qualified_name, ({}, {}))
sink_rep, _ = replacements_channels.get(
nsink.qualified_name, ({}, {}))
if link.source_channel in source_rep:
link = link._replace(
source_channel=source_rep[link.source_channel])
if link.sink_channel in sink_rep:
link = link._replace(
sink_channel=sink_rep[link.sink_channel])
links[i] = link
return scheme_desc._replace(nodes=nodes, links=links)
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H.Payno"]
__license__ = "MIT"
__date__ = "19/08/2019"
from collections import namedtuple
class Parser(object):
@staticmethod
def scheme_load(file_):
"""
:param file_: file containig the scheme definition
:return: :class:`Scheme`
"""
raise NotImplementedError()
__main__ = scheme_load
_scheme = namedtuple(
"_scheme",
["title", "version", "description", "nodes", "links", "annotations"])
_node = namedtuple(
"_node",
["id", "title", "name", "position", "project_name", "qualified_name",
"version", "data"])
_data = namedtuple(
"_data",
["format", "data"])
_link = namedtuple(
"_link",
["id", "source_node_id", "sink_node_id", "source_channel", "sink_channel",
"enabled"])
_annotation = namedtuple(
"_annotation",
["id", "type", "params"])
_text_params = namedtuple(
"_text_params",
["geometry", "text", "font"])