Commit bc449ff9 authored by payno's avatar payno
Browse files

[app] add application to interprete .ows files

- we can convert .ows to a gui free workflow.
- user can give input and output file to overwrite the one existing in xas.

./bootstrap.py xas process pymca_workflow.ows -i /usr/share/pymca/EXAFS_Cu.dat -o ./output.h5
parent ccfdf061
Pipeline #12037 failed with stage
in 13 minutes and 12 seconds
......@@ -14,6 +14,7 @@ variables:
- export PYTHONPATH="${PYTHONPATH}:/usr/lib/python3/dist-packages/"
- export LD_LIBRARY_PATH=/lib/i386-linux-gnu/:${LD_LIBRARY_PATH}
- export LD_LIBRARY_PATH=/lib/x86_64-linux-gnu/:${LD_LIBRARY_PATH}
- export XAS_TEST_PROCESS=False
- python --version
- python -m pip install pip --upgrade
- python -m pip install setuptools --upgrade
......
......@@ -34,6 +34,7 @@ from silx.gui import qt
from Orange.widgets.settings import Setting
from xas.core.types import XASBase
from xas.core.types import XASFactory
import xas.core.process.io
import logging
_logger = logging.getLogger(__file__)
......@@ -56,6 +57,7 @@ class XASOutputOW(OWWidget):
inputs = [("spectrum", XASBase, 'process')]
_output_file_setting = Setting(str())
process_function = xas.core.process.io.writer_pymca_cas
def __init__(self):
super().__init__()
......
import argparse
import sys
import os
from ..pushworkflow.scheme.parser import scheme_load
from silx.io.url import DataUrl
import logging
try:
import h5py
......@@ -10,10 +8,34 @@ try:
except:
has_h5py = False
logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger(__name__)
def exec_(scheme, input_):
def exec_(scheme, input_, output_):
def manage_input():
if input_:
# monkey patch the input file for start nodes if an input is given
for node in scheme.nodes:
if node.properties and '_input_file_setting' in node.properties:
node.properties['_input_file_setting'] = input_
manage_input()
def manage_output():
if output_:
# monkey patch the input file for start nodes if an input is given
for node in scheme.nodes:
if node.properties and '_output_file_setting' in node.properties:
node.properties['_output_file_setting'] = output_
# TODO: if the input file contains configuration, then overwrite the node
# properties...
manage_output()
if len(scheme._start_actor.listDownStreamActor) is 0:
_logger.warning('no downstream node defined')
return None
scheme._start_actor.trigger(input_)
scheme._end_actor.join()
return scheme._end_actor.outData
......@@ -33,25 +55,15 @@ def main(argv):
'workflow_file',
help='Workflow file providing the workflow description (.ows, .xml)')
parser.add_argument("-i", dest="input_", default=None,
help="Input of the workflow")
help="Input of the workflow. require at most one "
"instance of XASInputOW")
parser.add_argument("-o", dest="output_", default=None,
help="Output file of the workflow. Require at most one "
"instance of XASOutputOW")
options = parser.parse_args(argv[1:])
scheme = scheme_load(_file=options.file_path)
if options.input_ is None:
input_ = None
else:
if not os.path.isfile(options.input_):
raise ValueError('input should be a valid file path, containing'
'spectrum information with optional configuration')
if has_h5py and h5py.is_hdf5(options.input_):
input_ = getInputFrmHdf5()
else:
input_ = DataUrl(file_path=options.input_, scheme='PyMca')
exec_(scheme, input_)
scheme = scheme_load(_file=options.workflow_file)
exec_(scheme, options.input_, options.output_)
if __name__ == "__main__":
main(sys.argv)
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2018 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/12/2019"
import logging
import unittest
from .test_exec import suite as test_exec_suite
_logger = logging.getLogger(__name__)
def suite():
test_suite = unittest.TestSuite()
test_suite.addTest(test_exec_suite())
return test_suite
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/26/2019"
import unittest
import tempfile
import shutil
from xas.core.utils import DownloadDataset, url_base
from PyMca5.PyMcaDataDir import PYMCA_DATA_DIR
from ...pushworkflow.scheme.parser import scheme_load
from ..process import exec_
import os
class TestWorkflowFromOwsFile(unittest.TestCase):
"""test construction of XAS object"""
def setUp(self):
self.outputdir = tempfile.mkdtemp()
file_ = 'pymca_workflow.ows'
DownloadDataset(dataset=file_,
output_folder=self.outputdir,
timeout=10.0)
self.orange_file = os.path.join(self.outputdir, file_)
self.input_file1 = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
self.output_file = os.path.join(self.outputdir, 'output.h5')
def tearDown(self):
shutil.rmtree(self.outputdir)
def testPyMcaWorkflow(self):
"""Test regarding the instantiation of the pymcaXAS"""
exec_(scheme=scheme_load(self.orange_file),
input_=self.input_file1,
output_=self.output_file)
self.assertTrue(os.path.exists(self.output_file))
# TODO: check the values in the output file
def suite():
test_suite = unittest.TestSuite()
for ui in (TestWorkflowFromOwsFile, ):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == '__main__':
unittest.main(defaultTest="suite")
......@@ -47,8 +47,10 @@ def read_pymca_xas(spectrum_url, config_url=None):
"""
if type(spectrum_url) is str:
spectrum_url = DataUrl(file_path=spectrum_url, scheme='PyMca')
assert isinstance(spectrum_url, DataUrl)
assert config_url is None or isinstance(config_url, DataUrl)
if not isinstance(spectrum_url, DataUrl):
raise TypeError('given input for spectrum is invalid')
if not (config_url is None or isinstance(config_url, DataUrl)):
raise TypeError('given input for configuration is invalid')
# deal with h5file path
if h5py.is_hdf5(spectrum_url.file_path()):
......@@ -72,3 +74,30 @@ def read_pymca_xas(spectrum_url, config_url=None):
raise ValueError('Energy and/or Mu keys are not available in '
'given url data', spectrum_url)
return PyMcaXAS(spectrum=spectrum, configuration=config)
class writer_pymca_cas(object):
"""
class to write the output file. In this case we need a class in order to
setup the output file before
"""
def __init__(self):
self._output_file = None
def setProperties(self, properties):
if '_output_file_setting' in properties:
self._output_file = properties['_output_file_setting']
def dump_pymca_xas(self, xas_obj):
if not self._output_file:
print('no output file defined, please give path to the output file')
self._output_file = input()
_logger.info('dumping xas obj... to', self._output_file)
if isinstance(xas_obj, dict):
_xas_obj = PyMcaXAS().load_frm_dict(xas_obj)
else:
_xas_obj = xas_obj
print('dump xas obj to', self._output_file)
_xas_obj.dump(self._output_file)
__call__ = dump_pymca_xas
......@@ -38,6 +38,7 @@ from xas.core.process.k_weight import pymca_k_weight
from xas.core.process.normalization import pymca_normalization
@unittest.skipUnless(os.environ.get('XAS_TEST_PROCESS', True), 'can fail on CI')
class TestStream(unittest.TestCase):
"""Make sure the process have valid io"""
def test_pymca_process(self):
......
# coding: utf-8
# ##########################################################################
#
# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "17/05/2017"
import os
import logging
import shutil
from urllib.request import urlopen, ProxyHandler, build_opener
logger = logging.getLogger(__file__)
url_base = "http://www.edna-site.org/pub/xas/"
def DownloadDataset(dataset, output_folder, timeout, unpack=False):
# create if needed path scan
url = url_base + dataset
logger.info("Trying to download scan %s, timeout set to %ss",
dataset, timeout)
dictProxies = {}
if "http_proxy" in os.environ:
dictProxies['http'] = os.environ["http_proxy"]
dictProxies['https'] = os.environ["http_proxy"]
if "https_proxy" in os.environ:
dictProxies['https'] = os.environ["https_proxy"]
if dictProxies:
proxy_handler = ProxyHandler(dictProxies)
opener = build_opener(proxy_handler).open
else:
opener = urlopen
logger.info("wget %s" % url)
data = opener(url, data=None, timeout=timeout).read()
logger.info("Image %s successfully downloaded." % dataset)
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
try:
archive_folder = os.path.join(output_folder, os.path.basename(dataset))
with open(archive_folder, "wb") as outfile:
outfile.write(data)
except IOError:
raise IOError("unable to write downloaded \
data to disk at %s" % archive_folder)
if unpack is True:
shutil.unpack_archive(archive_folder, extract_dir=output_folder, format='bztar')
os.remove(archive_folder)
......@@ -40,12 +40,14 @@ logger = logging.getLogger(__name__)
def suite():
from ..core import test as test_core
from ..app import test as test_app
from ..pushworkflowactors import test as test_workflow_cases
test_suite = unittest.TestSuite()
# test sx first cause qui tests load ipython module
test_suite.addTest(test_core.suite())
test_suite.addTest(test_workflow_cases.suite())
test_suite.addTest(test_app.suite())
return test_suite
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment