Commit 82247dad authored by payno's avatar payno

[normalization][proto] split `process` function using a `_pool_thread` function

parent 19917577
......@@ -245,7 +245,7 @@ class NormalizationOW(OWWidget):
def _endProcess(self, xas_obj):
if self._callback_finish:
try:
self.getProcessingThread()._xas_obj._advancement.sigProgress.disconnect(self._setProgressValue)
self.getProcessingThread()._process_obj._advancement.sigProgress.disconnect(self._setProgressValue)
except ... as e:
_logger.error(str(e))
......@@ -263,55 +263,30 @@ from xas.core.process.normalization import _process_spectrum
# TODO: use directly the pymca_normalization, just use a different pool_process
class QPyMca_normalization(qt.QObject):
class QPyMca_normalization(xas.core.process.normalization.PyMca_normalization, qt.QObject):
# TODO: if possible remove the inheritance from qt.QObject
def __init__(self, parent=None):
qt.QObject.__init__(self, parent)
self._settings = None
xas.core.process.normalization.PyMca_normalization.__init__(self)
self._advancement = QProgress('normalization')
def setProperties(self, properties):
if '_pymcaSettings' in properties:
self._settings = properties['_pymcaSettings']
def process(self, xas_obj):
"""
:param xas_obj: object containing the configuration and spectra to process
:type: Union[XASObject, dict]. If is a dict, should contain configuration or
spectra keys. Otherwise is simply the spectra
:return: spectra dict
:rtype: dict
"""
if isinstance(xas_obj, dict):
_xas_obj = XASObject.from_dict(xas_obj)
else:
_xas_obj = xas_obj
if _xas_obj.energy is None:
_logger.error('Energy not specified, unable to normalize spectra')
return
if self._settings:
_xas_obj.configuration['Normalization'] = self._settings
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
def _pool_process(self, xas_obj):
self.pool = qt.QThreadPool()
self.pool.setMaxThreadCount(5)
for spectrum in _xas_obj.spectra:
for spectrum in xas_obj.spectra:
runnable = NormalizationRunnable(spectrum=spectrum,
configuration=_xas_obj.configuration,
callback=self._test)
configuration=xas_obj.configuration,
callback=self._advancement_callback)
self.pool.start(runnable)
self.pool.waitForDone()
def _test(self):
# TODO: this should be removed
def _advancement_callback(self):
self._advancement.increaseAdvancement()
__call__ = process
# TODO: this runnable can be generalized.
class NormalizationRunnable(qt.QRunnable):
def __init__(self, spectrum, configuration, callback=None):
qt.QRunnable.__init__(self)
......@@ -327,6 +302,7 @@ class NormalizationRunnable(qt.QRunnable):
self._callback()
# TODO: this process thread can be generalized to.
class NormalizationQThread(qt.QThread):
def __init__(self, parent=None):
qt.QThread.__init__(self, parent)
......
......@@ -29,8 +29,8 @@ __date__ = "06/11/2019"
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from xas.core.types import XASObject, Spectrum
from .process import Process
from xas.core.types import Spectrum
from .process import Process, Progress
import multiprocessing
import functools
import logging
......@@ -79,12 +79,14 @@ def pymca_normalization(xas_obj):
class PyMca_normalization(Process):
def __init__(self):
Process.__init__(self, 'normalization')
self._advancement = Progress(self.name)
self._settings = None
def setProperties(self, properties):
if '_pymcaSettings' in properties:
self._settings = properties['_pymcaSettings']
# TODO: this can be generalized from process.
def process(self, xas_obj):
"""
......@@ -94,10 +96,7 @@ class PyMca_normalization(Process):
:return: spectra dict
:rtype: dict
"""
if isinstance(xas_obj, dict):
_xas_obj = XASObject.from_dict(xas_obj)
else:
_xas_obj = xas_obj
_xas_obj = self._getXasObject(xas_obj)
if _xas_obj.energy is None:
_logger.error('Energy not specified, unable to normalize spectra')
......@@ -108,20 +107,24 @@ class PyMca_normalization(Process):
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self._pool_process(xas_obj=xas_obj)
self._advancement.endProcess()
return _xas_obj
def _pool_process(self, xas_obj):
"""process normalization from a pool"""
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(_process_spectrum,
configuration=_xas_obj.configuration,
callback=self._advancement.increaseAdvancement)
res = p.map(partial_, _xas_obj.spectra)
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=True)
res = p.map(partial_, xas_obj.spectra)
# update xas_obj TODO: find a better way for setting this
for i, res_ite in enumerate(res):
configuration, spectrum = res_ite
_xas_obj.spectra[i].spectrum = spectrum
_xas_obj.spectra[i].configuration = configuration
_xas_obj.configuration = _xas_obj.spectra[0].configuration
self._advancement.endProcess()
return _xas_obj
xas_obj.spectra[i].spectrum = spectrum
xas_obj.spectra[i].configuration = configuration
xas_obj.configuration = xas_obj.spectra[0].configuration
__call__ = process
......@@ -30,6 +30,7 @@ __date__ = "07/08/2019"
from .progress import Progress
from xas.core.types import XASObject
class Process(object):
......@@ -46,3 +47,9 @@ class Process(object):
def stop(self):
self.__stop = True
def _getXasObject(self, xas_obj):
if isinstance(xas_obj, dict):
return XASObject.from_dict(xas_obj)
else:
return xas_obj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment