Commit 4438dfc1 authored by payno's avatar payno
Browse files

[core][XASObject] deal with the linked h5 file within dict

parent c4d9c140
......@@ -144,7 +144,7 @@ class PyMca_exafs(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj, data_keys=("EXAFSKValues",
self.register_process(_xas_obj, data_keys=("EXAFSKValues",
"EXAFSSignal"))
return _xas_obj
......
......@@ -147,7 +147,7 @@ class PyMca_ft(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj, data_keys=("FTRadius",
self.register_process(_xas_obj, data_keys=("FTRadius",
"FTIntensity",
"FTImaginary"))
return _xas_obj
......
......@@ -144,7 +144,7 @@ class PyMca_k_weight(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj=xas_obj, data_keys={})
self.register_process(xas_obj=_xas_obj, data_keys={})
return _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -135,7 +135,7 @@ class PyMca_normalization(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(xas_obj, data_keys=("NormalizedEnergy",
self.register_process(_xas_obj, data_keys=("NormalizedEnergy",
"NormalizedMu",
"NormalizedSignal"))
return _xas_obj
......
......@@ -46,12 +46,6 @@ import numpy
class TestStreamSingleSpectrum(unittest.TestCase):
"""Make sure the process have valid io"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.output_dir)
def test_pymca_process(self):
data_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
out = read_pymca_xas(spectra_url=DataUrl(file_path=data_file,
......@@ -71,9 +65,9 @@ class TestStreamSingleSpectrum(unittest.TestCase):
channel_url=DataUrl(file_path=data_file,
scheme='PyMca'))
out = pymca_normalization(xas_obj=out.to_dict())
out = pymca_exafs(xas_obj=out)
out = pymca_k_weight(xas_obj=out)
out = pymca_ft(xas_obj=out)
out = pymca_exafs(xas_obj=out.to_dict())
out = pymca_k_weight(xas_obj=out.to_dict())
out = pymca_ft(xas_obj=out.to_dict())
assert isinstance(out, XASObject)
def test_pymca_process_with_cls(self):
......@@ -105,45 +99,73 @@ class TestStreamSingleSpectrum(unittest.TestCase):
self.assertTrue(out.spectra[0].ft is not None)
self.assertTrue(len(out.spectra[0].ft['FTIntensity']) > 0)
def test_h5_link_xas_object(self):
"""Test that the processing can be stored continuously on a .h5 file"""
class TestWorkflowAndH5LinkedFile(unittest.TestCase):
"""
Test that the workflow can process and store process in a targeted h5 file
"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
spectra = spectra_utils.create_dataset(shape=(256, 20, 10))
energy = numpy.linspace(start=3.26, stop=3.96, num=256)
roi = XASROI(origin=(0, 2), size=(5, 1))
xas_obj = XASObject(spectra=spectra, energy=energy,
configuration={'roi': roi.to_dict()})
h5_file = os.path.join(self.output_dir, 'putput_file.h5')
xas_obj.link_to_h5(h5_file)
self.assertTrue(xas_obj.linked_h5_file is not None)
out = PyMca_normalization()(xas_obj=xas_obj)
self.xas_obj = XASObject(spectra=spectra, energy=energy,
configuration={'roi': roi.to_dict()})
self.h5_file = os.path.join(self.output_dir, 'putput_file.h5')
self.xas_obj.link_to_h5(self.h5_file)
def tearDown(self):
shutil.rmtree(self.output_dir)
def test_h5_link_xas_object(self):
"""Test that the processing can be stored continuously on a .h5 file"""
self.assertTrue(self.xas_obj.linked_h5_file is not None)
out = PyMca_normalization()(xas_obj=self.xas_obj)
out = PyMca_exafs()(xas_obj=out)
out = PyMca_k_weight()(xas_obj=out)
out = PyMca_ft()(xas_obj=out)
out = PyMca_normalization()(xas_obj=out)
assert isinstance(out, XASObject)
assert out.linked_h5_file is h5_file
assert out.linked_h5_file is self.h5_file
# then check all process are correctly registered with the valid id...
processes = xas_obj.get_process_flow()
shutil.copy(h5_file, '/nobackup/linazimov/payno/dev/esrf/xas/test_workflow.h5')
processes = self.xas_obj.get_process_flow()
shutil.copy(self.h5_file, '/nobackup/linazimov/payno/dev/esrf/xas/test_workflow.h5')
self.assertEqual(len(processes), 5)
self.assertEqual(processes[1]['program'], 'pymca_normalization')
self.assertEqual(processes[2]['program'], 'pymca_exafs')
self.assertEqual(processes[5]['program'], 'pymca_normalization')
# TODO: make sure configuration is also store
# then test the remove processed entry
xas_obj.clean_process_flow()
processes = xas_obj.get_process_flow()
self.xas_obj.clean_process_flow()
processes = self.xas_obj.get_process_flow()
self.assertEqual(len(processes), 0)
def test_h5_link_dict(self):
"""Same test as test_h5_link_xas_object but with a dict pass between
processes"""
pass
self.assertTrue(self.xas_obj.linked_h5_file is not None)
out = PyMca_normalization()(xas_obj=self.xas_obj.to_dict())
out = PyMca_exafs()(xas_obj=out.to_dict())
out = PyMca_k_weight()(xas_obj=out.to_dict())
out = PyMca_ft()(xas_obj=out.to_dict())
out = PyMca_normalization()(xas_obj=out.to_dict())
assert isinstance(out, XASObject)
assert out.linked_h5_file
assert out.linked_h5_file == self.h5_file
# then check all process are correctly registered with the valid id...
processes = self.xas_obj.get_process_flow()
shutil.copy(self.h5_file, '/nobackup/linazimov/payno/dev/esrf/xas/test_workflow.h5')
self.assertEqual(len(processes), 5)
self.assertEqual(processes[1]['program'], 'pymca_normalization')
self.assertEqual(processes[2]['program'], 'pymca_exafs')
self.assertEqual(processes[5]['program'], 'pymca_normalization')
self.xas_obj.clean_process_flow()
processes = self.xas_obj.get_process_flow()
self.assertEqual(len(processes), 0)
def suite():
test_suite = unittest.TestSuite()
for ui in (TestStreamSingleSpectrum,):
for ui in (TestStreamSingleSpectrum, TestWorkflowAndH5LinkedFile):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
......
......@@ -167,6 +167,8 @@ class XASObject(object):
}
if with_process_details is True:
res['spectra'] = get_list_spectra()
res['linked_h5_file'] = self.linked_h5_file
res['current_processing_index'] = self.__processing_index
return res
......@@ -225,6 +227,11 @@ class XASObject(object):
dim2 = ddict['dim2']
else:
dim2 = None
if 'linked_h5_file' in ddict:
assert 'current_processing_index' in ddict
self.link_to_h5(ddict['linked_h5_file'])
self.__processing_index = ddict['current_processing_index']
self.spectra = (energy, spectra, dim1, dim2)
if not contains_config_spectrum:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment