Commit b854a288 authored by payno's avatar payno
Browse files

[app] add reprocessing application.

Allows to reexecute all treatment from a .h5 containing the process definition.
parent c68743b3
Pipeline #12722 passed with stage
in 4 minutes and 23 seconds
import argparse
import sys
import logging
from xas.core.types import XASObject
from xas.core.reprocessing import get_process_instance_frm_h5_desc
try:
import h5py
has_h5py = True
except:
has_h5py = False
logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger(__name__)
def exec_(h5_file, entry='scan1', spectra_path=None, energy_path=None,
configuration_path=None):
try:
xas_obj = XASObject.from_file(h5_file=h5_file,
entry=entry or 'scan1',
spectra_path=spectra_path or 'data/absorbed_beam',
energy_path=energy_path or 'data/energy',
configuration_path=configuration_path)
xas_obj.link_to_h5(h5_file=h5_file)
except Exception as e:
_logger.error(e)
xas_obj = None
if not xas_obj:
_logger.error('Unable to create an object from the given file and path')
xas_processes = xas_obj.get_process_flow()
if len(xas_processes) == 0:
_logger.warning('no xas process found in the given h5 file')
return
xas_obj.clean_process_flow()
for i_process, process in xas_processes.items():
process_ins = get_process_instance_frm_h5_desc(process)
xas_obj = process_ins(xas_obj)
return xas_obj
def getinputinfo():
return "xas reprocessing workflow_file.h5"
def getInputFrmHdf5(file_path):
raise NotImplementedError()
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'workflow_file',
help='Workflow file providing the workflow description (.h5)')
parser.add_argument("--spectra_path", dest="spectra_path", default=None,
help="path to the spectra data")
parser.add_argument("--energy_path", dest="energy_path", default=None,
help="path to the energy data")
parser.add_argument("--configuration_path", dest="configuration_path", default=None,
help="path to the process configuration")
parser.add_argument("--entry", dest="entry to load",
default=None,
help="path to the process configuration")
options = parser.parse_args(argv[1:])
exec_(h5_file=options.workflow_file, spectra_path=options.spectra_path,
energy_path=options.energy_path,
configuration_path=options.configuration_path)
if __name__ == "__main__":
main(sys.argv)
......@@ -30,6 +30,7 @@ __date__ = "06/12/2019"
import logging
import unittest
from .test_exec import suite as test_exec_suite
from .test_reprocessing import suite as test_reprocessing_suite
_logger = logging.getLogger(__name__)
......
......@@ -30,7 +30,7 @@ __date__ = "06/26/2019"
import unittest
import tempfile
import shutil
from xas.core.utils import DownloadDataset, url_base
from xas.core.utils import DownloadDataset
from PyMca5.PyMcaDataDir import PYMCA_DATA_DIR
from ...pushworkflow.scheme.parser import scheme_load
from ..process import exec_
......
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/26/2019"
from xas.core.process.exafs import PyMca_exafs
from xas.core.process.normalization import PyMca_normalization
from xas.core.process.k_weight import PyMca_k_weight
from xas.core.process.ft import PyMca_ft
from xas.core.utils import spectra as spectra_utils
from xas.core.types import XASObject
from xas.core.io import XASWriter
from ..reprocessing import exec_
import unittest
import tempfile
import shutil
import numpy
import os
class TestReprocessing(unittest.TestCase):
"""test construction of XAS object"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
spectra = spectra_utils.create_dataset(shape=(10, 4, 4))
energy = numpy.linspace(start=3.26, stop=3.96, num=10)
self.xas_obj_ref = XASObject(spectra=spectra, energy=energy)
self.h5_file = os.path.join(self.output_dir, 'output_file.h5')
out = PyMca_normalization()(xas_obj=self.xas_obj_ref)
out = PyMca_exafs()(xas_obj=out)
out = PyMca_k_weight()(xas_obj=out)
out = PyMca_ft()(xas_obj=out)
out = PyMca_normalization()(xas_obj=out)
writer = XASWriter()
writer.output_file = self.h5_file
writer(out)
assert out.spectra[0].ft.intensity is not None
def tearDown(self):
shutil.rmtree(self.output_dir)
def test(self):
res_xas_obj = exec_(self.h5_file)
self.assertEqual(res_xas_obj, self.xas_obj_ref)
numpy.testing.assert_allclose(res_xas_obj.spectra[0].ft.intensity,
self.xas_obj_ref.spectra[0].ft.intensity)
def suite():
test_suite = unittest.TestSuite()
for ui in (TestReprocessing, ):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == '__main__':
unittest.main(defaultTest="suite")
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""some utils function for executing reprocessing"""
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "07/18/2019"
import importlib
import logging
_logger = logging.getLogger(__name__)
def get_process_instance_frm_h5_desc(desc):
"""
:param dict desc: description of the process to instanciate
:return: instance of the process to execute, configured from the description
"""
assert 'program' in desc
assert 'class_instance' in desc
tmp = str(desc['class_instance']).split('.')
module_name = '.'.join(tmp[:-1])
class_name = tmp[-1]
try:
_module = importlib.import_module(module_name)
instance = getattr(_module, class_name)()
except Exception as e:
_logger.warning(' '.join(('Fail to instanciate', module_name, class_name,
'reason is', e)))
instance = None
else:
if 'configuration' in desc:
instance.setConfiguration(desc['configuration'])
return instance
......@@ -99,15 +99,6 @@ def read_xas(spectra_url, channel_url, config_url=None):
return (spectra, energy, configuration)
def str_to_utf8(text):
"""Convert str or sequence of str to type compatible with h5py
:param Union[str,List[str]] text:
:rtype: numpy.ndarray
"""
return numpy.array(text, dtype=h5py.special_dtype(vlen=str))
# TODO: process, sample, energy and data can be embed in a 'result' / output object
def write_xas_proc(h5_file, entry, process, data, processing_order, data_path='/'):
"""
......@@ -132,6 +123,9 @@ def write_xas_proc(h5_file, entry, process, data, processing_order, data_path='/
nx_process['version'] = process.program_version()
nx_process['date'] = datetime.now().replace(microsecond=0).isoformat()
nx_process['processing_order'] = numpy.int32(processing_order)
_class = process.__class__
nx_process['class_instance'] = '.'.join((_class.__module__,
_class.__name__))
nx_data = nx_entry.require_group('data')
nx_data.attrs['NX_class'] = "NXdata"
......@@ -232,7 +226,7 @@ def get_xasproc(h5_file, entry):
res = {}
res['_h5py_path'] = h5_group.name
relevant_keys = ('program', 'version', 'data', 'parameters',
'processing_order')
'processing_order', 'configuration', 'class_instance')
for key in h5_group.keys():
# for now we don't want to copy the numpy array (data)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment