workflow / representation

Compare Revisions
e99104e134c9932d2be81c3870f5c923ba62cccc...4513c7e79561e6e687badce4f98fb9dd8a26ac04

Commits (8)
[node] fix load_handlers: load the function directly if it contains specific handlers · 5bb51278 · payno · Apr 07, 2020
[doc] remove some warnings during doc generation · e6cbb1de · payno · Apr 07, 2020
PEP8: outData -> out_data · ac97541e · payno · Apr 07, 2020
[node] fix typo in logger: __name__ -> __file__ · a25e4d2b · payno · Jul 31, 2020
[parser] fix typo: the add-ons are named ppfaddon instead of pypushflowaddon · d378f770 · payno · Jul 31, 2020
[node] rework node to access and use channel names · 7b582358 · payno · Jul 31, 2020
Merge branch 'master' of gitlab.esrf.fr:workflow/representation · 06c5fb30 · payno · Jul 31, 2020
[node] call set_properties if exists · 4513c7e7 · payno · Jul 31, 2020
scheme/node.py (view file @ 4513c7e7)
@@ -35,7 +35,7 @@ import importlib
 from typing import Union
 from importlib.machinery import SourceFileLoader
 
-_logger = logging.getLogger(__file__)
+_logger = logging.getLogger(__name__)
 
 global next_node_free_id
 next_node_free_id = 0
@@ -52,7 +52,7 @@ def trace_unhandled_exceptions(func):
     @functools.wraps(func)
     def wrapped_func(*args, **kwargs):
         try:
-            outData = func(*args, **kwargs)
+            out_data = func(*args, **kwargs)
         except Exception as e:
             _logger.exception(e)
             errorMessage = '{0}'.format(e)
@@ -62,7 +62,7 @@ def trace_unhandled_exceptions(func):
                 traceBack=traceBack,
                 data=args[1])
-        return outData
+        return out_data
     return wrapped_func
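Both hunks above only rename the decorator's local variable outData to out_data. As a rough standalone sketch (the error-reporting branch with errorMessage/traceBack is omitted and replaced by a plain re-raise), the renamed wrapper reduces to:

import functools
import logging

_logger = logging.getLogger(__name__)


def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            out_data = func(*args, **kwargs)  # was `outData`
        except Exception as e:
            # simplified: the real decorator builds an error payload here
            _logger.exception(e)
            raise
        return out_data
    return wrapped_func


@trace_unhandled_exceptions
def double(x):
    return 2 * x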
@@ -89,7 +89,7 @@ class Node(object):
     _JSON_ERROR_HANDLER = 'error_handler'
 
     def __init__(self, processing_pt, id: Union[None, int] = None,
-                 properties: Union[None, dict] = None,
+                 properties: Union[None, dict] = None,
                  error_handler=None):
         self.id = get_next_node_free_id() if id is None else id
         """int of the node id"""
@@ -106,8 +106,24 @@ class Node(object):
         self._handlers = {}
         """handlers with link name as key and callback as value.
         The default handler is store under the 'None' value"""
+        self._input_type_to_name = {}
+        """link input type to a signal name"""
+        self._output_type_to_name = {}
+        """link output type to a signal name"""
         self._error_handler = error_handler
-        self.outData = None
+        self.out_data = None
+
+    def get_input_channel_name(self, data_object):
+        for dtype, channel_name in self._input_type_to_name.items():
+            if isinstance(data_object, dtype):
+                return channel_name
+        return None
+
+    def get_output_channel_name(self, data_object):
+        for dtype, channel_name in self._output_type_to_name.items():
+            if isinstance(data_object, dtype):
+                return channel_name
+        return None
 
     @property
     def handlers(self) -> dict:
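The two helpers added above resolve a channel name from a data object by walking the new _input_type_to_name / _output_type_to_name dictionaries with isinstance checks. A minimal self-contained sketch of that lookup, with made-up types and channel names:

# Hypothetical mapping, standing in for what load_handlers fills from the
# processing class' output declarations.
_output_type_to_name = {dict: 'data', str: 'filename'}


def get_output_channel_name(data_object):
    # the first declared type the object matches wins
    for dtype, channel_name in _output_type_to_name.items():
        if isinstance(data_object, dtype):
            return channel_name
    return None  # no declared channel for this data type


assert get_output_channel_name({'scan': 1}) == 'data'
assert get_output_channel_name(3.14) is None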
@@ -117,6 +133,10 @@
     def process_pt(self):
         return self._process_pt
 
+    @property
+    def class_instance(self):
+        return self.__process_instance
+
     def isfinal(self) -> bool:
         """
@@ -144,6 +164,8 @@ class Node(object):
         definition
         """
         self._handlers.clear()
+        self._input_type_to_name.clear()
+        self._output_type_to_name.clear()
         assert self._process_pt is not None
         if callable(self._process_pt):
             self.__process_instance = self._process_pt
@@ -154,7 +176,7 @@
         else:
             sname = self._process_pt.rsplit('.')
             if not (len(sname) > 1):
-                raise ValueError('Invalid name')
+                raise ValueError(self._process_pt + ' is not recognized as a valid name')
             class_name = sname[-1]
             del sname[-1]
             module_name = '.'.join(sname)
@@ -167,6 +189,7 @@
             class_or_fct = getattr(module, class_name)
             if inspect.isclass(class_or_fct):
+                _logger.debug('instanciate ' + str(class_or_fct))
                 self.__process_instance = class_or_fct()
             else:
                 self.__process_instance = class_or_fct
@@ -175,15 +198,34 @@
         # manage the case where a class has several input handler
         if hasattr(self.__process_instance, 'inputs'):
             for input_ in self.__process_instance.inputs:
-                input_name, input_type, input_handler = input_
+                input_name, input_type, input_handler = input_[:3]
                 _logger.debug('[node: %s] add input_name: %s, '
                               'input_type: %s, input_handler: %s'
                               % (self._process_pt, input_name, input_type,
                                  input_handler))
+                if str(input_type) in self._input_type_to_name:
+                    raise ValueError('Several input name found for the '
+                                     'same input type. This case is not managed.')
+                self._input_type_to_name[input_type] = input_name
                 self._handlers[input_name] = input_handler
+                # self._handlers[input_name] = getattr(self.__process_instance, input_handler)
+        if hasattr(self.__process_instance, 'outputs'):
+            for output_ in self.__process_instance.outputs:
+                output_name, output_type, output_handler = output_[:3]
+                _logger.debug('[node: %s] add output_name: %s, '
+                              'output_type: %s, output_handler: %s'
+                              % (self._process_pt, input_name, input_type,
+                                 input_handler))
+                if output_type in self._output_type_to_name:
+                    raise ValueError('Several output name found for the '
+                                     'same output type. This case is not managed.')
+                self._output_type_to_name[output_type] = output_name
 
         if len(self._handlers) == 0:
             raise ValueError('Fail to init handlers, none defined for ' + str(self._process_pt))
 
     @staticmethod
     def execute(process_pt, properties: dict, input_name: str,
-                input_data: object):
+                input_data: object) -> tuple:
         """
         Create an instance of a node with 'process_pt' and execute it with the
         given input_name, properties and input_data.
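load_handlers now slices each declaration with [:3] and records a channel name per data type before storing the handler. A hedged sketch of a processing class following the (channel name, data type, handler name) convention this code reads; the class and channel names below are invented for illustration:

class DummySum(object):
    # each entry: (channel name, data type, handler method name); extra
    # trailing elements would simply be dropped by the [:3] slice above
    inputs = [
        ('data', dict, 'process'),
    ]
    outputs = [
        ('data', dict, 'process'),
    ]

    def set_properties(self, properties):
        # called by Node.execute() before the handler runs (see next hunk)
        self._properties = properties

    def process(self, scan):
        # pass-through handler, only for illustration
        return scan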
@@ -194,20 +236,36 @@
         :param str input_name: name of the input data
         :param input_data: input data :warning: Should be serializable
-        :return: output data. :warning: Should be serializable
+        :return: (output data type, output data)
+                 :warning: Should be serializable
         """
         node = Node(processing_pt=process_pt, properties=properties)
         node.load_handlers()
         logging.info('start execution of {0} with {1} through channel {2}'
                      ''.format(str(process_pt), input_data, input_name))
+        if hasattr(node.__process_instance, 'set_properties'):
+            node.__process_instance.set_properties(properties)
+        else:
+            raise ValueError('no function set properties found')
         if input_name in node.handlers:
-            out = node.handlers[input_name](input_data)
+            out = getattr(node.__process_instance,
+                          node.handlers[input_name])(input_data)
         elif None in node.handlers:
-            out = node.handlers[None](input_data)
+            out = getattr(node.__process_instance,
+                          node.handlers[None])(input_data)
         else:
-            raise KeyError(input_name, 'is not managed by', str(node._process_pt))
+            err = '"{0}" channel is not managed by {1}'.format(input_name, node._process_pt)
+            raise KeyError(err)
+        # retrieve output channel
+        if out is None:
+            output_channel = None
+        else:
+            output_channel = node.get_output_channel_name(out)
         if hasattr(out, 'to_dict'):
-            return out.to_dict()
+            return output_channel, out.to_dict()
         else:
-            return out
+            return output_channel, out
 
     def to_json(self) -> dict:
         """
@@ -301,4 +359,4 @@ class ErrorHandler(Node):
                          error_handler=None)
 
     def _get_error_handler_json(self):
-        return {}
\ No newline at end of file
+        return {}
scheme/parser.py (view file @ 4513c7e7)
@@ -56,14 +56,14 @@ class Parser(object):
         """
         aliases = {}
         try:
-            import pypushflowaddon
+            import ppfaddon
         except ImportError:
             return aliases
         else:
             import pkgutil
-            for importer, modname, ispkg in pkgutil.iter_modules(pypushflowaddon.__path__):
+            for importer, modname, ispkg in pkgutil.iter_modules(ppfaddon.__path__):
                 try:
-                    mod_name = '.'.join((pypushflowaddon.__name__, modname, 'aliases'))
+                    mod_name = '.'.join((ppfaddon.__name__, modname, 'aliases'))
                     module = importlib.import_module(mod_name)
                 except ImportError:
                     _logger.warning(modname + ' does not fit the add-on design, skip it')
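With the rename, the parser scans sub-packages of ppfaddon (instead of pypushflowaddon) and tries to import an aliases module from each one, skipping those that do not provide it. A hypothetical add-on layout matching that convention (package and module names invented for illustration):

ppfaddon/
    __init__.py
    myaddon/
        __init__.py
        aliases.py    # imported as 'ppfaddon.myaddon.aliases'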
scheme/scheme.py (view file @ 4513c7e7)
@@ -154,7 +154,7 @@ class Scheme(object):
         """
         :return: list of nodes starting the workflow. Those does not require
-                 any input_data
+                 any input_data.
         :rtype: list
         """
         res = []
@@ -302,9 +302,10 @@ class Scheme(object):
         :param str json_file_path: path to the json file containing the scheme
                                    description
         :return: Scheme fitting the json description contains if the file.
                  If description is incomplete, return None
-        :rtype:Union[Scheme,None]
+        :rtype:
+            Union[Scheme, None]
         """
         try:
             with open(json_file_path, 'r') as json_file:
@@ -378,7 +379,7 @@ class Scheme(object):
     def scheme_to_etree(self, data_format: str = "literal",
                         pickle_fallback: bool = False):
         """
-        Return an `xml.etree.ElementTree` representation of the `scheme.
+        Return an 'xml.etree.ElementTree' representation of the scheme.
         """
         builder = TreeBuilder(element_factory=Element)
         builder.start("scheme", {"version": "2.0",
@@ -562,9 +563,10 @@ class SubScheme(Scheme, Node):
     def load_from_json(json_data: dict):
         """
-        :param json_data: scheme description
+        :param json_data: scheme description.
         :raise ValueError: if sink or source channel missing or if link id
-                           missing or if sink or source node missing
+                           missing or if sink or source node missing.
         """
         nodes, links, sub_schemes, title, description = \
             Scheme.load_scheme_info_from_json(json_data)
         _id, _properties, _process_pt = Node.load_node_info_from_json(json_data)