Commit 94601c1a authored by payno's avatar payno
Browse files

[Workflow] add tool to write a python script defining the workflow.

parent 3cd1ff43
Pipeline #31163 passed with stage
in 1 minute and 3 seconds
......@@ -19,7 +19,7 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__authors__ = ["O. Svensson", 'H. Payno']
__license__ = "MIT"
__date__ = "28/05/2019"
......@@ -137,3 +137,145 @@ class ProcessableWorkflow(_BaseWorkflow):
if node.need_stop_join:
return True
return False
class Converter:
"""
Write a Workflow to a python file which can be executed later
"""
def __init__(self, workflow, output_file):
if not isinstance(workflow, _BaseWorkflow):
raise TypeError(
'workflow should be an instance of `_BaseWorkflow`')
self.workflow = workflow
self.output_file = output_file
def process(self):
self._write_import()
self._write_util_functions()
self._write_main_section()
self._write_processes_creation()
self._write_processing()
self._close_main_section()
def _write_main_section(self):
with open(self.output_file, 'a') as file_:
file_.write('\n\n')
file_.write('def main(input_data, channel):\n')
def _close_main_section(self):
with open(self.output_file, 'a') as file_:
file_.write('\n\n')
def _write_import(self):
with open(self.output_file, 'a') as file_:
file_.write('\n')
for node in self.workflow._representation.nodes:
class_name, mod_name = node.get_class_name_and_module_name()
file_.write('import {}\n'.format(mod_name))
# create logger
file_.write('{}\n{}\n'.format('import logging',
'_logger = logging.getLogger(__name__)'))
# add import ignore process
file_.write('{}\n'.format('from tomwer.core.process.utils import IgnoreProcess'))
def _write_processes_creation(self):
with open(self.output_file, 'a') as file_:
file_.write('\n')
for node in self.workflow._representation.nodes:
class_name, mod_name = node.get_class_name_and_module_name()
file_.write(' process_{} = {}.{}()\n'.format(node.id, mod_name, class_name))
if hasattr(node.class_instance, 'set_properties'):
# filter some orange properties
properties = node.properties
for param in ('controlAreaVisible', 'savedWidgetGeometry',
'__version__', 'libraryListSource'):
if param in properties:
del properties[param]
file_.write(' process_{}.set_properties({})\n'.format(node.id,
properties))
def _write_processing(self):
for _, link in self.workflow._representation.links.items():
print(link)
self._write_connection(sink_channel=link.sink_channel,
source_channel=link.source_channel,
sink_node_id=link.sink_node_id,
source_node_id=link.source_node_id)
with open(self.output_file, 'a') as file_:
file_.write('\n# start processing\n')
for node in self.workflow._representation.start_nodes():
self._write_starter(node)
def _write_starter(self, node):
with open(self.output_file, 'a') as file_:
file_.write(' trigger(process_{}, input_data, channel)'.format(node.id))
def _write_connection(self, sink_channel, source_channel, sink_node_id,
source_node_id):
with open(self.output_file, 'a') as file_:
file_.write(' connect(process_{}, "{}", process_{}, '
'"{}")\n'.format(source_node_id, source_channel,
sink_node_id, sink_channel))
def _write_util_functions(self):
with open(self.output_file, 'a') as file_:
file_.write(self._get_utils_functions())
def _get_utils_functions(self):
return """
connections = {}
def get_output_channel_name(class_inst, output):
assert hasattr(class_inst, 'outputs')
for output_ in class_inst.outputs:
output_name, output_type_, output_handler = output_[:3]
if isinstance(output, output_type_):
return output_name
return None
def get_handler(class_inst, channel_name):
'''Find the associate handler for channel `channel_name`'''
if hasattr(class_inst, 'inputs'):
for input_ in class_inst.inputs:
input_name, input_type, input_handler = input_[:3]
if input_name == channel_name:
return input_handler
return None
return None
def connect(source_process, source_channel, sink_process, sink_channel):
if (source_process, source_channel) not in connections:
connections[(source_process, source_channel)] = []
handler = get_handler(sink_process, channel_name=sink_channel)
if handler is None:
return TypeError('{} channel is not managed by {}'.format(sink_channel,
sink_process))
connections[source_process, source_channel].append((sink_process, handler))
'''Register for each cuple process / outputtype the process and handler
to launch when processes end'''
def trigger(process, input_data, channel_name):
_logger.info('{} has been trigger on channel {} with input {}'
''.format(process, channel_name, input_data))
if isinstance(process, IgnoreProcess):
output_data = input_data
output_channel_name = channel_name
else:
handler = get_handler(class_inst=process, channel_name=channel_name)
if handler is None:
raise ValueError('Fail to find handler for {} on channel {}'
''.format(process, channel_name))
assert hasattr(process, handler)
output_data = getattr(process, handler)(input_data)
output_channel_name = get_output_channel_name(process, output_data)
if (process, output_channel_name) in connections:
for downstream in connections[(process, output_channel_name)]:
sink_process, handler = downstream
trigger(sink_process, output_data, output_channel_name)
"""
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment