Commit 798b20c1 authored by Henri Payno's avatar Henri Payno
Browse files

[orangecontrib] start removing RunnableProcess for larch widgets

parent 095576b3
......@@ -128,10 +128,6 @@ def larch_autobk(xas_obj):
return mback_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class Larch_autobk(
Process,
name="autobk",
......@@ -152,48 +148,24 @@ class Larch_autobk(
self.setConfiguration(self.inputs.autobk)
_xas_obj.configuration["autobk"] = self.inputs.autobk
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self.progress = 0
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.progress = 100
self.register_process(_xas_obj, data_keys=(_NexusDatasetDef("bkg"),))
self.outputs.xas_obj = _xas_obj.to_dict()
return _xas_obj
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_autobk(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_autobk,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra):
process_spectr_autobk(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
self.progress = i_s / n_s * 100.0
def definition(self):
return "autobk calculation"
......
......@@ -119,10 +119,6 @@ def larch_mback(xas_obj):
return mback_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class Larch_mback(
Process,
name="mback",
......@@ -146,10 +142,9 @@ class Larch_mback(
elif "mback" not in _xas_obj.configuration:
_xas_obj.configuration["mback"] = {}
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.progress = 100.0
self.register_process(
_xas_obj,
data_keys=(
......@@ -162,38 +157,15 @@ class Larch_mback(
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_mback(
spectrum=spectrum,
configuration=xas_obj.configuration["mback"],
callbacks=self.callbacks,
overwrite=True,
)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_mback,
configuration=xas_obj.configuration["mback"],
callbacks=self.callbacks,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra):
process_spectr_mback(
spectrum=spectrum,
configuration=xas_obj.configuration["mback"],
callbacks=self.callbacks,
overwrite=True,
)
self.progress = i_s / n_s * 100.0
def definition(self):
return "mback calculation"
......
......@@ -128,10 +128,6 @@ def larch_mback_norm(xas_obj):
return mback_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class Larch_mback_norm(
Process,
name="mback_norm",
......@@ -152,10 +148,9 @@ class Larch_mback_norm(
self.setConfiguration(self.inputs.mback_norm)
_xas_obj.configuration["mback_norm"] = self.inputs.mback_norm
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.progress = 100.0
data_keys = [
_NexusDatasetDef("mback_mu"),
_NexusDatasetDef("norm_mback"),
......@@ -168,38 +163,15 @@ class Larch_mback_norm(
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_mback_norm(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_mback_norm,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra):
process_spectr_mback_norm(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
self.progress = i_s / n_s * 100.0
def definition(self):
return "mback norm calculation"
......
......@@ -98,7 +98,9 @@ def process_spectr_pre_edge(
opts[opt_name] = _conf[opt_name]
if _DEBUG is True:
assert isinstance(spectrum, Group)
assert isinstance(
spectrum, Group
), f"spectrum is expected to be an instance of {type(Group)}. Not {type(spectrum)}"
if overwrite:
_spectrum = spectrum
else:
......@@ -124,10 +126,6 @@ def larch_pre_edge(xas_obj):
return mback_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class Larch_pre_edge(
Process,
name="pre_edge",
......@@ -149,10 +147,9 @@ class Larch_pre_edge(
self.setConfiguration(self.inputs.pre_edge)
_xas_obj.configuration["pre_edge"] = self.inputs.pre_edge
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.progress = 100.0
self.register_process(
_xas_obj,
data_keys=(
......@@ -218,38 +215,15 @@ class Larch_pre_edge(
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_pre_edge(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_pre_edge,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra):
process_spectr_pre_edge(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
self.progress = i_s / n_s * 100.0
def definition(self):
return "pre_edge calculation"
......
......@@ -151,10 +151,6 @@ def larch_xftf(xas_obj):
return mback_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class Larch_xftf(
Process,
name="xftf",
......@@ -254,38 +250,15 @@ class Larch_xftf(
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
process_spectr_xftf(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_xftf,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra):
process_spectr_xftf(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
self.progress = i_s / n_s * 100.0
def definition(self):
return "xftf calculation"
......
......@@ -164,7 +164,8 @@ class PyMca_normalization(
_NexusDatasetDef("NormalizedSignal"),
),
)
self.outputs.xas_obj = _xas_obj.to_dict()
xas_obj_as_dict = _xas_obj.to_dict()
self.outputs.xas_obj = xas_obj_as_dict
return _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -319,7 +319,9 @@ class XASObject:
file_path=self.linked_h5_file, data_path=data_path, scheme="silx"
).path()
print("start create saving point")
self._create_saving_pt()
print("end create saving point")
spectra_ = get_spectra_and_processing()
res = {
"configuration": self.configuration,
......
......@@ -32,6 +32,7 @@ from silx.gui import qt
from Orange.widgets import gui
import logging
from ewoksorange.bindings.owwidgets import OWEwoksWidgetOneThread
from ewokscore.hashing import MissingData
from est.core.types import XASObject
_logger = logging.getLogger(__file__)
......@@ -51,17 +52,19 @@ class _ProcessForOrangeMixIn(OWEwoksWidgetOneThread):
pass
def outputsChanged(self):
print("output variables are", self.output_variables)
# TODO: maybe we would expect a task_outputs to match the task_inputs
if "xas_obj" in self.task_inputs:
print("xas_obj contains", self.task_inputs)
xas_obj = self.output_variables["xas_obj"].value
if isinstance(xas_obj, dict):
if isinstance(xas_obj, MissingData):
_logger.warning("no output data set. Unable to update the GUI")
return
elif isinstance(xas_obj, dict):
xas_obj = XASObject.from_dict(xas_obj)
if hasattr(self, "_window") and hasattr(self._window, "setXASObj"):
self._window.setXASObj(xas_obj=xas_obj)
elif hasattr(self, "_window") and hasattr(self._window, "xasObjViewer"):
if hasattr(self._window.xasObjViewer, "setXASObj"):
self._window.xasObjViewer.setXASObj(xas_obj=xas_obj)
# emit signal for the plot
self.Outputs.xas_obj.send(xas_obj)
......@@ -139,7 +139,9 @@ class AutobkWindow(qt.QMainWindow):
)
class AutobkOW(_ProcessForOrangeMixIn, OWWidget):
class AutobkOW(
_ProcessForOrangeMixIn, ewokstaskclass=est.core.process.larch.autobk.Larch_autobk
):
"""
Widget used for signal extraction
"""
......@@ -154,26 +156,12 @@ class AutobkOW(_ProcessForOrangeMixIn, OWWidget):
want_main_area = True
resizing_enabled = True
allows_cycle = False
_larchSettings = Setting(dict())
# kept for compatibility
static_input = Setting({"xas_obj": None, "autobk": None})
"""Store the configuration of the larch configuration"""
ewokstaskclass = est.core.process.larch.autobk.Larch_autobk
class Inputs:
xas_obj = Input("xas_obj", XASObject, default=True)
# simple compatibility for some Orange widget and especialy the
# 'spectroscopy add-on'
data_table = Input("Data", Orange.data.Table)
class Outputs:
xas_obj = Output("xas_obj", XASObject)
# by default we want to avoid sending 'Orange.data.Table' to avoid
# loosing the XASObject flow process and results.
def __init__(self):
super().__init__()
self._latest_xas_obj = None
......@@ -192,16 +180,9 @@ class AutobkOW(_ProcessForOrangeMixIn, OWWidget):
# connect signals / slots
self._window._parametersWindow.sigChanged.connect(self._updateProcess)
# required to display advancement
if _USE_THREAD is False:
self._advancement = QProgress("autobk")
self._advancement.sigProgress.connect(self._setProgressValue)
# set up (insure settings will be store
self._update_settings()
# self.handleNewSignals()
def _updateProcess(self):
"""Update settings keeping current xas obj"""
self._update_settings()
......@@ -214,83 +195,3 @@ class AutobkOW(_ProcessForOrangeMixIn, OWWidget):
"autobk": self._window._parametersWindow.getParameters(),
"xas_obj": None,
}
@Inputs.data_table
def processFrmDataTable(self, data_table):
if data_table is None:
return
self.process(Converter.toXASObject(data_table=data_table))
@Inputs.xas_obj
def process(self, xas_obj):
if xas_obj is None:
return
if not self._canProcess():
_logger.warning(
"There is some processing on going already, will"
"not process the new dataset"
)
self._latest_xas_obj = xas_obj
self._startProcess()
# setup the normalization process
inputs = {"xas_obj": xas_obj}
if _USE_THREAD is True:
# note: for now with larch with cannot do thread computation (see
# PreEdgeOW )
process_obj = QLarch_autobk(inputs=inputs)
process_obj._advancement.sigProgress.connect(self._setProgressValue)
process_obj.set_properties(
{"_larchSettings": self._window._parametersWindow.getParameters()}
)
# update the processing thread
thread = self.getProcessingThread()
thread.init(process_obj=process_obj, xas_obj=self._latest_xas_obj)
self._callback_finish = functools.partial(
self._endProcess, self._latest_xas_obj
)
thread.finished.connect(self._callback_finish)
# start processing
thread.start(priority=qt.QThread.LowPriority)
else:
# manage advancement
self._advancement.setAdvancement(0)
self._advancement.setMaxSpectrum(self._latest_xas_obj.n_spectrum)
process_obj = est.core.process.larch.autobk.Larch_autobk(inputs=inputs)
process_obj.advancement = self._advancement
process_obj.set_properties(
{"_larchSettings": self._window._parametersWindow.getParameters()}
)
# hack (reason explained in the thread part)
# to avoid gui freeze, processevents after each spectrum process
process_obj.addCallback(qt.QApplication.instance().processEvents)
process_obj.addCallback(self._advancement.increaseAdvancement)
process_obj.run()
self._callback_finish = None
self._endProcess(self._latest_xas_obj)
class QLarch_autobk(est.core.process.larch.autobk.Larch_autobk):
"""
Normalization able to give advancement using qt.Signal and QThreadPool
"""
def __init__(self, *args, **kwargs):
est.core.process.larch.autobk.Larch_autobk.__init__(self, *args, **kwargs)
self._advancement = QProgress("autobk")
def _pool_process(self, xas_obj):
self.pool = qt.QThreadPool()
self.pool.setMaxThreadCount(5)
for spectrum in xas_obj.spectra:
runnable = ProcessRunnable(
fct=est.core.process.larch.autobk.process_spectr_autobk,
spectrum=spectrum,
configuration=xas_obj.configuration,
callback=self._advancement.increaseAdvancement,
)
self.pool.start(runnable)
self.pool.waitForDone()
......@@ -49,8 +49,6 @@ from orangecontrib.est.widgets.container import _ParameterWindowContainer
_logger = logging.getLogger(__file__)
_USE_THREAD = False
class MbackWindow(qt.QMainWindow):
def __init__(self, parent=None):
......@@ -112,7 +110,9 @@ class MbackWindow(qt.QMainWindow):
return len(self.xasObjViewer._spectrumViews[0]._plotWidget.getAllCurves())
class MbackOW(_ProcessForOrangeMixIn, OWWidget):
class MbackOW(
_ProcessForOrangeMixIn, ewokstaskclass=est.core.process.larch.mback.Larch_mback
):
"""
Widget used for signal extraction
"""
......@@ -130,26 +130,12 @@ class MbackOW(_ProcessForOrangeMixIn, OWWidget):
want_main_area = True
resizing_enabled = True
allows_cycle = False
_larchSettings = Setting(dict())
# kept for compatibility
static_input = Setting({"mback": None})
"""Store the configuration of the larch configuration"""
ewokstaskclass = est.core.process.larch.mback.Larch_mback
class Inputs:
xas_obj = Input("xas_obj", XASObject, default=True)
# simple compatibility for some Orange widget and especialy the
# 'spectroscopy add-on'
data_table = Input("Data", Orange.data.Table)
class Outputs:
xas_obj = Output("xas_obj", XASObject)
# by default we want to avoid sending 'Orange.data.Table' to avoid
# loosing the XASObject flow process and results.
def __init__(self):
super().__init__()
self._latest_xas_obj = None
......@@ -168,11 +154,6 @@ class MbackOW(_ProcessForOrangeMixIn, OWWidget):
# connect signals / slots
self._window._parametersWindow.sigChanged.connect(self._updateProcess)
# required to display advancement
if _USE_THREAD is False:
self._advancement = QProgress("mback")
self._advancement.sigProgress.connect(self._setProgressValue)