k_weight.py 7.63 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
25
"""wrapper to pymca `k-weight` process"""
26
27
28

__authors__ = ["H. Payno"]
__license__ = "MIT"
29
__date__ = "05/10/2021"
30

31
import functools
32
import logging
33
34
import multiprocessing
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass, e2k
payno's avatar
payno committed
35
36
37
from est.core.process.process import Process
from est.core.process.pymca.exafs import process_spectr_exafs
from est.core.types import XASObject, Spectrum
38
39
40
41

_logger = logging.getLogger(__name__)


payno's avatar
payno committed
42
43
44
45
46
47
48
49
def process_spectr_k(
    spectrum,
    configuration,
    overwrite=True,
    callbacks=None,
    output=None,
    output_dict=None,
):
50
51
    """

52
53
54
55
56
57
58
    :param spectrum: spectrum to process
    :type: :class:`.Spectrum`
    :param configuration: configuration of the pymca normalization
    :type: dict
    :param overwrite: False if we want to return a new Spectrum instance
    :type: bool
    :param callback: callback to execute.
59
60
    :param output: list to store the result, needed for pool processing
    :type: multiprocessing.manager.list
61
62
63
    :param output_dict: key is input spectrum, value is index in the output
                        list.
    :type: dict
64
    :return: processed spectrum
65
66
    :rtype: tuple (configuration, spectrum)
    """
payno's avatar
payno committed
67
68
69
    _logger.debug(
        "start k weight definition on spectrum (%s, %s)" % (spectrum.x, spectrum.y)
    )
70
71
72
73
    assert spectrum is not None

    pymca_xas = XASClass()
    if spectrum.energy is None or spectrum.mu is None:
payno's avatar
payno committed
74
75
76
77
        raise ValueError(
            "Energy and or Mu is/are not specified, unable to " "compute exafs"
        )
    pymca_xas.setSpectrum(energy=spectrum.energy, mu=spectrum.mu)
78
    if configuration is not None:
payno's avatar
payno committed
79
        if "KWeight" in configuration and configuration["KWeight"] is not None:
payno's avatar
payno committed
80
81
            configuration["FT"]["KWeight"] = configuration["KWeight"]
            configuration["EXAFS"]["KWeight"] = configuration["KWeight"]
82
        pymca_xas.setConfiguration(configuration)
83
84
85
86
87
88

    if overwrite:
        spectrum_ = spectrum
    else:
        spectrum_ = Spectrum.from_dict(spectrum.to_dict())

payno's avatar
payno committed
89
    # we need to update EXAFSNormalized since we are overwriting it
payno's avatar
payno committed
90
91
92
    cf, exafs_res = process_spectr_exafs(
        spectrum=spectrum_, configuration=configuration
    )
93
    if exafs_res is None:
payno's avatar
payno committed
94
        err = "Failed to process exafs."
95
        if spectrum.x is not None or spectrum.y is not None:
payno's avatar
payno committed
96
97
98
99
100
            err = (
                err
                + "Spectrum (x, y) coords: "
                + ",".join((str(spectrum.x), str(spectrum.y)))
            )
101
        raise ValueError(err)
102

payno's avatar
payno committed
103
104
    # update EXAFSNormalized
    e0 = pymca_xas.calculateE0()
payno's avatar
payno committed
105

106
    kValues = e2k(spectrum_.energy - e0)
payno's avatar
payno committed
107
108
109
110
111
112
    exafs = exafs_res["EXAFSNormalized"]
    if "KWeight" in configuration and configuration["KWeight"] is not None:
        exafs *= pow(kValues, configuration["KWeight"])
    spectrum_["EXAFSNormalized"] = exafs
    configuration_ = pymca_xas.getConfiguration()

113
114
115
    if callbacks:
        for callback in callbacks:
            callback()
116
117
118
119

    if output is not None:
        assert output_dict is not None
        output[output_dict[spectrum]] = spectrum_
payno's avatar
payno committed
120
    return configuration_, spectrum_
121
122


123
def pymca_k_weight(xas_obj):
124
    """
payno's avatar
payno committed
125
    Set weight for exafs values
126

127
    :param xas_obj: object containing the configuration and spectra to process
payno's avatar
payno committed
128
    :type: Union[:class:`.XASObject`, dict]
129
    :return: spectra dict
payno's avatar
payno committed
130
    :rtype: :class:`.XASObject`
131
    """
payno's avatar
payno committed
132
133
    k_weight_obj = PyMca_k_weight(inputs={"xas_obj": xas_obj})
    return k_weight_obj.run()
134
135


136
class PyMca_k_weight(
137
138
139
140
141
    Process,
    name="k weight",
    input_names=["xas_obj"],
    output_names=["xas_obj"],
    optional_input_names=["k_weight"],
142
):
143
    def set_properties(self, properties):
payno's avatar
payno committed
144
        if "_kWeightSetting" in properties:
145
            _properties = properties.copy()
payno's avatar
payno committed
146
147
            _properties["k_weight"] = _properties["_kWeightSetting"]
            del _properties["_kWeightSetting"]
148
            self.setConfiguration(_properties)
149

150
    def run(self):
151
        """
payno's avatar
payno committed
152

153
        :param xas_obj: object containing the configuration and spectra to process
payno's avatar
payno committed
154
        :type: Union[:class:`.XASObject`, dict]
155
        :return: spectra dict
payno's avatar
payno committed
156
        :rtype: :class:`.XASObject`
157
        """
158
        xas_obj = self.inputs.xas_obj
159
        if xas_obj is None:
160
            raise ValueError("xas_obj should be provided")
161
        _xas_obj = self.getXasObject(xas_obj=xas_obj)
162
163
164
165

        if self.inputs.k_weight:
            self.setConfiguration({"k_weight": self.inputs.k_weight})
            _xas_obj.configuration["SET_KWEIGHT"] = self.inputs.k_weight
166

payno's avatar
payno committed
167
        if "SET_KWEIGHT" not in _xas_obj.configuration:
168
            _logger.warning(
payno's avatar
payno committed
169
170
171
172
                "Missing configuration to know which value we should set "
                "to k weight, will be set to 0 by default"
            )
            _xas_obj.configuration["SET_KWEIGHT"] = 0
173

payno's avatar
payno committed
174
        for key in ("FT", "EXAFS", "Normalization"):
175
176
177
            if key not in _xas_obj.configuration:
                _xas_obj.configuration[key] = {}

payno's avatar
payno committed
178
179
180
181
182
183
184
185
        _xas_obj.configuration["KWeight"] = _xas_obj.configuration["SET_KWEIGHT"]
        _xas_obj.configuration["FT"]["KWeight"] = _xas_obj.configuration["SET_KWEIGHT"]
        _xas_obj.configuration["EXAFS"]["KWeight"] = _xas_obj.configuration[
            "SET_KWEIGHT"
        ]
        _xas_obj.configuration["Normalization"]["KWeight"] = _xas_obj.configuration[
            "SET_KWEIGHT"
        ]
186

187
        self._advancement.reset(max_=_xas_obj.n_spectrum)
188
        self.progress = 0.0
189
        self._pool_process(xas_obj=_xas_obj)
190
        self.progress = 100.0
191
        self.register_process(xas_obj=_xas_obj, data_keys=tuple())
192
        self.outputs.xas_obj = _xas_obj.to_dict()
193
        return _xas_obj
194

195
196
    def _pool_process(self, xas_obj):
        """process normalization from a pool"""
197
        assert isinstance(xas_obj, XASObject)
198
199
200
201
202
203
204
205
206
        assert "KWeight" in xas_obj.configuration
        n_s = len(xas_obj.spectra.data.flat)
        for i_s, spectrum in enumerate(xas_obj.spectra.data.flat):
            process_spectr_k(
                spectrum=spectrum,
                configuration=xas_obj.configuration,
                callbacks=self.callbacks,
                overwrite=True,
            )
payno's avatar
payno committed
207
            assert "KWeight" in xas_obj.configuration
208
            self.progress = i_s / n_s * 100.0
209

210
211
212
213
214
    def definition(self):
        return "Define k weight for xas treatment"

    def program_version(self):
        import PyMca5
payno's avatar
payno committed
215

216
217
        return PyMca5.version()

218
219
    @staticmethod
    def program_name():
payno's avatar
payno committed
220
        return "pymca_k_weight"
221

222
    __call__ = run