Commit c4d9c140 authored by payno's avatar payno
Browse files

[io] add manamgent of process flow within .h5 file.

parent adbb7b3a
......@@ -130,7 +130,6 @@ class PyMca_exafs(Process):
advancement"""
def __init__(self):
Process.__init__(self, name='exafs')
self._settings = None
def setProperties(self, properties):
if '_pymcaSettings' in properties:
......@@ -145,6 +144,8 @@ class PyMca_exafs(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj, data_keys=("EXAFSKValues",
"EXAFSSignal"))
return _xas_obj
def _pool_process(self, xas_obj):
......@@ -187,9 +188,6 @@ class PyMca_exafs(Process):
return PyMca5.version()
def program_name(self):
return 'exafs'
def getConfiguration(self):
return self._settings
return 'pymca_exafs'
__call__ = process
......@@ -126,7 +126,6 @@ _USE_MULTIPROCESSING_POOL = False
class PyMca_ft(Process):
def __init__(self):
Process.__init__(self, name='ft')
self._settings = None
def setProperties(self, properties):
if '_pymcaSettings' in properties:
......@@ -148,6 +147,9 @@ class PyMca_ft(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj, data_keys=("FTRadius",
"FTIntensity",
"FTImaginary"))
return _xas_obj
def _pool_process(self, xas_obj):
......@@ -179,4 +181,15 @@ class PyMca_ft(Process):
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
def definition(self):
return "fourier transform"
def program_version(self):
import PyMca5
return PyMca5.version()
def program_name(self):
return 'pymca_ft'
__call__ = process
......@@ -100,7 +100,12 @@ class PyMca_k_weight(Process):
def setProperties(self, properties):
if '_kWeightSetting' in properties:
self._k_weight = properties['_kWeightSetting']
self._settings['k_weight'] = properties['_kWeightSetting']
def _k_weight(self):
if 'k_weight' in self._settings:
return self._settings['k_weight']
return None
def process(self, xas_obj):
"""
......@@ -139,6 +144,7 @@ class PyMca_k_weight(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj=xas_obj, data_keys={})
return _xas_obj
def _pool_process(self, xas_obj):
......@@ -158,4 +164,14 @@ class PyMca_k_weight(Process):
overwrite=True)
p.map(partial_, xas_obj.spectra)
def definition(self):
return "Define k weight for xas treatment"
def program_version(self):
import PyMca5
return PyMca5.version()
def program_name(self):
return 'pymca_k_weight'
__call__ = process
......@@ -108,7 +108,6 @@ class PyMca_normalization(Process):
def __init__(self):
Process.__init__(self, 'normalization')
self._advancement = Progress(self.name)
self._settings = None
def setProperties(self, properties):
if '_pymcaSettings' in properties:
......@@ -136,6 +135,9 @@ class PyMca_normalization(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj, data_keys=("NormalizedEnergy",
"NormalizedMu",
"NormalizedSignal"))
return _xas_obj
def _pool_process(self, xas_obj):
......@@ -177,9 +179,6 @@ class PyMca_normalization(Process):
return PyMca5.version()
def program_name(self):
return 'normalization'
def getConfiguration(self):
return self._settings
return 'pymca_normalization'
__call__ = process
......@@ -45,6 +45,8 @@ class Process(object):
self._advancement = Progress(name=name)
self.__stop = False
"""flag to notice when a end of process is required"""
self._settings = {}
# configuration
@property
def name(self):
......@@ -86,4 +88,28 @@ class Process(object):
:return: configuration of the process
:rtype: dict
"""
raise NotImplementedError('Base class')
if len(self._settings) > 0:
return self._settings
else:
return None
def register_process(self, xas_obj, data_keys):
"""
Store the current process in the linked h5 file if any,
output data stored will be the one defined by the data_keys
:param :class:`.XASObject` xas_obj: object for which we want to save the
treatment
:param tuple data_keys: keys of the id to save
"""
if xas_obj.has_linked_file():
_data = {}
for key in data_keys:
try:
_data[key] = XASObject._spectra_volume(xas_obj.spectra,
key=key,
dim_1=xas_obj.dim1,
dim_2=xas_obj.dim2)
except KeyError:
pass
xas_obj.register_processing(process=self, data=_data)
......@@ -39,11 +39,19 @@ from xas.core.process.normalization import pymca_normalization, PyMca_normalizat
from xas.core.process.roi import xas_roi, _ROI as XASROI
from xas.core.utils import spectra as spectra_utils
from xas.core.types import XASObject
import tempfile
import shutil
import numpy
class TestStreamSingleSpectrum(unittest.TestCase):
"""Make sure the process have valid io"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.output_dir)
def test_pymca_process(self):
data_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
out = read_pymca_xas(spectra_url=DataUrl(file_path=data_file,
......@@ -97,6 +105,41 @@ class TestStreamSingleSpectrum(unittest.TestCase):
self.assertTrue(out.spectra[0].ft is not None)
self.assertTrue(len(out.spectra[0].ft['FTIntensity']) > 0)
def test_h5_link_xas_object(self):
"""Test that the processing can be stored continuously on a .h5 file"""
spectra = spectra_utils.create_dataset(shape=(256, 20, 10))
energy = numpy.linspace(start=3.26, stop=3.96, num=256)
roi = XASROI(origin=(0, 2), size=(5, 1))
xas_obj = XASObject(spectra=spectra, energy=energy,
configuration={'roi': roi.to_dict()})
h5_file = os.path.join(self.output_dir, 'putput_file.h5')
xas_obj.link_to_h5(h5_file)
self.assertTrue(xas_obj.linked_h5_file is not None)
out = PyMca_normalization()(xas_obj=xas_obj)
out = PyMca_exafs()(xas_obj=out)
out = PyMca_k_weight()(xas_obj=out)
out = PyMca_ft()(xas_obj=out)
out = PyMca_normalization()(xas_obj=out)
assert isinstance(out, XASObject)
assert out.linked_h5_file is h5_file
# then check all process are correctly registered with the valid id...
processes = xas_obj.get_process_flow()
shutil.copy(h5_file, '/nobackup/linazimov/payno/dev/esrf/xas/test_workflow.h5')
self.assertEqual(len(processes), 5)
self.assertEqual(processes[1]['program'], 'pymca_normalization')
self.assertEqual(processes[2]['program'], 'pymca_exafs')
self.assertEqual(processes[5]['program'], 'pymca_normalization')
# TODO: make sure configuration is also store
# then test the remove processed entry
xas_obj.clean_process_flow()
processes = xas_obj.get_process_flow()
self.assertEqual(len(processes), 0)
def test_h5_link_dict(self):
"""Same test as test_h5_link_xas_object but with a dict pass between
processes"""
pass
def suite():
test_suite = unittest.TestSuite()
......
......@@ -32,6 +32,7 @@ from silx.io.dictdump import dicttoh5, h5todict
import copy
import numpy
import logging
import h5py
_logger = logging.getLogger(__name__)
......@@ -39,12 +40,15 @@ class XASObject(object):
"""Base class of XAS"""
def __init__(self, spectra=None, energy=None, configuration=None, dim1=None,
dim2=None):
dim2=None, name='scan1'):
self.__channels = None
self.__spectra = []
self.__energy = None
self.__dim1 = 0
self.__dim2 = 0
self.__processing_index = 0
self.__h5_file = None
self.__entry_name = name
self.spectra = (energy, spectra, dim1, dim2)
self.configuration = configuration
......@@ -277,6 +281,97 @@ class XASObject(object):
assert isinstance(self.spectra[0], Spectrum)
return self.spectra[0].keys()
@property
def linked_h5_file(self):
return self.__h5_file
def link_to_h5(self, h5_file):
"""
Associate a .h5 file to the XASObject. This can be used for storing
process flow.
:param h5_file:
:return:
"""
self.__h5_file = h5_file
def has_linked_file(self):
return self.__h5_file is not None
def get_next_processing_index(self):
self.__processing_index += 1
return self.__processing_index
def register_processing(self, process, data):
"""
Register one process for the current xas object. This require to having
link a h5file to this object
:param :class:`.Process` process:
:param data: result of the processing. If there is more than one
result then a dictionary with the key under which result
should be saved and a numpy.ndarray
:type: Union[numpy.ndarray, dict]
"""
import xas.io
xas.io.write_xas_proc(self.linked_h5_file, entry=self.__entry_name,
processing_order=self.get_next_processing_index(),
process=process, data=data)
def get_process_flow(self):
"""
:return: the dict of process information
:rtype: dict
"""
import xas.io
if not self.linked_h5_file:
_logger.warning('process flow is store in the linked .h5 file. If'
'no link is defined then this information is not'
'stored')
return {}
else:
recognized_process = xas.io.get_xasproc(self.linked_h5_file,
entry=self.__entry_name)
know_process = ('pymca_normalization', 'pymca_exafs', 'pymca_ft',
'pymca_k_weight')
def filter_recognized_process(process_list):
res = []
for process_ in process_list:
if 'program' in process_.keys() and process_['program'] in know_process:
res.append(process_)
return res
recognized_process = filter_recognized_process(recognized_process)
def get_ordered_process(process_list):
res = {}
for process_ in process_list:
if not 'processing_order' in process_:
_logger.warning('one processing has not processing order: ' + process_['program'])
else:
processing_order = int(process_['processing_order'])
res[processing_order] = process_
return res
return get_ordered_process(recognized_process)
def clean_process_flow(self):
"""
Remove existing process flow
"""
if not self.linked_h5_file:
_logger.warning('process flow is store in the linked .h5 file. If'
'no link is defined then this information is not'
'stored')
else:
process_flow = self.get_process_flow()
with h5py.File(self.linked_h5_file) as h5f:
for index, process_ in process_flow.items():
del h5f[process_['_h5py_path']]
pass
# TODO: add the spectra class. Would speed up and simplify stuff probably
class Spectra(object):
......
......@@ -36,6 +36,7 @@ from xas.core.types import XASObject
from xas.core.process.process import Process
from silx.utils.enum import Enum
from datetime import datetime
import numpy
_logger = logging.getLogger(__name__)
......@@ -191,7 +192,7 @@ def write_xas_proc(h5_file, entry, process, data, processing_order, data_path='/
nx_entry = h5f.require_group('/'.join((data_path, entry)))
nx_entry.attrs["NX_class"] = "NXentry"
nx_process = nx_entry.require_group(process.name)
nx_process = nx_entry.require_group('xas_process_' + str(processing_order))
nx_process.attrs['NX_class'] = "NXprocess"
nx_process['program'] = process.program_name()
nx_process['version'] = process.program_version()
......@@ -202,8 +203,23 @@ def write_xas_proc(h5_file, entry, process, data, processing_order, data_path='/
nx_data.attrs['NX_class'] = "NXdata"
nx_data.attrs['signal'] = 'data'
nx_process['data'] = data
nx_process['data'].attrs['interpretation'] = 'image'
def get_interpretation(mydata):
if isinstance(mydata, numpy.ndarray):
if mydata.ndim is 1:
return 'spectrum'
elif mydata.ndim in (2, 3):
return 'image'
return None
if isinstance(data, numpy.ndarray):
data_ = {'data': data}
else:
data_ = data
for key, value in data_.items():
nx_process[key] = value
interpretation = get_interpretation(value)
if interpretation:
nx_process[key].attrs['interpretation'] = interpretation
if process.getConfiguration() is not None:
from silx.io.dictdump import dicttoh5
......@@ -268,6 +284,49 @@ def write_xas(h5_file, entry, sample, energy, mu, start_time=None,
nx_entry['definition'] = definition
def get_xasproc(h5_file, entry):
"""
Return the list of all NXxasproc existing at the data_path level
:param str h5_file: hdf5 file
:param str entry: data location
:return:
:rtype: list
"""
def copy_nx_xas_process(h5_group):
"""copy base information from nx_xas_process"""
res = {}
res['_h5py_path'] = h5_group.name
relevant_keys = ('program', 'version', 'data', 'parameters',
'processing_order')
for key in h5_group.keys():
# for now we don't want to copy the numpy array (data)
if key in relevant_keys:
res[key] = h5_group[key][...]
return res
res = []
with h5py.File(h5_file) as h5f:
try:
root_group = h5f[entry]
except KeyError:
_logger.warning(entry + ' does not exist in ' + h5_file)
else:
for key in root_group.keys():
elmt = root_group[key]
if hasattr(elmt, 'attrs') and 'NX_class' in elmt.attrs:
if elmt.attrs['NX_class'] == 'NXprocess':
nx_xas_proc = copy_nx_xas_process(elmt)
if len(nx_xas_proc) == 0:
_logger.warning('one xas process was not readable '
'from the hdf5 file at:' + key)
else:
res.append(nx_xas_proc)
return res
if __name__ == '__main__':
import os
from xas.core.process.normalization import PyMca_normalization
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment