Commit 9dc6f969 authored by payno's avatar payno
Browse files

[core] add Spectra class.

- refactor types to create submodule
- Spectra is now a class instead of a simple list from XasObject
- Spectra contains data which is a numpy ndarray of Spectrum. This was it is simpler
to reshape it andd should offer some flexibility for the futures.
- this is the main PR but several smaller one are expected to come for cleaning...
parent 2f342180
Pipeline #39086 failed with stages
in 60 minutes and 24 seconds
......@@ -85,7 +85,7 @@ class TestReprocessingPyMca(unittest.TestCase):
writer = XASWriter()
writer.output_file = self.h5_file
writer(out)
self.assertTrue(out.spectra[0].ft.intensity is not None)
self.assertTrue(out.spectra.data.flat[0].ft.intensity is not None)
self.assertEqual(len(out.get_process_flow()), 5)
def tearDown(self):
......@@ -95,8 +95,8 @@ class TestReprocessingPyMca(unittest.TestCase):
res_xas_obj = exec_(self.h5_file)
self.assertEqual(res_xas_obj, self.xas_obj_ref)
numpy.testing.assert_allclose(
res_xas_obj.spectra[0].ft.intensity,
self.xas_obj_ref.spectra[0].ft.intensity,
res_xas_obj.spectra.data.flat[0].ft.intensity,
self.xas_obj_ref.spectra.data.flat[0].ft.intensity,
)
......@@ -147,7 +147,8 @@ class TestReprocessingLarch(unittest.TestCase):
res_xas_obj = exec_(self.h5_file)
self.assertEqual(res_xas_obj, self.xas_obj_ref)
numpy.testing.assert_allclose(
res_xas_obj.spectra[0].chir_mag, self.xas_obj_ref.spectra[0].chir_mag
res_xas_obj.spectra.data.flat[0].chir_mag,
self.xas_obj_ref.spectra.data.flat[0].chir_mag,
)
......
......@@ -106,14 +106,14 @@ class TestLarchSpectra(unittest.TestCase):
def testProcessXASObject(self):
res = larch_autobk(self.xas_object)
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "k"))
self.assertTrue(hasattr(spectrum0, "chi"))
def testProcessAsDict(self):
res = larch_autobk(self.xas_object.to_dict())
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "k"))
self.assertTrue(hasattr(spectrum0, "chi"))
......
......@@ -106,14 +106,14 @@ class TestLarchSpectra(unittest.TestCase):
def testProcessXASObject(self):
res = larch_mback(self.xas_object)
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "fpp"))
self.assertTrue(hasattr(spectrum0, "f2"))
def testProcessAsDict(self):
res = larch_mback(self.xas_object.to_dict())
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "fpp"))
self.assertTrue(hasattr(spectrum0, "f2"))
......
......@@ -117,14 +117,14 @@ class TestLarchSpectra(unittest.TestCase):
def testProcessXASObject(self):
res = larch_mback_norm(self.xas_object)
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "mback_mu"))
self.assertTrue(hasattr(spectrum0, "norm_mback"))
def testProcessAsDict(self):
res = larch_mback_norm(self.xas_object.to_dict())
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "mback_mu"))
self.assertTrue(hasattr(spectrum0, "norm_mback"))
......
......@@ -106,14 +106,14 @@ class TestLarchSpectra(unittest.TestCase):
def testProcessXASObject(self):
res = larch_pre_edge(self.xas_object)
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "pre_edge"))
self.assertTrue(hasattr(spectrum0, "edge"))
def testProcessAsDict(self):
res = larch_pre_edge(self.xas_object.to_dict())
assert isinstance(res, XASObject)
spectrum0 = res.spectra[0]
spectrum0 = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum0, "pre_edge"))
self.assertTrue(hasattr(spectrum0, "edge"))
......
......@@ -113,7 +113,7 @@ class TestLarchSpectra(unittest.TestCase):
configuration=self.configuration,
)
# for xftf we need to compute pre edge before
spectrum = self.xas_object.spectra[0]
spectrum = self.xas_object.spectra.data.flat[0]
assert hasattr(spectrum, "k")
assert hasattr(spectrum, "chi")
......@@ -123,7 +123,7 @@ class TestLarchSpectra(unittest.TestCase):
def testProcessXASObject(self):
res = larch_xftf(self.xas_object)
assert isinstance(res, XASObject)
spectrum = res.spectra[0]
spectrum = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum, "chir_re"))
self.assertTrue(hasattr(spectrum, "chir_im"))
self.assertTrue(hasattr(spectrum, "chir"))
......@@ -132,7 +132,7 @@ class TestLarchSpectra(unittest.TestCase):
def testProcessAsDict(self):
res = larch_xftf(self.xas_object.to_dict())
assert isinstance(res, XASObject)
spectrum = res.spectra[0]
spectrum = res.spectra.data.flat[0]
self.assertTrue(hasattr(spectrum, "chir_re"))
self.assertTrue(hasattr(spectrum, "chir_im"))
self.assertTrue(hasattr(spectrum, "chir"))
......
......@@ -83,10 +83,7 @@ class Process(object):
_xas_obj = xas_obj
assert isinstance(_xas_obj, XASObject)
if _xas_obj.n_spectrum > 0:
for spectrum in _xas_obj.spectra:
assert isinstance(spectrum, Spectrum)
assert isinstance(spectrum.energy, (numpy.ndarray, pint.Quantity))
assert isinstance(spectrum.mu, numpy.ndarray)
_xas_obj.spectra.check_validity()
assert isinstance(_xas_obj, XASObject)
return _xas_obj
......
......@@ -169,7 +169,7 @@ class PyMca_exafs(Process):
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
for spectrum in xas_obj.spectra.data.flat:
assert "NormalizedBackground" in spectrum
process_spectr_exafs(
spectrum=spectrum,
......
......@@ -172,9 +172,9 @@ class PyMca_ft(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
assert hasattr(_xas_obj.spectra[0], "ft")
assert hasattr(_xas_obj.spectra[0].ft, "intensity")
assert hasattr(_xas_obj.spectra[0].ft, "imaginary")
assert hasattr(_xas_obj.spectra.data.flat[0], "ft")
assert hasattr(_xas_obj.spectra.data.flat[0].ft, "intensity")
assert hasattr(_xas_obj.spectra.data.flat[0].ft, "imaginary")
self.register_process(
_xas_obj, data_keys=("ft.radius", "ft.intensity", "ft.imaginary")
)
......
......@@ -199,7 +199,7 @@ class PyMca_k_weight(Process):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
assert "KWeight" in xas_obj.configuration
for spectrum in xas_obj.spectra:
for spectrum in xas_obj.spectra.data.flat:
process_spectr_k(
spectrum=spectrum,
configuration=xas_obj.configuration,
......
......@@ -41,7 +41,7 @@ _logger = logging.getLogger(__name__)
def process_spectr_norm(
spectrum,
spectrum: Spectrum,
configuration,
overwrite=True,
callbacks=None,
......@@ -65,6 +65,7 @@ def process_spectr_norm(
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
assert isinstance(spectrum, Spectrum)
_logger.debug("start normalization on spectrum (%s, %s)" % (spectrum.x, spectrum.y))
if spectrum.energy is None or spectrum.mu is None:
_logger.error(
......@@ -162,7 +163,7 @@ class PyMca_normalization(Process):
"""process normalization from a pool"""
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
for spectrum in xas_obj.spectra.data.flat:
process_spectr_norm(
spectrum=spectrum,
configuration=xas_obj.configuration,
......@@ -175,7 +176,7 @@ class PyMca_normalization(Process):
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
for i_spect, spect in enumerate(xas_obj.spectra.data.flat):
res_list.append(None)
output_dict[spect] = i_spect
......
......@@ -65,25 +65,25 @@ class TestEXAFSSingleSpectrum(unittest.TestCase):
# first process normalization
self.preproc_input_ = pymca_normalization(xas_obj=self.input_)
assert "NormalizedBackground" in self.preproc_input_.spectra[0]
for spectrum in self.preproc_input_.spectra:
assert "NormalizedBackground" in self.preproc_input_.spectra.data.flat[0]
for spectrum in self.preproc_input_.spectra.data.flat:
assert "NormalizedBackground" in spectrum
def testPyMcaXASAsInput(self):
res = pymca_exafs(self.preproc_input_)
self.assertTrue(isinstance(res, XASObject))
self.assertTrue("EXAFSKValues" in res.spectra[0])
self.assertTrue("EXAFSSignal" in res.spectra[0])
self.assertTrue("PostEdgeB" in res.spectra[0])
self.assertTrue("EXAFSKValues" in res.spectra.data.flat[0])
self.assertTrue("EXAFSSignal" in res.spectra.data.flat[0])
self.assertTrue("PostEdgeB" in res.spectra.data.flat[0])
def testDictAsInput(self):
"""Test succeed if the input is a dict"""
assert "NormalizedBackground" in self.preproc_input_.spectra[0]
assert "NormalizedBackground" in self.preproc_input_.spectra.data.flat[0]
res = pymca_exafs(self.preproc_input_)
self.assertTrue(isinstance(res, XASObject))
self.assertTrue("EXAFSKValues" in res.spectra[0])
self.assertTrue("EXAFSSignal" in res.spectra[0])
self.assertTrue("PostEdgeB" in res.spectra[0])
self.assertTrue("EXAFSKValues" in res.spectra.data.flat[0])
self.assertTrue("EXAFSSignal" in res.spectra.data.flat[0])
self.assertTrue("PostEdgeB" in res.spectra.data.flat[0])
def suite():
......
......@@ -65,16 +65,16 @@ class TestFTSingleSpectrum(unittest.TestCase):
def testPyMcaXASAsInput(self):
res = pymca_ft(xas_obj=self.xas_obj)
self.assertTrue(isinstance(res, XASObject))
self.assertTrue("FTRadius" in res.spectra[0]["FT"])
self.assertTrue("FTImaginary" in res.spectra[0]["FT"])
self.assertTrue("FTIntensity" in res.spectra[0]["FT"])
self.assertTrue("FTRadius" in res.spectra.data.flat[0]["FT"])
self.assertTrue("FTImaginary" in res.spectra.data.flat[0]["FT"])
self.assertTrue("FTIntensity" in res.spectra.data.flat[0]["FT"])
def testDictAsInput(self):
res = pymca_ft(xas_obj=self.xas_obj.to_dict())
self.assertTrue(isinstance(res, XASObject))
self.assertTrue("FTRadius" in res.spectra[0]["FT"])
self.assertTrue("FTImaginary" in res.spectra[0]["FT"])
self.assertTrue("FTIntensity" in res.spectra[0]["FT"])
self.assertTrue("FTRadius" in res.spectra.data.flat[0]["FT"])
self.assertTrue("FTImaginary" in res.spectra.data.flat[0]["FT"])
self.assertTrue("FTIntensity" in res.spectra.data.flat[0]["FT"])
def suite():
......
......@@ -68,8 +68,8 @@ class TestReadWrite(unittest.TestCase):
)
self.assertTrue(isinstance(res, XASObject))
self.assertEqual(res.n_spectrum, 1)
self.assertTrue("Mu" in res.spectra[0])
self.assertTrue("Energy" in res.spectra[0])
self.assertTrue("Mu" in res.spectra.data.flat[0])
self.assertTrue("Energy" in res.spectra.data.flat[0])
class TestNxWriting(unittest.TestCase):
......
......@@ -32,10 +32,10 @@ import shutil
import tempfile
import unittest
import h5py
import numpy
from silx.io.url import DataUrl
from est.core.io import read as read_xas
from est.core.types import Spectrum, XASObject
from est.core.types import Spectra
from est.core.utils import spectra as spectra_utils
from est.core.types import Dim
......@@ -65,8 +65,8 @@ class TestNormalizationSingleSpectrum(unittest.TestCase):
def testWithXASObjAsInput(self):
res = pymca_normalization(xas_obj=self.xas_obj)
self.assertTrue(isinstance(res, XASObject))
self.assertTrue(isinstance(res.spectra, (tuple, list)))
res_spectrum = res.spectra[0]
self.assertTrue(isinstance(res.spectra, (Spectra)))
res_spectrum = res.spectra.data.flat[0]
self.assertTrue(isinstance(res_spectrum, Spectrum))
self.assertTrue("NormalizedMu" in res_spectrum)
self.assertTrue("NormalizedEnergy" in res_spectrum)
......@@ -75,9 +75,9 @@ class TestNormalizationSingleSpectrum(unittest.TestCase):
def testWithDictAsInput(self):
res = pymca_normalization(xas_obj=self.xas_obj.to_dict())
self.assertTrue(isinstance(res, XASObject))
self.assertTrue("NormalizedMu" in res.spectra[0])
self.assertTrue("NormalizedEnergy" in res.spectra[0])
self.assertTrue("NormalizedSignal" in res.spectra[0])
self.assertTrue("NormalizedMu" in res.spectra.data.flat[0])
self.assertTrue("NormalizedEnergy" in res.spectra.data.flat[0])
self.assertTrue("NormalizedSignal" in res.spectra.data.flat[0])
@unittest.skipIf(has_pymca is False, "PyMca5 is not installed")
......@@ -110,7 +110,7 @@ class TestNormalizationMultipleSpectrum(unittest.TestCase):
def testWithXASObjAsInput(self):
res = pymca_normalization(xas_obj=self.xas_obj)
self.assertTrue(isinstance(res, XASObject))
for spectrum in self.xas_obj.spectra:
for spectrum in self.xas_obj.spectra.data.flat:
self.assertTrue("NormalizedMu" in spectrum)
self.assertTrue("NormalizedEnergy" in spectrum)
self.assertTrue("NormalizedSignal" in spectrum)
......
......@@ -115,8 +115,8 @@ class TestStreamSingleSpectrum(unittest.TestCase):
out = kweight_process(xas_obj=out)
out = PyMca_ft()(xas_obj=out)
assert isinstance(out, XASObject)
self.assertTrue(out.spectra[0].ft is not None)
self.assertTrue(len(out.spectra[0].ft.intensity) > 0)
self.assertTrue(out.spectra.data.flat[0].ft is not None)
self.assertTrue(len(out.spectra.data.flat[0].ft.intensity) > 0)
def test_pymca_with_roi(self):
energy, spectra = spectra_utils.create_dataset(shape=(256, 20, 10))
......@@ -128,7 +128,9 @@ class TestStreamSingleSpectrum(unittest.TestCase):
self.assertTrue("spectra" in dict_xas_obj.keys())
self.assertTrue("energy" in dict_xas_obj.keys())
tmp_obj = XASObject.from_dict(dict_xas_obj)
numpy.testing.assert_array_equal(tmp_obj.energy, tmp_obj.spectra[0].energy)
numpy.testing.assert_array_equal(
tmp_obj.energy, tmp_obj.spectra.data.flat[0].energy
)
out = xas_roi(dict_xas_obj)
out.configuration = {"EXAFS": self.exafs_configuration, "SET_KWEIGHT": 0}
out = pymca_normalization(xas_obj=out)
......@@ -138,8 +140,8 @@ class TestStreamSingleSpectrum(unittest.TestCase):
assert isinstance(out, XASObject)
self.assertEqual(out.dim1, 1)
self.assertEqual(out.dim2, 5)
self.assertTrue(out.spectra[0].ft is not None)
self.assertTrue(len(out.spectra[0].ft.intensity) > 0)
self.assertTrue(out.spectra.data.flat[0].ft is not None)
self.assertTrue(len(out.spectra.data.flat[0].ft.intensity) > 0)
@unittest.skipIf(has_pymca is False, "PyMca5 is not installed")
......
......@@ -27,7 +27,9 @@ __authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/11/2019"
from est.core.types import XASObject, Spectrum
from est.core.types import XASObject
from est.core.types import Spectrum
from est.core.types import Spectra
from .process import Process
import logging
import numpy
......@@ -123,7 +125,9 @@ class ROIProcess(Process):
# the following. Should be part of spectra object
volumes = {}
for key in xas_obj.spectra_keys():
if isinstance(xas_obj.spectra[0][key], (numpy.ndarray, pint.Quantity)):
if isinstance(
xas_obj.spectra.data.flat[0][key], (numpy.ndarray, pint.Quantity)
):
# there is no processing for the _larch_grp_members case
if key == "_larch_grp_members":
continue
......@@ -163,7 +167,7 @@ class ROIProcess(Process):
new_spectrum[key] = slice
# set the spectra
xas_obj._setSpectra(spectra=new_spectra)
xas_obj.spectra = Spectra(energy=_energies[:, 0], spectra=new_spectra)
# update the dimensions
xas_obj.force_dims(dim1=roi.size[1], dim2=roi.size[0])
......
......@@ -98,8 +98,8 @@ class TestMeldWorkflow(unittest.TestCase):
# pymca ft
out = PyMca_ft()(xas_obj=out)
self.assertTrue(out.spectra[0].ft.intensity is not None)
self.assertTrue(len(out.spectra[0].ft.intensity) > 1)
self.assertTrue(out.spectra.data.flat[0].ft.intensity is not None)
self.assertTrue(len(out.spectra.data.flat[0].ft.intensity) > 1)
def test_workflow_preedge_exafs_xftf(self):
"""Test the following workflow:
......@@ -113,10 +113,11 @@ class TestMeldWorkflow(unittest.TestCase):
exafs.setConfiguration(self.configuration_exafs)
out = exafs(xas_obj=out)
# for now we cannot link xftf because chi is not set by pymca exafs
assert out.spectra[0]["EXAFSSignal"] is not None
assert out.spectra[0]["EXAFSKValues"] is not None
assert out.spectra[0].chi is not None
assert out.spectra[0].k is not None
spectrum_0 = out.spectra.data.flat[0]
self.assertFalse(spectrum_0["EXAFSSignal"] is None)
self.assertFalse(spectrum_0["EXAFSKValues"] is None)
self.assertFalse(spectrum_0.chi is None)
self.assertFalse(spectrum_0.k is None)
# larch xftf
out = Larch_xftf()(xas_obj=out)
......
......@@ -99,8 +99,10 @@ class TestSpectraDimensions(unittest.TestCase):
)
self.assertTrue(isinstance(xas_obj, XASObject))
self.assertTrue(xas_obj.n_spectrum == x_dim * y_dim)
numpy.testing.assert_array_equal(xas_obj.spectra[1].mu, spectra[1, 0, :])
numpy.testing.assert_array_equal(xas_obj.spectra[2].energy, channel)
numpy.testing.assert_array_equal(
xas_obj.spectra.data.flat[1].mu, spectra[1, 0, :]
)
numpy.testing.assert_array_equal(xas_obj.spectra.data.flat[2].energy, channel)
def testDimensionsChannelYX(self):
"""Test that spectra stored as Channel, Y, X can be read"""
......@@ -126,8 +128,12 @@ class TestSpectraDimensions(unittest.TestCase):
)
self.assertTrue(isinstance(xas_obj, XASObject))
self.assertTrue(xas_obj.n_spectrum == x_dim * y_dim)
numpy.testing.assert_array_equal(xas_obj.spectra[1].mu, spectra[:, 0, 1])
numpy.testing.assert_array_equal(xas_obj.spectra[2].energy, (channel * ur.eV).m)
numpy.testing.assert_array_equal(
xas_obj.spectra.data.flat[1].mu, spectra[:, 0, 1]
)
numpy.testing.assert_array_equal(
xas_obj.spectra.data.flat[2].energy, (channel * ur.eV).m
)
def suite():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment