Commit a32b41b1 authored by payno's avatar payno
Browse files

[core][process] main rework of the processes.

- now process can take an output and an output_dict for storing the result in a multiprocessing.Proxy object
- rework process action to avoid several call to pymca `processSpectrum` function
- add an option to XasObject `to_dict` function to simply store raw data (which is the reference), but also to keep processing results to be used later.
This is need for the 'non gui' workflow processing which uses a dictionnary instead of a XasObject.
parent d3650a58
Pipeline #12520 passed with stage
in 4 minutes and 18 seconds
......@@ -30,58 +30,83 @@ __date__ = "06/11/2019"
from xas.core.types import XASObject, Spectrum
from .process import Process
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from PyMca5.PyMcaPhysics.xas.XASClass import e2k
from PyMca5.PyMcaPhysics.xas.XASClass import e2k, k2e
import multiprocessing
import functools
import numpy
import logging
_logger = logging.getLogger(__name__)
def process_spectr_exafs(spectrum, configuration, overwrite=True):
def process_spectr_exafs(spectrum, configuration, overwrite=True, callback=None,
output=None, output_dict=None):
"""
:param :class:`.Spectrum` spectrum: spectrum to process
:param dict configuration: configuration of the pymca normalization
:param bool overwrite: False if we want to return a new Spectrum instance
:return:
:param function pointer callback: callback to execute.
:param output: list to store the result, needed for pool processing
:type: multiprocessing.manager.list
:param dict output_dict: key: input spectrum, value: index in the output
list.
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
if spectrum.energy is None or spectrum.mu is None:
_logger.error('Energy and or Mu is/are not specified, unable to '
'compute exafs')
return None, None
pymca_xas = XASClass()
pymca_xas.setSpectrum(energy=spectrum.energy,
mu=spectrum.mu)
if configuration is not None:
pymca_xas.setConfiguration(configuration)
assert 'NormalizedBackground' in spectrum
pymca_xas.setConfiguration(configuration)
pymca_xas.processSpectrum()
# try:
# pymca_xas.processSpectrum()
# except (IndexError, ValueError) as e:
# _logger.error(e)
# return None, None
if 'Energy' not in spectrum or 'Mu' not in spectrum:
_logger.error('Energy and or Mu is/are not specified, unable to '
'compute exafs')
if 'NormalizedBackground' not in spectrum:
_logger.warning('spectrum has not been normalized, will not process exafs')
return None, None
if 'EXAFSKValues' in spectrum and 'Mu' in spectrum:
kValues = spectrum['EXAFSKValues']
else:
if 'Mu' not in spectrum:
print('********************** computing **********************')
if 'Edge' not in spectrum:
spectrum.update(pymca_xas.processSpectrum())
e0 = spectrum['Edge']
energy = spectrum['Energy']
kValues = e2k(energy - e0)
spectrum['EXAFSKValues'] = kValues
mu = spectrum.mu
if overwrite:
spectrum['EXAFS'] = pymca_xas.postEdge(k=kValues, mu=mu)
return configuration, spectrum
else:
new_spectrum = Spectrum(energy=spectrum.energy, mu=energy.mu)
return configuration, new_spectrum
e0 = pymca_xas.calculateE0()
ddict = spectrum.to_dict()
ddict["Energy"] = pymca_xas._energy
ddict["Mu"] = pymca_xas._mu
cleanMu = pymca_xas._mu - ddict["NormalizedBackground"]
kValues = e2k(pymca_xas._energy - e0)
ddict.update(pymca_xas.postEdge(kValues, cleanMu))
dataSet = numpy.zeros((cleanMu.size, 2), numpy.float)
dataSet[:, 0] = kValues
dataSet[:, 1] = cleanMu
# exafs normalization
exafs = (cleanMu - ddict["PostEdgeB"]) / ddict["PostEdgeB"]
ddict["EXAFSEnergy"] = k2e(kValues)
ddict["EXAFSKValues"] = kValues
ddict["EXAFSSignal"] = cleanMu
if ddict["KWeight"]:
exafs *= pow(kValues, ddict["KWeight"])
ddict["EXAFSNormalized"] = exafs
if callback:
callback()
res_spectrum = Spectrum.from_dict(ddict=ddict)
def get_output(orignal_spec, res_spec):
if overwrite:
orignal_spec.update(res_spec)
return orignal_spec
else:
return res_spec
if output is not None:
assert output_dict is not None
output[output_dict[spectrum]] = get_output(spectrum, res_spectrum)
return configuration, get_output(spectrum, res_spectrum)
def pymca_exafs(xas_obj):
......@@ -96,6 +121,10 @@ def pymca_exafs(xas_obj):
return exafs_obj.process(xas_obj=xas_obj)
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with push workflow for now.
class PyMca_exafs(Process):
"""Process spectra for exafs and get information about the processing
advancement"""
......@@ -108,23 +137,46 @@ class PyMca_exafs(Process):
self._settings = properties['_pymcaSettings']
def process(self, xas_obj):
_xas_obj = self._getXasObject(xas_obj=xas_obj)
_xas_obj = self.getXasObject(xas_obj=xas_obj)
if self._settings:
_xas_obj.configuration['EXAFS'] = self._settings
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self._pool_process(xas_obj=xas_obj)
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
return _xas_obj
def _pool_process(self, xas_obj):
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_exafs,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
p.map(partial_, xas_obj.spectra)
xas_obj.configuration = xas_obj.spectra[0].configuration
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
assert 'NormalizedBackground' in spectrum
process_spectr_exafs(spectrum=spectrum,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
assert 'EXAFSKValues' in spectrum
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_exafs,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=False,
output=res_list,
output_dict=output_dict)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
__call__ = process
......@@ -32,18 +32,25 @@ from xas.core.types import XASObject, Spectrum
from .process import Process
import functools
import multiprocessing
import numpy
import logging
_logger = logging.getLogger(__name__)
def process_spectr_ft(spectrum, configuration, overwrite=True):
def process_spectr_ft(spectrum, configuration, overwrite=True, callback=None,
output=None, output_dict=None):
"""
:param :class:`.Spectrum` spectrum: spectrum to process
:param dict configuration: configuration of the pymca normalization
:param bool overwrite: False if we want to return a new Spectrum instance
:return:
:param function pointer callback: callback to execute.
:param output: list to store the result, needed for pool processing
:type: multiprocessing.manager.list
:param dict output_dict: key: input spectrum, value: index in the output
list.
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
pymca_xas = XASClass()
......@@ -51,15 +58,52 @@ def process_spectr_ft(spectrum, configuration, overwrite=True):
_logger.error('Energy and or Mu is/are not specified, unable to '
'compute exafs')
return None, None
pymca_xas.setConfiguration(configuration)
if configuration is not None:
pymca_xas.setConfiguration(configuration)
pymca_xas.setSpectrum(energy=spectrum.energy,
mu=spectrum.mu)
spectrum_ = Spectrum.from_dict(pymca_xas.processSpectrum())
if 'EXAFSSignal' not in spectrum:
_logger.warning('exafs has not been processed yet, unable to process'
'fourier transform')
return None, None
cleanMu = spectrum['EXAFSSignal']
kValues = spectrum['EXAFSKValues']
exafs = (cleanMu - spectrum["PostEdgeB"]) / spectrum["PostEdgeB"]
if spectrum["KWeight"]:
exafs *= pow(kValues, spectrum["KWeight"])
dataSet = numpy.zeros((cleanMu.size, 2), numpy.float)
dataSet[:, 0] = kValues
dataSet[:, 1] = cleanMu
set2 = dataSet.copy()
set2[:, 1] = exafs
# remove points with k<2
goodi = (set2[:, 0] >= spectrum["KMin"]) & (set2[:, 0] <= spectrum["KMax"])
set2 = set2[goodi, :]
ft = pymca_xas.fourierTransform(set2[:,0], set2[:, 1], kMin=spectrum["KMin"], kMax=spectrum["KMax"])
if callback:
callback()
if overwrite:
spectrum.load_frm_dict(spectrum_.to_dict())
return configuration, spectrum
spectrum_ = Spectrum()
spectrum_.update(spectrum)
else:
return configuration, spectrum_
spectrum_ = spectrum
assert spectrum_
spectrum_.ft = ft
if output is not None:
assert output_dict is not None
output[output_dict[spectrum]] = spectrum_
return configuration, spectrum_
def pymca_ft(xas_obj):
......@@ -71,9 +115,14 @@ def pymca_ft(xas_obj):
:rtype: dict
"""
ft_obj = PyMca_ft()
assert xas_obj is not None
return ft_obj.process(xas_obj=xas_obj)
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with push workflow for now.
class PyMca_ft(Process):
def __init__(self):
Process.__init__(self, name='ft')
......@@ -91,23 +140,43 @@ class PyMca_ft(Process):
:return: spectra dict
:rtype: dict
"""
_xas_obj = self._getXasObject(xas_obj=xas_obj)
_xas_obj = self.getXasObject(xas_obj=xas_obj)
if self._settings:
_xas_obj.configuration['FT'] = self._settings
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self._pool_process(xas_obj=xas_obj)
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
return _xas_obj
def _pool_process(self, xas_obj):
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_ft,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
p.map(partial_, xas_obj.spectra)
xas_obj.configuration = xas_obj.spectra[0].configuration
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_ft(spectrum=spectrum,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(1) as p:
partial_ = functools.partial(process_spectr_ft,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=False,
output=res_list,
output_dict=output_dict)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
__call__ = process
......@@ -37,13 +37,19 @@ import logging
_logger = logging.getLogger(__name__)
def process_spectr_k(spectrum, configuration, overwrite=True):
def process_spectr_k(spectrum, configuration, overwrite=True, callback=None,
output=None, output_dict=None):
"""
:param :class:`.Spectrum` spectrum: spectrum to process
:param dict configuration: configuration of the pymca normalization
:param bool overwrite: False if we want to return a new Spectrum instance
:return:
:param function pointer callback: callback to execute.
:param output: list to store the result, needed for pool processing
:type: multiprocessing.manager.list
:param dict output_dict: key: input spectrum, value: index in the output
list.
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
assert spectrum is not None
......@@ -54,13 +60,20 @@ def process_spectr_k(spectrum, configuration, overwrite=True):
'compute exafs')
pymca_xas.setSpectrum(energy=spectrum.energy,
mu=spectrum.mu)
pymca_xas.setConfiguration(configuration)
if configuration is not None:
pymca_xas.setConfiguration(configuration)
spectrum_ = Spectrum.from_dict(pymca_xas.processSpectrum())
if callback:
callback()
if overwrite:
spectrum.load_frm_dict(spectrum_.to_dict())
return configuration, spectrum
else:
return configuration, spectrum_
spectrum_ = spectrum
if output is not None:
assert output_dict is not None
output[output_dict[spectrum]] = spectrum_
return configuration, spectrum_
def pymca_k_weight(xas_obj):
......@@ -75,6 +88,10 @@ def pymca_k_weight(xas_obj):
return k_weight_obj.process(xas_obj=xas_obj)
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with push workflow for now.
class PyMca_k_weight(Process):
def __init__(self):
Process.__init__(self, name='k weight')
......@@ -93,7 +110,7 @@ class PyMca_k_weight(Process):
:rtype: dict
"""
assert xas_obj is not None
_xas_obj = self._getXasObject(xas_obj=xas_obj)
_xas_obj = self.getXasObject(xas_obj=xas_obj)
if self._k_weight:
_xas_obj.configuration['SET_KWEIGHT'] = self._k_weight
......@@ -119,19 +136,25 @@ class PyMca_k_weight(Process):
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self._pool_process(xas_obj=xas_obj)
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
return _xas_obj
def _pool_process(self, xas_obj):
"""process normalization from a pool"""
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_k,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
p.map(partial_, xas_obj.spectra)
xas_obj.configuration = xas_obj.spectra[0].configuration
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_k(spectrum=spectrum,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
else:
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_k,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
p.map(partial_, xas_obj.spectra)
__call__ = process
......@@ -29,7 +29,7 @@ __date__ = "06/11/2019"
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from xas.core.types import Spectrum
from xas.core.types import Spectrum, XASObject
from .process import Process, Progress
import multiprocessing
import functools
......@@ -37,37 +37,54 @@ import logging
_logger = logging.getLogger(__name__)
def process_spectr_norm(spectrum, configuration, overwrite=True):
def process_spectr_norm(spectrum, configuration, overwrite=True, callback=None,
output=None, output_dict=None):
"""
:param :class:`.Spectrum` spectrum: spectrum to process
:param dict configuration: configuration of the pymca normalization
:param bool overwrite: False if we want to return a new Spectrum instance
:return:
:param function pointer callback: callback to execute.
:param output: list to store the result, needed for pool processing
:type: multiprocessing.manager.list
:param dict output_dict: key: input spectrum, value: index in the output
list.
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
if spectrum.mu is None:
_logger.error('Mu is not specified, unable to normalize')
if spectrum.energy is None or spectrum.mu is None:
_logger.error('Energy and or Mu is/are not specified, unable to '
'compute exafs')
return None, None
pymca_xas = XASClass()
pymca_xas.setSpectrum(energy=spectrum.energy,
mu=spectrum.mu)
pymca_xas.setConfiguration(configuration)
if configuration is not None:
pymca_xas.setConfiguration(configuration)
configuration = pymca_xas.getConfiguration()
# try:
pymca_xas.processSpectrum()
ddict = spectrum.to_dict()
res = pymca_xas.normalize()
ddict.update(res)
spectrum_ = Spectrum.from_dict(ddict)
# except (IndexError, ValueError) as e:
# _logger.error(e)
# return None, None
if overwrite:
spectrum.load_frm_dict(spectrum_.to_dict())
return configuration, spectrum
else:
return configuration, spectrum_
try:
res = pymca_xas.normalize()
ddict = spectrum.to_dict()
ddict.update(res)
spectrum_ = Spectrum.from_dict(ddict)
except (IndexError, ValueError) as e:
_logger.error(e)
return None, None
if callback:
callback()
def get_output(orignal_spec, res_spec):
if overwrite:
orignal_spec.update(res_spec)
return orignal_spec
else:
return res_spec
if output is not None:
assert output_dict is not None
output[output_dict[spectrum]] = get_output(spectrum, spectrum_)
return configuration, get_output(spectrum, spectrum_)
def pymca_normalization(xas_obj):
......@@ -83,6 +100,10 @@ def pymca_normalization(xas_obj):
return normalization_obj.process(xas_obj=xas_obj)
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with push workflow for now.
class PyMca_normalization(Process):
def __init__(self):
Process.__init__(self, 'normalization')
......@@ -102,7 +123,7 @@ class PyMca_normalization(Process):
:return: updated XASObject
:rtype: :class:`.XASObject`
"""
_xas_obj = self._getXasObject(xas_obj)
_xas_obj = self.getXasObject(xas_obj)
if _xas_obj.energy is None:
_logger.error('Energy not specified, unable to normalize spectra')
......@@ -113,19 +134,40 @@ class PyMca_normalization(Process):
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self._pool_process(xas_obj=xas_obj)
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
return _xas_obj
def _pool_process(self, xas_obj):
"""process normalization from a pool"""
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_norm,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
p.map(partial_, xas_obj.spectra)
xas_obj.configuration = xas_obj.spectra[0].configuration
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_norm(spectrum=spectrum,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(process_spectr_norm,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=False,
output=res_list,
output_dict=output_dict)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
__call__ = process
......@@ -32,6 +32,11 @@ __date__ = "07/08/2019"
from .progress import Progress
from xas.core.types import XASObject
_DEBUG = True
if _DEBUG:
from xas.core.types import Spectrum
import numpy
class Process(object):
def __init__(self, name):
......@@ -48,8 +53,17 @@ class Process(object):
def stop(self):
self.__stop = True
def _getXasObject(self, xas_obj):
@staticmethod
def getXasObject(xas_obj):
if isinstance(xas_obj, dict):
return XASObject.from_dict(xas_obj)
_xas_obj = XASObject.from_dict(xas_obj)