Commit c3150517 authored by payno's avatar payno
Browse files

[core][larch] add larch xftf process

parent 0c8cf4cb
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "07/25/2019"
import os
import unittest
import tempfile
import shutil
import urllib.request
from xas.core.types import Spectrum, XASObject
try:
import larch
except ImportError:
has_larch = False
else:
has_larch = True
from xas.core.process.larch.xftf import larch_xftf, process_spectr_xftf
from xas.core.process.larch.autobk import process_spectr_autobk
from xas.io.utils.larch import read_ascii
@unittest.skipIf(has_larch is False, 'xraylarch not installed')
class TestLarchSpectrum(unittest.TestCase):
"""Make sure computation on one spectrum is valid"""
def setUp(self):
self.outputdir = tempfile.mkdtemp()
xmu_url = "https://raw.githubusercontent.com/xraypy/xraylarch/master/examples/xafs/cu_rt01.xmu"
self.data_file = os.path.join(self.outputdir, 'cu_rt01.xmu')
with urllib.request.urlopen(xmu_url) as response, open(self.data_file, 'wb') as out_file:
data = response.read() # a `bytes` object
out_file.write(data)
assert os.path.exists(self.data_file)
energy, mu = read_ascii(self.data_file)
self.spectrum = Spectrum(energy=energy, mu=mu)
self.configuration = {'window': 'hanning', 'kweight': 2, 'kmin': 3,
'kmax': 13, 'dk': 1}
# for xftf we need to compute pre edge before
process_spectr_autobk(self.spectrum, configuration={}, overwrite=True)
assert hasattr(self.spectrum, 'k')
assert hasattr(self.spectrum, 'chi')
def tearDown(self):
shutil.rmtree(self.outputdir)
def testProcess(self):
process_spectr_xftf(self.spectrum, self.configuration, overwrite=True)
self.assertTrue(hasattr(self.spectrum, 'chir_re'))
self.assertTrue(hasattr(self.spectrum, 'chir_im'))
self.assertTrue(hasattr(self.spectrum, 'chir'))
self.assertTrue(hasattr(self.spectrum, 'chir_mag'))
@unittest.skipIf(has_larch is False, 'xraylarch not installed')
class TestLarchSpectra(unittest.TestCase):
"""Make sure computation on spectra is valid (n spectrum)"""
def setUp(self):
self.outputdir = tempfile.mkdtemp()
# Download test file from xraylarch
xmu_url = "https://raw.githubusercontent.com/xraypy/xraylarch/master/examples/xafs/cu_rt01.xmu"
self.data_file = os.path.join(self.outputdir, 'cu_rt01.xmu')
with urllib.request.urlopen(xmu_url) as response, open(self.data_file, 'wb') as out_file:
data = response.read() # a `bytes` object
out_file.write(data)
assert os.path.exists(self.data_file)
# define the xas object
self.configuration = {'z': 29}
energy, mu = read_ascii(self.data_file)
spectrum = Spectrum(energy=energy, mu=mu)
process_spectr_autobk(spectrum, configuration={}, overwrite=True)
self.xas_object = XASObject(spectra=(spectrum,),
energy=energy, dim1=1, dim2=1,
configuration=self.configuration)
# for xftf we need to compute pre edge before
spectrum = self.xas_object.spectra[0]
assert hasattr(spectrum, 'k')
assert hasattr(spectrum, 'chi')
def tearDown(self):
shutil.rmtree(self.outputdir)
def testProcessXASObject(self):
res = larch_xftf(self.xas_object)
spectrum = res.spectra[0]
self.assertTrue(hasattr(spectrum, 'chir_re'))
self.assertTrue(hasattr(spectrum, 'chir_im'))
self.assertTrue(hasattr(spectrum, 'chir'))
self.assertTrue(hasattr(spectrum, 'chir_mag'))
def testProcessAsDict(self):
res = larch_xftf(self.xas_object.to_dict())
spectrum = res.spectra[0]
self.assertTrue(hasattr(spectrum, 'chir_re'))
self.assertTrue(hasattr(spectrum, 'chir_im'))
self.assertTrue(hasattr(spectrum, 'chir'))
self.assertTrue(hasattr(spectrum, 'chir_mag'))
def suite():
test_suite = unittest.TestSuite()
for ui in (TestLarchSpectrum, TestLarchSpectra):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == '__main__':
unittest.main(defaultTest="suite")
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""wrapper to the larch mback process"""
from xas.core.types import Spectrum, XASObject
from xas.core.process.process import Process
from larch.xafs.xafsft import xftf
import multiprocessing
import functools
import logging
_logger = logging.getLogger(__name__)
_DEBUG = True
if _DEBUG:
from larch.symboltable import Group
def process_spectr_xftf(spectrum, configuration, overwrite=True, callback=None,
output=None, output_dict=None):
"""
:param :class:`.Spectrum` spectrum: spectrum to process
:param dict configuration: configuration of the pymca normalization
:param bool overwrite: False if we want to return a new Spectrum instance
:param function pointer callback: callback to execute.
:param output: list to store the result, needed for pool processing
:type: multiprocessing.manager.list
:param dict output_dict: key: input spectrum, value: index in the output
list.
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
assert isinstance(spectrum, Spectrum)
if not hasattr(spectrum, 'k') or not hasattr(spectrum, 'chi'):
_logger.error('k and/or chi is/are not specified, unable to compute '
'xftf. Maybe you need to run autobk process before ?')
return None, None
opts = {}
for opt_name in ('kmin', 'kmax', 'kweight', 'dk', 'dk2', 'with_phase',
'window', 'rmax_out', 'nfft', 'kstep'):
if opt_name in configuration:
opts[opt_name] = configuration[opt_name]
if _DEBUG is True:
assert isinstance(spectrum, Group)
if overwrite:
_spectrum = spectrum
else:
_spectrum = Spectrum().update(spectrum=spectrum)
res = xftf(_spectrum, **opts)
print('--------')
print(res)
return configuration, _spectrum
def larch_xftf(xas_obj):
"""
:param xas_obj: object containing the configuration and spectra to process
:type: Union[XASObject, dict]
:return: spectra dict
:rtype: XASObject
"""
mback_obj = Larch_xftf()
return mback_obj.process(xas_obj=xas_obj)
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with push workflow for now.
class Larch_xftf(Process):
def __init__(self):
Process.__init__(self, name='xftf')
def setProperties(self, properties):
if '_pymcaSettings' in properties:
self._settings = properties['_pymcaSettings']
def process(self, xas_obj):
_xas_obj = self.getXasObject(xas_obj=xas_obj)
if self._settings:
_xas_obj.configuration['xftf'] = self._settings
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(_xas_obj,
data_keys=('chi', 'chir', 'chir_mag', 'k', 'r'))
return _xas_obj
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_xftf(spectrum=spectrum,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_xftf,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=False,
output=res_list,
output_dict=output_dict)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
def definition(self):
return "xftf calculation"
def program_version(self):
import larch.version
return larch.version.version_data()['larch']
def program_name(self):
return 'larch_xftf'
__call__ = process
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment