Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
workflow
representation
Commits
75addf31
Commit
75addf31
authored
Sep 11, 2019
by
payno
Browse files
Add a first json compatibility for Scheme, Node and Link
parent
5591a66f
Changes
3
Hide whitespace changes
Inline
Side-by-side
scheme/link.py
View file @
75addf31
...
...
@@ -27,6 +27,10 @@ __authors__ = ["H.Payno"]
__license__
=
"MIT"
__date__
=
"29/05/2017"
import
logging
_logger
=
logging
.
getLogger
(
__name__
)
# TODO: next lines should be removed
global
next_link_free_id
next_link_free_id
=
0
...
...
@@ -43,15 +47,23 @@ class Link(object):
Define a link between two node with an execution order. Sink_node will be
executed after source_node.
:param `.Node` source_node: upstream node
:param `.Node` sink_node: downstream node
:param source_node: upstream node
:type: Union[`.Node`,int]
:param sink_node: downstream node
:type: Union[`.Node`,int]
:param str source_channel: channel name used for connection
:param str sink_channel: channel name used for connection
"""
_JSON_SOURCE_CHANNEL
=
'source_channel'
_JSON_SINK_CHANNEL
=
'sink_channel'
_JSON_SOURCE_NODE_ID
=
'source_node_id'
_JSON_SINK_NODE_ID
=
'sink_node_id'
_JSON_LINK_ID
=
'link_id'
def
__init__
(
self
,
source_node
,
sink_node
,
source_channel
,
sink_channel
,
id
=
None
):
self
.
id
=
id
or
get_next_link_free_id
()
self
.
id
=
get_next_link_free_id
()
if
id
is
None
else
id
if
isinstance
(
source_node
,
int
):
self
.
source_node_id
=
source_node
else
:
...
...
@@ -64,3 +76,74 @@ class Link(object):
self
.
source_channel
=
source_channel
self
.
sink_channel
=
sink_channel
def
to_json
(
self
):
"""
:return: Link description to the json format
:rtype: dict
"""
return
{
self
.
_JSON_LINK_ID
:
self
.
id
,
self
.
_JSON_SOURCE_CHANNEL
:
self
.
source_channel
,
self
.
_JSON_SINK_CHANNEL
:
self
.
sink_channel
,
self
.
_JSON_SOURCE_NODE_ID
:
self
.
source_node_id
,
self
.
_JSON_SINK_NODE_ID
:
self
.
sink_node_id
,
}
@
staticmethod
def
from_json
(
json_data
):
"""
:param json_data: link description
:return: New Link created from the json description
:rtype: Link
:raise ValueError: if sink or source channel missing or if link id
missing or if sink or source node missing
"""
# load link id
if
Link
.
_JSON_LINK_ID
not
in
json_data
:
_id
=
None
_logger
.
error
(
'Missing link id information'
)
else
:
_id
=
json_data
[
Link
.
_JSON_LINK_ID
]
# load sink channel
if
Link
.
_JSON_SINK_CHANNEL
not
in
json_data
:
sink_channel
=
None
_logger
.
error
(
'Missing sink channel information'
)
else
:
sink_channel
=
json_data
[
Link
.
_JSON_SINK_CHANNEL
]
# load source channel
if
Link
.
_JSON_SOURCE_CHANNEL
not
in
json_data
:
source_channel
=
None
_logger
.
error
(
'Missing source channel information'
)
else
:
source_channel
=
json_data
[
Link
.
_JSON_SOURCE_CHANNEL
]
# load sink node id
if
Link
.
_JSON_SINK_NODE_ID
not
in
json_data
:
sink_node_id
=
None
_logger
.
error
(
'Missing source node id information'
)
else
:
sink_node_id
=
json_data
[
Link
.
_JSON_SINK_NODE_ID
]
# load source node id
if
Link
.
_JSON_SOURCE_NODE_ID
not
in
json_data
:
source_node_id
=
None
_logger
.
error
(
'Missing source node id information'
)
else
:
source_node_id
=
json_data
[
Link
.
_JSON_SOURCE_NODE_ID
]
if
(
sink_channel
is
None
or
source_channel
is
None
or
_id
is
None
or
source_node_id
is
None
or
sink_node_id
is
None
):
raise
ValueError
(
'Missing core information for creating a Link'
)
else
:
return
Link
(
id
=
_id
,
sink_channel
=
sink_channel
,
source_channel
=
source_channel
,
source_node
=
source_node_id
,
sink_node
=
sink_node_id
)
def
__str__
(
self
):
return
"node %s: source:(%s, %s), sink:(%s, %s)"
%
(
self
.
id
,
self
.
source_node_id
,
self
.
source_channel
,
self
.
sink_node_id
,
self
.
sink_channel
)
\ No newline at end of file
scheme/node.py
View file @
75addf31
...
...
@@ -30,7 +30,7 @@ __date__ = "29/05/2017"
import
functools
import
logging
import
traceback
from
collections
import
namedtuple
import
json
import
inspect
import
importlib
from
importlib.machinery
import
SourceFileLoader
...
...
@@ -83,9 +83,13 @@ class Node(object):
need_stop_join
=
False
"""flag to stop the node only when receive the 'stop' signal"""
_JSON_PROCESS_PT
=
'process_pt'
_JSON_ID
=
'id'
_JSON_PROPERTIES
=
'properties'
def
__init__
(
self
,
processing_pt
,
id
=
None
,
properties
=
None
,
error_handler
=
None
):
self
.
id
=
id
or
get_next_node_free_id
()
self
.
id
=
get_next_node_free_id
()
if
id
is
None
else
id
"""int of the node id"""
self
.
properties
=
properties
or
{}
"""dict of the node properties"""
...
...
@@ -199,6 +203,58 @@ class Node(object):
else
:
return
out
def
to_json
(
self
):
"""
:return: json description of the node
:rtype: dict
"""
return
{
self
.
_JSON_PROCESS_PT
:
self
.
process_pt
,
self
.
_JSON_ID
:
self
.
id
,
self
.
_JSON_PROPERTIES
:
self
.
properties
,
}
@
staticmethod
def
from_json
(
json_data
):
"""
:param json_data: node description
:return: New node created from the json description
:rtype: Node
:raise ValueError: if properties or id or processing_pt missing
"""
# load properties
if
Node
.
_JSON_PROPERTIES
not
in
json_data
:
_logger
.
error
(
'Missing node properties in json description'
)
_properties
=
None
else
:
_properties
=
json_data
[
Node
.
_JSON_PROPERTIES
]
assert
type
(
_properties
)
is
dict
# load id
if
Node
.
_JSON_ID
not
in
json_data
:
_logger
.
error
(
'Missing node id in json description'
)
_id
=
None
else
:
_id
=
json_data
[
Node
.
_JSON_ID
]
assert
type
(
_id
)
is
int
# load process_pt
if
Node
.
_JSON_PROCESS_PT
not
in
json_data
:
_logger
.
error
(
'Missing node process_pt in json description'
)
_process_pt
=
None
else
:
_process_pt
=
json_data
[
Node
.
_JSON_PROCESS_PT
]
if
_properties
is
None
or
_id
is
None
or
_process_pt
is
None
:
raise
ValueError
(
'Unable to create Node from json, core information '
'are missing'
)
else
:
return
Node
(
id
=
_id
,
properties
=
_properties
,
processing_pt
=
_process_pt
)
def
__str__
(
self
):
return
"node %s - %s"
%
(
self
.
id
,
self
.
processing_pt
)
class
WorkflowException
(
Exception
):
def
__init__
(
self
,
traceBack
=
""
,
data
=
None
,
msg
=
None
):
...
...
scheme/scheme.py
View file @
75addf31
...
...
@@ -37,8 +37,8 @@ import base64
import
pickle
import
logging
from
.node
import
Node
from
.link
import
Link
from
ast
import
literal_eval
import
importlib
_logger
=
logging
.
getLogger
(
__name__
)
...
...
@@ -50,9 +50,18 @@ class Scheme(object):
:param list nodes:
:param list links:
"""
def
__init__
(
self
,
nodes
=
None
,
links
=
None
):
self
.
title
=
''
self
.
description
=
''
_JSON_DESCRIPTION
=
'description'
_JSON_TITLE
=
'title'
_JSON_NODES
=
'nodes'
_JSON_LINKS
=
'links'
def
__init__
(
self
,
nodes
=
None
,
links
=
None
,
description
=
None
,
title
=
None
):
self
.
_reset
(
nodes
=
nodes
,
links
=
links
,
description
=
description
,
title
=
title
)
def
_reset
(
self
,
nodes
,
links
,
description
,
title
):
self
.
title
=
title
or
''
self
.
description
=
description
or
''
self
.
links
=
{}
"""keys are link ID, values are Link"""
if
links
is
not
None
:
...
...
@@ -64,7 +73,6 @@ class Scheme(object):
"""dict with node id as key and node as value"""
for
node
in
self
.
nodes
:
self
.
nodes_dict
[
node
.
id
]
=
node
if
links
is
not
None
:
self
.
_update_nodes_from_links
()
...
...
@@ -112,15 +120,172 @@ class Scheme(object):
"""
Save the scheme as an xml formated file to `stream`
:param output_file: name of the output file. For now only manage xml
files
:param output_file: name of the output file.
:type: str
"""
if
output_file
.
lower
.
endswith
(
'.json'
):
self
.
save_as_json
(
output_file
)
else
:
self
.
save_as_xml
(
output_file
)
def
save_as_xml
(
self
,
output_file
):
"""
save current scheme to a default xml format
:param str output_file: file path
"""
tree
=
self
.
scheme_to_etree
(
data_format
=
"literal"
)
indent
(
tree
.
getroot
(),
0
)
tree
.
write
(
output_file
)
def
save_as_json
(
self
,
output_file
):
"""
save current scheme to a default json format
:param str output_file: file path
"""
with
open
(
output_file
,
'w'
)
as
json_file
:
json
.
dump
(
self
.
to_json
(),
json_file
)
def
nodes_to_json
(
self
):
"""
:return: nodes to json compatible format
:rtype: list
"""
res
=
[]
for
node
in
self
.
nodes
:
res
.
append
(
node
.
to_json
())
return
res
def
nodes_from_json
(
self
,
json_data
):
"""
:param json_data: data containing the json definition
:return: list of Node defined by the json data
:rtype: list
"""
if
not
self
.
_JSON_NODES
in
json_data
:
raise
ValueError
(
'does not contain any Node description'
)
else
:
nodes_json_data
=
json_data
[
self
.
_JSON_NODES
]
nodes
=
[]
for
node_json_data
in
nodes_json_data
:
nodes
.
append
(
Node
.
from_json
(
node_json_data
))
return
nodes
def
links_to_json
(
self
):
"""
:return: links to json compatible format
:rtype: list
"""
res
=
[]
for
link
in
self
.
links
.
values
():
res
.
append
(
link
.
to_json
())
return
res
def
links_from_json
(
self
,
json_data
):
"""
:param json_data: data containing the json definition
:return: list of Link defined by the json data
:rtype: list
"""
if
not
self
.
_JSON_LINKS
in
json_data
:
raise
ValueError
(
'does not contain any Node description'
)
else
:
links_json_data
=
json_data
[
self
.
_JSON_LINKS
]
links
=
[]
for
link_json_data
in
links_json_data
:
links
.
append
(
Link
.
from_json
(
link_json_data
))
return
links
def
to_json
(
self
):
return
{
self
.
_JSON_DESCRIPTION
:
self
.
description
,
self
.
_JSON_TITLE
:
self
.
title
,
self
.
_JSON_NODES
:
self
.
nodes_to_json
(),
self
.
_JSON_LINKS
:
self
.
links_to_json
(),
}
@
staticmethod
def
from_json_file
(
json_file_path
):
scheme
=
Scheme
()
try
:
scheme
.
load_from_json_file
(
json_file_path
)
except
ValueError
as
e
:
_logger
.
error
(
e
)
return
None
else
:
return
scheme
def
load_from_json_file
(
self
,
json_file_path
):
"""
:param str json_file_path: path to the json file containing the scheme
description
:return: Scheme fitting the json description contains if the file.
If description is incomplete, return None
:rtype:Union[Scheme,None]
"""
try
:
with
open
(
json_file_path
,
'r'
)
as
json_file
:
json_data
=
json
.
load
(
json_file
)
except
IOError
as
e
:
_logger
.
error
(
'fail to read json file'
,
str
(
e
))
else
:
self
.
load_from_json
(
json_data
=
json_data
)
def
load_from_json
(
self
,
json_data
):
"""
:param json_data: scheme description
:raise ValueError: if sink or source channel missing or if link id
missing or if sink or source node missing
"""
# load title
if
self
.
_JSON_TITLE
not
in
json_data
:
_logger
.
warning
(
'no title found in the json'
)
title
=
None
else
:
title
=
json_data
[
self
.
_JSON_TITLE
]
# load description
if
self
.
_JSON_DESCRIPTION
not
in
json_data
:
_logger
.
warning
(
'no description found in the json'
)
description
=
None
else
:
description
=
json_data
[
self
.
_JSON_DESCRIPTION
]
# load links
if
self
.
_JSON_LINKS
not
in
json_data
:
_logger
.
error
(
'no link found in the json'
)
links
=
None
else
:
try
:
links
=
self
.
links_from_json
(
json_data
=
json_data
)
except
ValueError
as
e
:
_logger
.
error
(
e
)
links
=
None
# load nodes
if
self
.
_JSON_NODES
not
in
json_data
:
_logger
.
error
(
'no nodes found in the json'
)
nodes
=
None
else
:
try
:
nodes
=
self
.
nodes_from_json
(
json_data
=
json_data
)
except
ValueError
as
e
:
_logger
.
error
(
e
)
nodes
=
None
# create scheme if possible
if
nodes
is
None
or
links
is
None
:
raise
ValueError
(
'unable to load scheme from json description.'
'Information missing'
)
else
:
self
.
_reset
(
nodes
=
nodes
,
links
=
links
,
description
=
description
,
title
=
title
)
def
scheme_to_etree
(
self
,
data_format
=
"literal"
,
pickle_fallback
=
False
):
"""
Return an `xml.etree.ElementTree` representation of the `scheme.
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment