Commit 19917577 authored by payno's avatar payno

[pool process + processing thread] proto - make processing in a pool and start...

[pool process + processing thread] proto - make processing in a pool and start from a separated thread to avoid gui freeze.

Move Normalization to multiprocessing.Pool and the call from orange from a dedicated thread and with a QThreadPool.
This prevent any gui freeze.
For now this is just a prototype. Some thing to do:

- create a pool_process that will be inherited from PyMca_normalization and QPyMca_normalization.
- generalize this structure for each orange widget.
parent 2cd39480
Pipeline #12425 failed with stage
in 60 minutes and 1 second
......@@ -34,9 +34,11 @@ from xas.core.process.progress import Progress
class QProgress(Progress, qt.QObject):
sigProgress = qt.Signal(int)
def __init__(self, name):
Progress.__init__(self, name)
assert name is not None
qt.QObject.__init__(self)
Progress.__init__(self, name)
def startProcess(self):
self.sigProgress.emit(0)
......
......@@ -217,4 +217,4 @@ class ExafsOW(OWWidget):
class QPyMca_exafs(xas.core.process.exafs.PyMca_exafs):
def __init__(self):
xas.core.process.exafs.PyMca_exafs.__init__(self)
self._advancement = QProgress('exafs')
self._advancement = QProgress(self.name)
......@@ -185,4 +185,4 @@ class FTOW(OWWidget):
class QPyMca_ft(xas.core.process.ft.PyMca_ft):
def __init__(self):
xas.core.process.ft.PyMca_ft.__init__(self)
self._advancement = QProgress('exafs')
self._advancement = QProgress(self.name)
......@@ -204,4 +204,4 @@ class KWeightOW(OWWidget):
class QPyMca_K_weight(xas.core.process.k_weight.PyMca_k_weight):
def __init__(self):
xas.core.process.k_weight.PyMca_k_weight.__init__(self)
self._advancement = QProgress('k weight')
self._advancement = QProgress(self.name)
......@@ -38,6 +38,7 @@ from ..progress import QProgress
from xas.core.types import XASObject, Spectrum
from xas.gui.XasObjectViewer import XasObjectViewer, _CurveOperation, ViewType
import xas.core.process.normalization
import functools
import logging
_logger = logging.getLogger(__file__)
......@@ -181,6 +182,7 @@ class NormalizationOW(OWWidget):
self._window = NormalizationWindow(parent=self)
layout = gui.vBox(self.mainArea, 'fourier transform').layout()
layout.addWidget(self._window)
self.__processingThread = None
self._window.xasObjViewer.setWindowTitle('spectra')
# progress
......@@ -201,17 +203,33 @@ class NormalizationOW(OWWidget):
if xas_obj is None:
return
# TODO: move process to multiprocessing + add advancement progress
self._stopProcessing()
self._latest_xas_obj = xas_obj.copy()
# setup the normalization process
process_obj = QPyMca_normalization()
process_obj._advancement.sigProgress.connect(self._setProgressValue)
process_obj.setProperties({'_pymcaSettings': self._window._pymcaWindow.getParameters()})
xas_obj = process_obj.process(xas_obj)
process_obj._advancement.sigProgress.disconnect(self._setProgressValue)
self._window.xasObjViewer.setXASObj(xas_obj=xas_obj)
# emit signal for the plot
self.send("spectra", xas_obj)
# connect processing thread
thread = self.getProcessingThread()
thread.init(process_obj=process_obj, xas_obj=xas_obj)
self._callback_finish = functools.partial(self._endProcess, xas_obj)
thread.finished.connect(self._callback_finish)
# start processing
thread.start(priority=qt.QThread.LowPriority)
def _stopProcessing(self):
if self.__processingThread is None:
return
else:
self.__processingThread.finished.disconnect(self._callback_finish)
self.__processingThread.quit()
def getProcessingThread(self):
if self.__processingThread is None:
self.__processingThread = NormalizationQThread(parent=self)
return self.__processingThread
def _updateProcess(self):
self._update_settings()
......@@ -224,8 +242,98 @@ class NormalizationOW(OWWidget):
def _setProgressValue(self, value):
self._progress.widget.progressBarSet(value)
def _endProcess(self, xas_obj):
if self._callback_finish:
try:
self.getProcessingThread()._xas_obj._advancement.sigProgress.disconnect(self._setProgressValue)
except ... as e:
_logger.error(str(e))
class QPyMca_normalization(xas.core.process.normalization.PyMca_normalization):
def __init__(self):
xas.core.process.normalization.PyMca_normalization.__init__(self)
self.getProcessingThread().finished.disconnect(self._callback_finish)
self._callback_finish = None
if xas_obj is None:
return
else:
self._window.xasObjViewer.setXASObj(xas_obj=xas_obj)
# emit signal for the plot
self.send("spectra", xas_obj)
from xas.core.process.normalization import _process_spectrum
# TODO: use directly the pymca_normalization, just use a different pool_process
class QPyMca_normalization(qt.QObject):
def __init__(self, parent=None):
qt.QObject.__init__(self, parent)
self._settings = None
self._advancement = QProgress('normalization')
def setProperties(self, properties):
if '_pymcaSettings' in properties:
self._settings = properties['_pymcaSettings']
def process(self, xas_obj):
"""
:param xas_obj: object containing the configuration and spectra to process
:type: Union[XASObject, dict]. If is a dict, should contain configuration or
spectra keys. Otherwise is simply the spectra
:return: spectra dict
:rtype: dict
"""
if isinstance(xas_obj, dict):
_xas_obj = XASObject.from_dict(xas_obj)
else:
_xas_obj = xas_obj
if _xas_obj.energy is None:
_logger.error('Energy not specified, unable to normalize spectra')
return
if self._settings:
_xas_obj.configuration['Normalization'] = self._settings
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self.pool = qt.QThreadPool()
self.pool.setMaxThreadCount(5)
for spectrum in _xas_obj.spectra:
runnable = NormalizationRunnable(spectrum=spectrum,
configuration=_xas_obj.configuration,
callback=self._test)
self.pool.start(runnable)
self.pool.waitForDone()
def _test(self):
self._advancement.increaseAdvancement()
__call__ = process
class NormalizationRunnable(qt.QRunnable):
def __init__(self, spectrum, configuration, callback=None):
qt.QRunnable.__init__(self)
self._spectrum = spectrum
self._configuration = configuration
self._callback = callback
def run(self):
self._configuration, self._spectrum = _process_spectrum(spectrum=self._spectrum,
configuration=self._configuration,
overwrite=True)
if self._callback:
self._callback()
class NormalizationQThread(qt.QThread):
def __init__(self, parent=None):
qt.QThread.__init__(self, parent)
def init(self, xas_obj, process_obj):
self._xas_obj = xas_obj
self._process_obj = process_obj
def run(self):
self._xas_obj = self._process_obj.process(self._xas_obj)
......@@ -28,7 +28,7 @@ __license__ = "MIT"
__date__ = "06/11/2019"
from xas.core.types import XASObject
from .progress import Progress
from .process import Process
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from PyMca5.PyMcaPhysics.xas.XASClass import e2k
import logging
......@@ -78,11 +78,11 @@ def pymca_exafs(xas_obj):
return exafs_obj.process(xas_obj=xas_obj)
class PyMca_exafs(object):
class PyMca_exafs(Process):
"""Process spectra for exafs and get information about the processing
advancement"""
def __init__(self):
self._advancement = Progress('exafs')
Process.__init__(self, name='exafs')
self._settings = None
def setProperties(self, properties):
......
......@@ -29,7 +29,7 @@ __date__ = "06/11/2019"
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from xas.core.types import XASObject, Spectrum
from .progress import Progress
from .process import Process
import logging
_logger = logging.getLogger(__name__)
......@@ -61,10 +61,10 @@ def pymca_ft(xas_obj):
return ft_obj.process(xas_obj=xas_obj)
class PyMca_ft(object):
class PyMca_ft(Process):
def __init__(self):
Process.__init__(self, name='ft')
self._settings = None
self._advancement = Progress('ft')
def setProperties(self, properties):
if '_pymcaSettings' in properties:
......
......@@ -29,7 +29,7 @@ __date__ = "06/11/2019"
from xas.core.types import XASObject, Spectrum
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from .progress import Progress
from .process import Process
import logging
_logger = logging.getLogger(__name__)
......@@ -61,10 +61,10 @@ def pymca_k_weight(xas_obj):
return k_weight_obj.process(xas_obj=xas_obj)
class PyMca_k_weight(object):
class PyMca_k_weight(Process):
def __init__(self):
Process.__init__(self, name='k weight')
self._k_weight = None
self._advancement = Progress('k weight')
def setProperties(self, properties):
if '_kWeightSetting' in properties:
......
......@@ -30,15 +30,17 @@ __date__ = "06/11/2019"
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from xas.core.types import XASObject, Spectrum
from .progress import Progress
from .process import Process
import multiprocessing
import functools
import logging
_logger = logging.getLogger(__name__)
def _process_spectrum(spectrum, configuration):
def _process_spectrum(spectrum, configuration, overwrite=True):
if spectrum.mu is None:
_logger.error('Mu is not specified, unable to normalize')
return
return None, None
pymca_xas = XASClass()
pymca_xas.setSpectrum(energy=spectrum.energy,
mu=spectrum.mu)
......@@ -50,11 +52,16 @@ def _process_spectrum(spectrum, configuration):
ddict = spectrum.to_dict()
res = pymca_xas.normalize()
ddict.update(res)
spectrum = Spectrum.from_dict(ddict)
spectrum_ = Spectrum.from_dict(ddict)
except (IndexError, ValueError) as e:
_logger.error(e)
return None, None
if overwrite:
spectrum.load_frm_dict(spectrum_.to_dict())
return configuration, spectrum
else:
return configuration, spectrum_
return configuration, spectrum
def pymca_normalization(xas_obj):
"""
......@@ -69,10 +76,10 @@ def pymca_normalization(xas_obj):
return normalization_obj.process(xas_obj=xas_obj)
class PyMca_normalization(object):
class PyMca_normalization(Process):
def __init__(self):
Process.__init__(self, 'normalization')
self._settings = None
self._advancement = Progress('normalization')
def setProperties(self, properties):
if '_pymcaSettings' in properties:
......@@ -99,13 +106,21 @@ class PyMca_normalization(object):
if self._settings:
_xas_obj.configuration['Normalization'] = self._settings
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
for i_spectrum, spectrum in enumerate(_xas_obj.spectra):
_xas_obj.configuration, _xas_obj.spectra[
i_spectrum] = _process_spectrum(spectrum=spectrum,
configuration=_xas_obj.configuration)
self._advancement.setAdvancement(
int(i_spectrum / _xas_obj.n_spectrum) * 100)
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(_process_spectrum,
configuration=_xas_obj.configuration,
callback=self._advancement.increaseAdvancement)
res = p.map(partial_, _xas_obj.spectra)
# update xas_obj TODO: find a better way for setting this
for i, res_ite in enumerate(res):
configuration, spectrum = res_ite
_xas_obj.spectra[i].spectrum = spectrum
_xas_obj.spectra[i].configuration = configuration
_xas_obj.configuration = _xas_obj.spectra[0].configuration
self._advancement.endProcess()
return _xas_obj
......
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""module for process base class"""
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "07/08/2019"
from .progress import Progress
class Process(object):
def __init__(self, name):
assert type(name) is str
self._name = name
self._advancement = Progress(name=name)
self.__stop = False
"""flag to notice when a end of process is required"""
@property
def name(self):
return self._name
def stop(self):
self.__stop = True
......@@ -22,6 +22,7 @@
# THE SOFTWARE.
#
# ###########################################################################*/
"""module for giving information on process progress"""
__authors__ = ["H. Payno"]
__license__ = "MIT"
......@@ -66,6 +67,11 @@ class Progress(object):
"""Simple interface for defining advancement on a 100 percentage base"""
def __init__(self, name):
self._name = name
self.reset()
def reset(self, max_=None):
self._nProcessed = 0
self._maxProcessed = max_
def startProcess(self):
_logger.info('start process' + self._name)
......@@ -75,3 +81,10 @@ class Progress(object):
def endProcess(self):
_logger.info('process finished' + self._name)
def setMaxSpectrum(self, n):
self._maxProcessed = n
def increaseAdvancement(self, i=1):
self._nProcessed += i
self.setAdvancement((self._nProcessed / self._maxProcessed) * 100)
......@@ -83,6 +83,9 @@ class XASObject(object):
self.addSpectrum(spectrum)
self.energy = energy
def _setSpectra(self, spectra):
self.__spectra = spectra
def getSpectrum(self, dim1_idx, dim2_idx):
"""Util function to access the spectrum at dim1_idx, dim2_idx"""
assert dim1_idx < self.dim1
......@@ -345,12 +348,15 @@ class Spectrum(object):
def __contains__(self, item):
return item in self.__key_mapper or item in self.__other_parameters
def load_frm_dict(self, ddict):
for key, value in ddict.items():
self[key] = value
return self
@staticmethod
def from_dict(ddict):
spectrum = Spectrum()
for key, value in ddict.items():
spectrum[key] = value
return spectrum
return spectrum.load_frm_dict(ddict=ddict)
def to_dict(self):
res = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment