Commit a85b5a76 authored by payno's avatar payno
Browse files

[test] add test for spectra.

- rename '_volurm_for' to 'map_to'. Remove dim1 and dim2 parameter: now useless because we can dircetly get them for the spectra class
parent 9dc6f969
Pipeline #39091 failed with stages
in 9 minutes and 30 seconds
......@@ -200,7 +200,7 @@ class XASWriter(object):
)
self._output_file = input()
_logger.info(("dump xas obj to", self._output_file))
_logger.info("dump xas obj to {}".format(self._output_file))
# write raw data
write_xas(
......
......@@ -171,8 +171,6 @@ class Process(object):
_data[key] = XASObject._spectra_volume(
xas_obj.spectra,
key=key,
dim_1=xas_obj.dim1,
dim_2=xas_obj.dim2,
relative_to=relative_to,
)
except KeyError:
......
......@@ -134,8 +134,6 @@ class ROIProcess(Process):
volume = xas_obj._spectra_volume(
spectra=xas_obj.spectra,
key=key,
dim_1=xas_obj.dim1,
dim_2=xas_obj.dim2,
)
volume_res = volume[:, ymin:ymax, xmin:xmax]
......
......@@ -69,10 +69,8 @@ class TestRoi(unittest.TestCase):
original_spectra = self.xas_obj._spectra_volume(
spectra=self.xas_obj.spectra,
key="mu",
dim_1=self.xas_obj.dim1,
dim_2=self.xas_obj.dim2,
).copy()
assert original_spectra.shape == (16, 100, 30)
self.assertEqual(original_spectra.shape, (16, 100, 30))
roi_dict = {"origin": (20, 50), "size": (10, 20)}
self.xas_obj.configuration = {"roi": roi_dict}
res_xas_obj = xas_roi(self.xas_obj)
......@@ -80,8 +78,6 @@ class TestRoi(unittest.TestCase):
reduces_spectra = res_xas_obj._spectra_volume(
spectra=res_xas_obj.spectra,
key="mu",
dim_1=res_xas_obj.dim1,
dim_2=res_xas_obj.dim2,
).copy()
assert reduces_spectra.shape == (16, 20, 10)
numpy.testing.assert_array_equal(
......
......@@ -35,16 +35,22 @@ _logger = logging.getLogger(__name__)
def suite():
test_suite = unittest.TestSuite()
from est.core.types import test as test_types
test_suite.addTest(test_types.suite())
from est.core.process import test as test_process
test_suite.addTest(test_process.suite())
from .test_types import suite as test_types_suite
test_suite.addTest(test_types_suite())
from .test_io import suite as test_io_suite
test_suite.addTest(test_io_suite())
return test_suite
......@@ -139,7 +139,8 @@ class TestXASObject(unittest.TestCase):
obj2 = XASObject.from_dict(ddict)
self.assertEqual(self.xas_obj.n_spectrum, obj2.n_spectrum)
obj2_mu_spectra = obj2._spectra_volume(
spectra=obj2.spectra, key="mu", dim_1=obj2.dim1, dim_2=obj2.dim2
spectra=obj2.spectra,
key="mu",
)
numpy.testing.assert_array_equal(original_spectra, obj2_mu_spectra)
......
......@@ -131,11 +131,11 @@ class Spectra:
self.__energy = None
else:
self.__energy = convert_unit_to(energy, ur.eV)
if len(self) > 0:
if len(self.data) > 0:
if len(self.data.flat[0].energy) != len(energy):
_logger.warning("spectra and energy have incoherent dimension")
def map_to(self, key: str, dim_1: int, dim_2: int, relative_to: str = "energy"):
def map_to(self, key: str, relative_to: str = "energy"):
def get_param(object, name):
if hasattr(object, name):
return getattr(object, name)
......@@ -149,17 +149,16 @@ class Spectra:
value = getattr(value, path_)
return value
if len(self.data) == 0:
if len(self.data.flat) == 0:
return None
else:
assert len(self.data.flat) == dim_1 * dim_2
if relative_to:
assert get_relative_to_value(relative_to) is not None
array = numpy.zeros(
(len(get_relative_to_value(relative_to)), dim_1 * dim_2)
(len(get_relative_to_value(relative_to)), len(self.data.flat))
)
else:
array = numpy.zeros(dim_1 * dim_2)
array = numpy.zeros(len(self.data.flat))
for i_spectrum, spectrum in enumerate(self.data.flat):
try:
if "." in key:
......@@ -184,29 +183,23 @@ class Spectra:
array[:, i_spectrum] = value
else:
array[i_spectrum] = value
if relative_to is not None:
return array.reshape(
(len(get_relative_to_value(relative_to)), dim_1, dim_2)
)
else:
return array.reshape(dim_1, dim_2)
shape = list(self.data.shape)
shape.insert(0, -1)
return array.reshape(shape)
def keys(self):
if len(self) > 0:
if len(self.data.flat) > 0:
assert isinstance(self.data.flat[0], Spectrum)
return tuple(self.data.flat[0].keys())
def __eq__(self, other):
if not isinstance(other, Spectra):
return False
if len(self) != len(other):
if len(self.data) != len(other):
return False
else:
return numpy.array_equal(self.data.flat, other.data.flat)
def __len__(self):
return len(self.data)
def __iter__(self):
return iter(self.data.flat)
......
# coding: utf-8
# /*##########################################################################
# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ############################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "07/25/2019"
import unittest
from .test_spectra import suite as test_spectra_suite
from .test_spectrum import suite as test_spectrum_suite
from .test_xas_object import suite as test_xas_object_suite
def suite():
test_suite = unittest.TestSuite()
test_suite.addTest(test_spectra_suite())
test_suite.addTest(test_spectrum_suite())
test_suite.addTest(test_xas_object_suite())
return test_suite
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/26/2019"
import tempfile
import os
import unittest
from est.core.types import Spectrum
from est.core.types import Dim
from est.core.utils import spectra as spectra_utils
from est.core.types import XASObject
import shutil
try:
import PyMca5
except ImportError:
has_pymca = False
else:
has_pymca = True
from PyMca5.PyMcaDataDir import PYMCA_DATA_DIR
from est.io.utils.pymca import read_spectrum
class TestSpectra(unittest.TestCase):
"""Test the spectrum class"""
def setUp(self) -> None:
self._tmp_dir = tempfile.mkdtemp()
def tearDown(self) -> None:
shutil.rmtree(self._tmp_dir)
@unittest.skipIf(has_pymca is False, "PyMca5 is not installed")
def test_from_dat(self):
"""check that we can create a Spectrum from a pymca .dat file"""
data_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
energy, mu = read_spectrum(
data_file, dimensions=(Dim.CHANNEL_ENERGY_DIM, Dim.X_DIM, Dim.Y_DIM)
)
spectrum = Spectrum(energy=energy, mu=mu)
self.assertTrue(spectrum.energy is not None)
self.assertTrue(spectrum.mu is not None)
def test_from_mock(self):
"""check that we can create a Spectrum from numpy arrays"""
self.output_dir = tempfile.mkdtemp()
energy, spectra = spectra_utils.create_dataset(shape=(256, 20, 10))
self.xas_obj = XASObject(spectra=spectra, energy=energy, dim1=20, dim2=10)
spectra = self.xas_obj.spectra
self.assertEqual(self.xas_obj.n_spectrum, 20 * 10)
self.assertEqual(self.xas_obj.n_spectrum, 20 * 10)
spectra.keys()
self.assertTrue(spectra.data.flat[0] == spectra[0, 0])
spectra.map_to("mu")
def suite():
test_suite = unittest.TestSuite()
for ui in (TestSpectra,):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == "__main__":
unittest.main(defaultTest="suite")
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/26/2019"
import numpy
import os
import unittest
from est.core.types import Spectrum
from est.units import ur
from est.core.types import Dim
try:
import PyMca5
except ImportError:
has_pymca = False
else:
has_pymca = True
from PyMca5.PyMcaDataDir import PYMCA_DATA_DIR
from est.io.utils.pymca import read_spectrum
class TestSpectrum(unittest.TestCase):
"""Test the spectrum class"""
@unittest.skipIf(has_pymca is False, "PyMca5 is not installed")
def test_from_dat(self):
"""check that we can create a Spectrum from a pymca .dat file"""
data_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
energy, mu = read_spectrum(
data_file, dimensions=(Dim.CHANNEL_ENERGY_DIM, Dim.X_DIM, Dim.Y_DIM)
)
spectrum = Spectrum(energy=energy, mu=mu)
self.assertTrue(spectrum.energy is not None)
self.assertTrue(spectrum.mu is not None)
def test_from_numpy_array(self):
"""check that we can create a Spectrum from numpy arrays"""
energy = numpy.arange(10, 20)
mu = numpy.arange(10)
spectrum = Spectrum(energy=energy, mu=mu)
numpy.testing.assert_array_equal(spectrum.energy, (energy * ur.eV).m)
numpy.testing.assert_array_equal(spectrum.mu, mu)
mu_2 = numpy.arange(30, 40)
spectrum["Mu"] = mu_2
numpy.testing.assert_array_equal(spectrum.mu, mu_2)
def suite():
test_suite = unittest.TestSuite()
for ui in (TestSpectrum,):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == "__main__":
unittest.main(defaultTest="suite")
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/26/2019"
import numpy
import os
import unittest
import tempfile
import h5py
import shutil
from est.core.types import Spectrum, XASObject
from est.core.utils import spectra as spectra_utils
from est.core.io import read as read_xas
from silx.io.url import DataUrl
import json
import pint
import silx.io.utils
from est.core.types import Dim
try:
import PyMca5
except ImportError:
has_pymca = False
else:
has_pymca = True
from PyMca5.PyMcaDataDir import PYMCA_DATA_DIR
from est.io.utils.pymca import read_spectrum
class TestXASObject(unittest.TestCase):
"""test construction of XAS object from a single spectra"""
@unittest.skipIf(has_pymca is False, "PyMca5 is not installed")
def test_create_from_single_spectrum(self):
"""check that we can create a XASObject from a pymca .dat file"""
data_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
spectrum = {}
spectrum["Energy"], spectrum["Mu"] = read_spectrum(
data_file, dimensions=(Dim.X_DIM, Dim.Y_DIM)
)
self.spectrum = Spectrum(energy=spectrum["Energy"], mu=spectrum["Mu"])
self.configuration = {
"FT": {"KWeight": 1},
"EXAFS": {"EXAFSNormalized": numpy.array([1, 2, 3])},
}
obj = XASObject(
spectra=(self.spectrum,),
energy=self.spectrum.energy,
configuration=self.configuration,
dim1=1,
dim2=1,
)
self.assertEqual(obj.n_spectrum, 1)
ddict = obj.to_dict()
obj2 = XASObject.from_dict(ddict)
self.assertEqual(obj2, obj)
# insure the XASObject is serializable
# import json
# json.dumps(obj2.to_dict())
def test_create_from_several_spectrums(self):
"""check that we can create a XASObject from numpy arrays"""
self.energy, self.spectra = spectra_utils.create_dataset(shape=(256, 20, 10))
self.output_dir = tempfile.mkdtemp()
spectra_path = "/data/NXdata/data"
channel_path = "/data/NXdata/Channel"
filename = os.path.join(self.output_dir, "myfile.h5")
with h5py.File(filename, "a") as f:
f[spectra_path] = self.spectra
f[channel_path] = self.energy
url_spectra = DataUrl(file_path=filename, data_path=spectra_path, scheme="silx")
self.xas_obj = read_xas(
spectra_url=url_spectra,
channel_url=DataUrl(
file_path=filename, data_path=channel_path, scheme="silx"
),
dimensions=(Dim.CHANNEL_ENERGY_DIM, Dim.Y_DIM, Dim.X_DIM),
)
self.assertEqual(self.xas_obj.dim1, 20)
self.assertEqual(self.xas_obj.dim2, 10)
self.assertEqual(self.xas_obj.n_spectrum, 20 * 10)
ddict = self.xas_obj.to_dict(with_process_details=False)
original_spectra = silx.io.utils.get_data(
DataUrl(file_path=filename, data_path=spectra_path, scheme="silx")
)
obj2 = XASObject.from_dict(ddict)
self.assertEqual(self.xas_obj.n_spectrum, obj2.n_spectrum)
obj2_mu_spectra = obj2._spectra_volume(
spectra=obj2.spectra,
key="mu",
)
numpy.testing.assert_array_equal(original_spectra, obj2_mu_spectra)
self.assertEqual(obj2, self.xas_obj)
class TestXASObjectSerialization(unittest.TestCase):
def setUp(self) -> None:
self.energy, self.spectra = spectra_utils.create_dataset(shape=(256, 20, 10))
self.output_dir = tempfile.mkdtemp()
self.spectra_path = "/data/NXdata/data"
self.channel_path = "/data/NXdata/Channel"
self.filename = os.path.join(self.output_dir, "myfile.h5")
with h5py.File(self.filename, "a") as f:
f[self.spectra_path] = self.spectra
f[self.channel_path] = self.energy
self.dimensions = (Dim.CHANNEL_ENERGY_DIM, Dim.Y_DIM, Dim.X_DIM)
self.url_spectra = DataUrl(
file_path=self.filename, data_path=self.spectra_path, scheme="silx"
)
self.url_energy = DataUrl(
file_path=self.filename, data_path=self.channel_path, scheme="silx"
)
self.process_flow_file = os.path.join(self.output_dir, "process_flow.h5")
def tearDown(self) -> None:
shutil.rmtree(self.output_dir)
def test_serialization_url_auto(self):
"""Make sure the `to_dict` and `from_dict` functions are working
if no url are provided"""
xas_obj = XASObject(
spectra=self.spectra,
energy=self.energy,
dim1=20,
dim2=10,
keep_process_flow=False,
)
# if no h5 file defined, should fail to copy it to a dictionary
with self.assertRaises(ValueError):
xas_obj.to_dict()
xas_obj.link_to_h5(self.process_flow_file)
dict_xas_obj = xas_obj.to_dict()
# make sure it is serializable
json.dumps(dict_xas_obj)
# make sure we find a comparable xas object from it
xas_obj_2 = XASObject.from_dict(dict_xas_obj)
numpy.testing.assert_array_equal(xas_obj.energy.m, xas_obj_2.energy.m)
self.assertEqual(xas_obj, xas_obj_2)
# simple test without the process_details
dict_xas_obj = xas_obj.to_dict(with_process_details=False)
# make sure it is serializable
json.dumps(dict_xas_obj)
def test_serialization_url_provided(self):
"""Make sure the `to_dict` and `from_dict` functions are working
if url are provided"""
xas_obj = XASObject(
spectra=self.spectra,
energy=self.energy,
dim1=20,
dim2=10,
keep_process_flow=False,
energy_url=self.url_energy,
spectra_url=self.spectra_path,
)
self.assertTrue(isinstance(xas_obj.energy, pint.Quantity))
# if no h5 file defined, should fail to copy it to a dictionary
with self.assertRaises(ValueError):
xas_obj.to_dict()
xas_obj.link_to_h5(self.process_flow_file)
dict_xas_obj = xas_obj.to_dict()
# make sure it is serializable
json.dumps(dict_xas_obj)
# make sure we find a comparable xas object from it
xas_obj_2 = XASObject.from_dict(dict_xas_obj)
self.assertTrue(isinstance(xas_obj_2.energy, pint.Quantity))
self.assertEqual(xas_obj.energy.units, xas_obj_2.energy.units)
numpy.testing.assert_array_equal(xas_obj.energy.m, xas_obj_2.energy.m)
self.assertEqual(xas_obj, xas_obj_2)
# simple test without the process_details
dict_xas_obj = xas_obj.to_dict(with_process_details=False)
# make sure it is serializable
json.dumps(dict_xas_obj)
def suite():
test_suite = unittest.TestSuite()
for ui in (
TestXASObject,
TestXASObjectSerialization,
):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == "__main__":