Commit 24aa590d authored by payno's avatar payno
Browse files

[app] rework exec application

- for now tests fail with larch, but this comes from larch (issue in the workflow parametrization)
parent fb8d1b46
import argparse
import sys
from pypushflow.Workflow import ProcessableWorkflow
from pypushflow.representation.scheme.ows_parser import OwsParser
import logging
import signal
from pypushflow.representation.scheme.scheme import Scheme
from silx.io.url import DataUrl
from est.units import ur
from est.io.utils.information import InputInformation
from est.io.utils.information import SpecInputInformation
from ewoksorange.owsconvert import ows_to_ewoks
from .utils import get_unit
from .utils import get_url
from .utils import convert_spectra_dims
......@@ -20,86 +15,11 @@ try:
has_h5py = True
except:
has_h5py = False
from typing import Union
# Configure root logging for the application entry point.
# NOTE(review): DEBUG is verbose for a default; presumably intentional for
# this command-line app — confirm before shipping.
logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger(__name__)
def _insert_input_in_scheme(scheme, input_information):
"""update 'starting' node properties to include the provided input"""
# monkey patch the input file for start nodes if an input is given
for node in scheme.nodes:
if node.properties and "_spec_file_setting" in node.properties:
if input_information.is_spec_input():
node.properties["_spec_file_setting"] = input_information.spec_info
if input_information.spectra_url is not None:
node.properties[
"_spectra_url_setting"
] = input_information.spectra_url.path()
if input_information.dimensions is not None:
node.properties["_dimensions_setting"] = input_information.dimensions
if input_information.channel_url is not None:
node.properties[
"_energy_url_setting"
] = input_information.channel_url.path()
if input_information.configuration_url is not None:
node.properties[
"_configuration_url_setting"
] = input_information.configuration_url.path()
def _insert_output_in_scheme(scheme, output_):
"""update 'starting' node properties to include the provided input"""
found_output = False
# monkey patch the input file for start nodes if an input is given
for node in scheme.nodes:
if node.properties and "_output_file_setting" in node.properties:
node.properties["_output_file_setting"] = output_
found_output = True
if not found_output:
_logger.warning(
"No node for processing output found. output "
"information provided will be ignored"
)
def exec_(
    scheme: Scheme,
    input_information: InputInformation,
    output_: Union[str, None, dict] = None,
    timeout: Union[int, None] = None,
):
    """Execute the given workflow scheme on the provided input.

    :param scheme: workflow to execute
    :param input_information: description of the input data (urls, spec...)
    :param output_: optional output file / settings forwarded to the output
                    node(s) of the workflow
    :param timeout: optional timeout in seconds when waiting for the end actor
    :return: data held by the workflow end actor once execution completed
    :raises ValueError: if the input information is inconsistent
    """
    if not input_information.is_valid():
        raise ValueError("You cannot provide an input file and input urls")
    _insert_input_in_scheme(input_information=input_information, scheme=scheme)
    if output_ is not None:
        _insert_output_in_scheme(scheme=scheme, output_=output_)
    workflow = ProcessableWorkflow(scheme=scheme)

    # add SIGINT capture so Ctrl-C stops the workflow instead of hanging
    def signal_handler(sig, frame):
        _logger.warning("stop workflow execution on user request")
        workflow._end_actor.join(0)
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    xas_obj = get_xas_obj(input_information=input_information)
    workflow._start_actor.trigger(("data", xas_obj.to_dict()))
    workflow._end_actor.join(timeout)
    res = workflow._end_actor.out_data
    # fixed typo in fallback title: was "unknow"
    title = scheme.title or "unknown"
    _logger.info(
        "workflow '{}' completed with {}".format(title, str(input_information))
    )
    return res
def main(argv):
    """Command-line entry point: load an .ows workflow, build the input
    information from CLI options and execute the workflow through ewoks.

    NOTE(review): this paste is a GitLab diff rendering. Two hunks of this
    function are elided at the '......@@' marker lines kept below, and some
    pre-commit lines are interleaved with the post-commit body — verify
    against the repository before relying on this text.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
......@@ -229,8 +149,6 @@ def main(argv):
        "instance of XASOutputOW",
    )
    options = parser.parse_args(argv[1:])
    # NOTE(review): the next line looks like pre-commit residue — the
    # post-commit flow converts the .ows file with ows_to_ewoks instead.
    scheme = OwsParser.scheme_load(options.workflow_file, load_handlers=True)
    # gather the input urls / settings provided on the command line
    input_information = InputInformation(
        spectra_url=get_url(options.input_spectra),
        channel_url=get_url(options.input_channel),
......@@ -249,7 +167,16 @@ def main(argv):
    input_information.I1 = get_url(options.input_I1)
    input_information.I2 = get_url(options.input_I2)
    input_information.mu_ref = get_url(options.input_mu_ref)
    # NOTE(review): pre-commit residue — superseded by the ewoks execution below
    exec_(scheme=scheme, input_information=input_information)
    xas_obj = get_xas_obj(input_information=input_information)
    # convert the Orange workflow file into an ewoks graph and execute it
    graph = ows_to_ewoks(filename=options.workflow_file)
    varinfo = {
        "xas_obj": xas_obj.to_dict(),
    }
    if options.output_file:
        varinfo["outputdir"] = options.output_file
    graph.execute(varinfo=varinfo)
if __name__ == "__main__":
......
......@@ -31,8 +31,8 @@ import unittest
import tempfile
import shutil
from est.core.utils import DownloadDataset
from ..process import exec_
from pypushflow.representation.scheme.ows_parser import OwsParser
import pytest
from ewoksorange.owsconvert import ows_to_ewoks
import os
from est.io.utils.information import InputInformation
from silx.io.url import DataUrl
......@@ -52,28 +52,41 @@ class TestWorkflowFromOwsFile(unittest.TestCase):
def setUp(self):
    """Download the example workflows and prepare input / output paths.

    Fixed diff residue: the paste interleaved the pre-commit body
    (downloading the obsolete ``pymca_workflow_2.ows`` and setting the
    unused ``orange_file`` / ``input_file1`` attributes) with the
    post-commit body; only the post-commit version is kept.
    """
    self.outputdir = tempfile.mkdtemp()
    for workflow_file in (
        "ows_files/example_pymca.ows",
        "ows_files/example_larch.ows",
    ):
        DownloadDataset(
            dataset=workflow_file, output_folder=self.outputdir, timeout=2.0
        )
    self.pymca_orange_file = os.path.join(self.outputdir, "example_pymca.ows")
    self.larch_orange_file = os.path.join(self.outputdir, "example_larch.ows")
    self.data_input_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
    self.output_file = os.path.join(self.outputdir, "output.h5")
def tearDown(self):
shutil.rmtree(self.outputdir)
def testExecWorkflow(self):
    """Execute the pymca and larch example workflows through ewoks and
    check that an output file is produced.

    Fixed diff residue: the paste kept the removed pre-commit method
    (``testPyMcaWorkflow`` header and its ``exec_``-based body) interleaved
    with this added method, which produced back-to-back ``def`` headers
    (invalid Python); only the post-commit method is kept.
    """
    for workflow_file in (self.pymca_orange_file, self.larch_orange_file):
        with self.subTest(workflow_file=workflow_file):
            graph = ows_to_ewoks(filename=workflow_file)
            input_information = InputInformation(
                spectra_url=DataUrl(
                    file_path=self.data_input_file,
                    scheme="PyMca",
                    data_path="Column 1",
                ),
                channel_url=DataUrl(
                    file_path=self.data_input_file,
                    scheme="PyMca",
                    data_path="Column 2",
                ),
            )
            varinfo = {
                "input_information": input_information.to_dict(),
                "output_file": self.output_file,
            }
            graph.execute(varinfo=varinfo)
            self.assertTrue(os.path.exists(self.output_file))
    # TODO: check the values in the output file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment