Commit 6f4bb157 authored by Henri Payno's avatar Henri Payno
Browse files

[reprocessing] test reprocessing for larch process

parent 95b04c7a
Pipeline #13393 failed with stage
in 4 minutes and 41 seconds
......@@ -35,6 +35,8 @@ import numpy
from xas.core.io import XASWriter
from xas.core.types import XASObject
from xas.core.utils import spectra as spectra_utils
import urllib.request
from xas.core.types import Spectrum
from ..reprocessing import exec_
try:
import PyMca5
......@@ -46,16 +48,30 @@ else:
from xas.core.process.pymca.ft import PyMca_ft
from xas.core.process.pymca.k_weight import PyMca_k_weight
from xas.core.process.pymca.normalization import PyMca_normalization
from xas.io.utils.pymca import read_spectrum
from PyMca5.PyMcaDataDir import PYMCA_DATA_DIR
try:
import larch
except ImportError:
has_larch = False
else:
has_larch = True
from xas.core.process.larch.pre_edge import Larch_pre_edge
from xas.core.process.larch.xftf import Larch_xftf
from xas.core.process.larch.autobk import Larch_autobk
from xas.io.utils.larch import read_ascii
@unittest.skipIf(has_pymca is False, 'PyMca5 is not installed')
class TestReprocessing(unittest.TestCase):
"""test construction of XAS object"""
class TestReprocessingPyMca(unittest.TestCase):
"""test reprocessing work for some pymca process"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
spectra = spectra_utils.create_dataset(shape=(10, 4, 4))
energy = numpy.linspace(start=3.26, stop=3.96, num=10)
self.xas_obj_ref = XASObject(spectra=spectra, energy=energy)
data_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
energy, mu = read_spectrum(data_file)
self.spectrum = Spectrum(energy=energy, mu=mu)
self.xas_obj_ref = XASObject(spectra=(self.spectrum, ), energy=energy,
dim1=1, dim2=1)
self.h5_file = os.path.join(self.output_dir, 'output_file.h5')
out = PyMca_normalization()(xas_obj=self.xas_obj_ref)
out = PyMca_exafs()(xas_obj=out)
......@@ -67,6 +83,7 @@ class TestReprocessing(unittest.TestCase):
writer.output_file = self.h5_file
writer(out)
assert out.spectra[0].ft.intensity is not None
assert len(out.get_process_flow()) is 5
def tearDown(self):
shutil.rmtree(self.output_dir)
......@@ -75,12 +92,59 @@ class TestReprocessing(unittest.TestCase):
res_xas_obj = exec_(self.h5_file)
self.assertEqual(res_xas_obj, self.xas_obj_ref)
numpy.testing.assert_allclose(res_xas_obj.spectra[0].ft.intensity,
self.xas_obj_ref.spectra[0].ft.intensity)
self.xas_obj_ref.spectra[0].ft.intensity)
@unittest.skipIf(has_larch is False, 'Larch is not installed')
class TestReprocessingLarch(unittest.TestCase):
"""test reprocessing work for some larch process"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
xmu_url = "https://raw.githubusercontent.com/xraypy/xraylarch/master/examples/xafs/cu_rt01.xmu"
self.data_file = os.path.join(self.output_dir, 'cu_rt01.xmu')
with urllib.request.urlopen(xmu_url) as response, open(self.data_file, 'wb') as out_file:
data = response.read() # a `bytes` object
out_file.write(data)
assert os.path.exists(self.data_file)
energy, mu = read_ascii(self.data_file)
self.spectrum = Spectrum(energy=energy, mu=mu)
spectra = (self.spectrum, )
self.xas_obj_ref = XASObject(spectra=spectra, energy=energy, dim1=1,
dim2=1)
self.h5_file = os.path.join(self.output_dir, 'output_file.h5')
pre_edge_process = Larch_pre_edge()
pre_edge_process.setProperties({'rbkg': 1.0, 'kweight': 2})
autobk_process = Larch_autobk()
autobk_process.setProperties({'kmin': 2, 'kmax': 16, 'dk': 3,
'window': 'hanning', 'kweight': 2})
xftf_process = Larch_xftf()
xftf_process.setProperties({'kweight': 2})
out = pre_edge_process(xas_obj=self.xas_obj_ref)
out = autobk_process(xas_obj=out)
out = xftf_process(xas_obj=out)
writer = XASWriter()
writer.output_file = self.h5_file
writer(out)
assert len(out.get_process_flow()) is 3
def tearDown(self):
shutil.rmtree(self.output_dir)
def test(self):
res_xas_obj = exec_(self.h5_file)
self.assertEqual(res_xas_obj, self.xas_obj_ref)
numpy.testing.assert_allclose(res_xas_obj.spectra[0].chir_mag,
self.xas_obj_ref.spectra[0].chir_mag)
def suite():
test_suite = unittest.TestSuite()
for ui in (TestReprocessing, ):
for ui in (TestReprocessingPyMca, TestReprocessingLarch):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
......
......@@ -118,6 +118,8 @@ class Process(object):
relative_to = 'energy'
elif key in ('chir_re', 'chir_im', 'chir_mag', 'r'):
relative_to = 'r'
elif key in ('ft.radius', 'ft.intensity', 'ft.imaginary'):
relative_to = 'ft.radius'
try:
_data[key] = XASObject._spectra_volume(xas_obj.spectra,
key=key,
......
......@@ -158,9 +158,12 @@ class PyMca_ft(Process):
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(_xas_obj, data_keys=("ft.FTRadius",
"ft.FTIntensity",
"ft.FTImaginary"))
assert hasattr(_xas_obj.spectra[0], 'ft')
assert hasattr(_xas_obj.spectra[0].ft, 'intensity')
assert hasattr(_xas_obj.spectra[0].ft, 'imaginary')
self.register_process(_xas_obj, data_keys=("ft.radius",
"ft.intensity",
"ft.imaginary"))
return _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -225,29 +225,40 @@ class XASObject(object):
def _spectra_volume(spectra, key, dim_1, dim_2, relative_to='energy'):
"""Convert a list of spectra (mu) to a numpy array.
..note: only convert raw data for now"""
def get_param(object, name):
if hasattr(object, name):
return getattr(object, name)
else:
return object[name]
def get_relative_to_value(relative):
paths_ = relative_to.split('.')
value = getattr(spectra[0], paths_[0])
for path_ in paths_[1:]:
value = getattr(value, path_)
return value
if len(spectra) is 0:
return None
else:
assert len(spectra) == dim_1 * dim_2
if relative_to:
array = numpy.zeros((len(getattr(spectra[0], relative_to)),
array = numpy.zeros((len(get_relative_to_value(relative_to)),
dim_1 * dim_2))
else:
array = numpy.zeros(dim_1 * dim_2)
for i_spectrum, spectrum in enumerate(spectra):
try:
if '.' in key:
subkeys, key_ = key.split('.')
value = spectrum[subkeys[0]]
subkeys = key.split('.')
key_ = subkeys[-1]
subkeys = subkeys[:-1]
value = get_param(spectrum, subkeys[0])
for subkey in subkeys[1:]:
value = value[subkey]
value = getattr(value, key_)
value = get_param(value, subkey)
value = get_param(value, key_)
else:
if key in spectrum.keys():
value = spectrum[key]
else:
value = getattr(spectrum, key)
value = get_param(spectrum, key)
except:
_logger.error('fail to access to', key)
else:
......@@ -258,9 +269,8 @@ class XASObject(object):
array[:, i_spectrum] = value
else:
array[i_spectrum] = value
if relative_to is not None:
return array.reshape((len(getattr(spectra[0], relative_to)),
return array.reshape((len(get_relative_to_value(relative_to)),
dim_1, dim_2))
else:
return array.reshape(dim_1, dim_2)
......@@ -797,18 +807,18 @@ class _FT(object):
_INTENSITY_KEY = 'FTIntensity'
_IMAGINERY_KEY = 'FTImaginary'
_IMAGINARY_KEY = 'FTImaginary'
def __init__(self, ddict):
self.__radius = None
self.__intensity = None
self.__imaginery = None
self.__imaginary = None
self.__other_parameters = {}
self.__key_mapper = {
self._RADIUS_KEY: self.__class__.radius,
self._INTENSITY_KEY: self.__class__.intensity,
self._IMAGINERY_KEY: self.__class__.imaginery,
self._IMAGINARY_KEY: self.__class__.imaginary,
}
if ddict is not None:
......@@ -832,12 +842,12 @@ class _FT(object):
self.__intensity = intensity
@property
def imaginery(self):
return self.__imaginery
def imaginary(self):
return self.__imaginary
@imaginery.setter
def imaginery(self, imaginery):
self.__imaginery = imaginery
@imaginary.setter
def imaginary(self, imaginery):
self.__imaginary = imaginery
def __getitem__(self, key):
"""Need for pymca compatibility"""
......@@ -860,7 +870,7 @@ class _FT(object):
res = {
self._RADIUS_KEY: self.radius,
self._INTENSITY_KEY: self.intensity,
self._IMAGINERY_KEY: self.imaginery,
self._IMAGINARY_KEY: self.imaginary,
}
res.update(self.__other_parameters)
return res
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment