Commit 095576b3 authored by Henri Payno's avatar Henri Payno
Browse files

[orangecontrib] start removing RunnableProcess for pymca widgets

parent 28797618
......@@ -26,7 +26,7 @@
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/11/2019"
__date__ = "04/10/2021"
import functools
import logging
......@@ -140,10 +140,6 @@ def pymca_exafs(xas_obj):
return exafs_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class PyMca_exafs(
Process,
name="exafs",
......@@ -167,9 +163,9 @@ class PyMca_exafs(
self.setConfiguration(self.inputs.exafs)
_xas_obj.configuration["EXAFS"] = self.inputs.exafs
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
self._advancement.endProcess()
self.register_process(
_xas_obj,
......@@ -183,8 +179,8 @@ class PyMca_exafs(
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra.data.flat:
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra.data.flat):
assert "NormalizedBackground" in spectrum
process_spectr_exafs(
spectrum=spectrum,
......@@ -193,30 +189,7 @@ class PyMca_exafs(
overwrite=True,
)
assert "EXAFSKValues" in spectrum
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_exafs,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
self.progress = i_s / n_s * 100.0
def definition(self):
return "exafs calculation"
......@@ -231,25 +204,3 @@ class PyMca_exafs(
return "pymca_exafs"
__call__ = run
if __name__ == "__main__":
import sys
import yaml
xas_object_yaml_file = sys.argv[1]
_logger.debug("Load xas object from {}".format(xas_object_yaml_file))
with open(xas_object_yaml_file, "r") as file:
ddict = yaml.load(file)["input_data"]
xas_object = XASObject.from_dict(ddict)
print("******* do exafs ********")
res_xas_object = pymca_exafs(xas_obj=xas_object)
res_xas_object._create_saving_pt()
# dump resulting object
with open(xas_object_yaml_file, "w") as file:
yaml.dump({"input_data": res_xas_object.to_dict()}, file)
# dump resulting object into output_normalization file
with open("output_exafs.yaml", "w") as file:
yaml.dump({"input_data": res_xas_object.to_dict()}, file)
......@@ -26,7 +26,7 @@
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/11/2019"
__date__ = "05/10/2021"
import functools
import logging
......@@ -145,10 +145,6 @@ def pymca_ft(xas_obj):
return ft_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class PyMca_ft(
Process,
name="ft",
......@@ -197,38 +193,15 @@ class PyMca_ft(
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
for spectrum in xas_obj.spectra:
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra):
process_spectr_ft(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
else:
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(1) as p:
partial_ = functools.partial(
process_spectr_ft,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
self.progress = i_s / n_s * 100.0
def definition(self):
return "fourier transform"
......@@ -243,25 +216,3 @@ class PyMca_ft(
return "pymca_ft"
__call__ = run
if __name__ == "__main__":
import sys
import yaml
xas_object_yaml_file = sys.argv[1]
_logger.debug("Load xas object from {}".format(xas_object_yaml_file))
with open(xas_object_yaml_file, "r") as file:
ddict = yaml.load(file)["input_data"]
xas_object = XASObject.from_dict(ddict)
print("******* do ft ********")
res_xas_object = pymca_ft(xas_obj=xas_object)
res_xas_object._create_saving_pt()
# dump resulting object
with open(xas_object_yaml_file, "w") as file:
yaml.dump({"input_data": res_xas_object.to_dict()}, file)
# dump resulting object into output_normalization file
with open("output_ft.yaml", "w") as file:
yaml.dump({"input_data": res_xas_object.to_dict()}, file)
......@@ -26,7 +26,7 @@
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/11/2019"
__date__ = "05/10/2021"
import functools
import logging
......@@ -133,10 +133,6 @@ def pymca_k_weight(xas_obj):
return k_weight_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class PyMca_k_weight(
Process,
name="k weight",
......@@ -189,9 +185,9 @@ class PyMca_k_weight(
]
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.progress = 100.0
self.register_process(xas_obj=_xas_obj, data_keys=tuple())
self.outputs.xas_obj = _xas_obj.to_dict()
return _xas_obj
......@@ -199,9 +195,9 @@ class PyMca_k_weight(
def _pool_process(self, xas_obj):
"""process normalization from a pool"""
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
assert "KWeight" in xas_obj.configuration
for spectrum in xas_obj.spectra.data.flat:
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra.data.flat):
process_spectr_k(
spectrum=spectrum,
configuration=xas_obj.configuration,
......@@ -209,15 +205,7 @@ class PyMca_k_weight(
overwrite=True,
)
assert "KWeight" in xas_obj.configuration
else:
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_k,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
p.map(partial_, xas_obj.spectra)
self.progress = i_s / n_s * 100.0
def definition(self):
return "Define k weight for xas treatment"
......@@ -232,25 +220,3 @@ class PyMca_k_weight(
return "pymca_k_weight"
__call__ = run
if __name__ == "__main__":
import sys
import yaml
xas_object_yaml_file = sys.argv[1]
_logger.debug("Load xas object from {}".format(xas_object_yaml_file))
with open(xas_object_yaml_file, "r") as file:
ddict = yaml.load(file)["input_data"]
xas_object = XASObject.from_dict(ddict)
print("******* do k-weight ********")
res_xas_object = pymca_k_weight(xas_obj=xas_object)
res_xas_object._create_saving_pt()
# dump resulting object
with open(xas_object_yaml_file, "w") as file:
yaml.dump({"input_data": res_xas_object.to_dict()}, file)
# dump resulting object into output_normalization file
with open("output_k_weight.yaml", "w") as file:
yaml.dump({"input_data": res_xas_object.to_dict()}, file)
......@@ -26,7 +26,7 @@
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/11/2019"
__date__ = "05/10/2021"
import functools
......@@ -119,10 +119,6 @@ def pymca_normalization(xas_obj):
return normalization_obj.run()
_USE_MULTIPROCESSING_POOL = False
# note: we cannot use multiprocessing pool with pypushflow for now.
class PyMca_normalization(
Process,
name="normalization",
......@@ -174,7 +170,7 @@ class PyMca_normalization(
def _pool_process(self, xas_obj):
"""process normalization from a pool"""
assert isinstance(xas_obj, XASObject)
if not _USE_MULTIPROCESSING_POOL:
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra.data.flat):
process_spectr_norm(
spectrum=spectrum,
......@@ -182,32 +178,7 @@ class PyMca_normalization(
callbacks=self.callbacks,
overwrite=True,
)
self.progress = i_s / len(xas_obj.spectra.data.flat) * 100.0
else:
# TODO: remove this mode for simplification
from multiprocessing import Manager
manager = Manager()
output_dict = {}
res_list = manager.list()
for i_spect, spect in enumerate(xas_obj.spectra.data.flat):
res_list.append(None)
output_dict[spect] = i_spect
with multiprocessing.Pool(5) as p:
partial_ = functools.partial(
process_spectr_norm,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=False,
output=res_list,
output_dict=output_dict,
)
p.map(partial_, xas_obj.spectra)
# then update local spectrum
for spectrum, res in zip(xas_obj.spectra, res_list):
spectrum.update(res)
self.progress = i_s / n_s * 100.0
def definition(self):
return "Normalization of the spectrum"
......
......@@ -139,17 +139,6 @@ class ExafsOW(
# kept for compatibility
static_input = Setting({"xas_obj": None, "exafs": None})
"""Store the configuration of the PyMca XASClass"""
#
# class Inputs:
# xas_obj = Input("xas_obj", XASObject, default=True)
# # simple compatibility for some Orange widget and especialy the
# # 'spectroscopy add-on'
# data_table = Input("Data", Orange.data.Table)
#
# class Outputs:
# xas_obj = Output("xas_obj", XASObject)
# # by default we want to avoid sending 'Orange.data.Table' to avoid
# # loosing the XASObject flow process and results.
def __init__(self):
super().__init__()
......@@ -187,65 +176,14 @@ class ExafsOW(
"exafs": self._window._pymcaWindow.getParameters(),
}
#
# @Inputs.data_table
# def processFrmDataTable(self, data_table):
# if data_table is None:
# return
# self.process(Converter.toXASObject(data_table=data_table))
#
# @Inputs.xas_obj
# def process(self, xas_obj):
# if xas_obj is None:
# return
#
# if not self._canProcess():
# _logger.warning(
# "There is some processing on going already, will"
# "not process the new dataset"
# )
#
# self._latest_xas_obj = xas_obj
# self._startProcess()
#
# # setup the exafs process
# process_obj = QPyMca_exafs(inputs={"xas_obj": xas_obj})
# process_obj._advancement.sigProgress.connect(self._setProgressValue)
# process_obj.set_properties(
# {"_pymcaSettings": self._window._pymcaWindow.getParameters()}
# )
#
# # update the processing thread
# thread = self.getProcessingThread()
# thread.init(process_obj=process_obj, xas_obj=self._latest_xas_obj)
# self._callback_finish = functools.partial(
# self._endProcess, self._latest_xas_obj
# )
# thread.finished.connect(self._callback_finish)
# # start processing
# thread.start(priority=qt.QThread.LowPriority)
#
#
# class QPyMca_exafs(est.core.process.pymca.exafs.PyMca_exafs):
# """
# Normalization able to give advancement using qt.Signal and QThreadPool
# """
#
# def __init__(self, *args, **kwargs):
# est.core.process.pymca.exafs.PyMca_exafs.__init__(self, *args, **kwargs)
# self._advancement = QProgress("normalization")
#
# def _pool_process(self, xas_obj):
# self.pool = qt.QThreadPool()
# self.pool.setMaxThreadCount(5)
# for spectrum in xas_obj.spectra:
# runnable = ProcessRunnable(
# fct=est.core.process.pymca.exafs.process_spectr_exafs,
# spectrum=spectrum,
# configuration=xas_obj.configuration,
# callback=self._advancement.increaseAdvancement,
# )
# self.pool.start(runnable)
# self.pool.waitForDone()
def outputsChanged(self):
if "xas_obj" in self.task_inputs:
xas_obj = self.output_variables["xas_obj"].value
if isinstance(xas_obj, dict):
xas_obj = XASObject.from_dict(xas_obj)
if hasattr(self, "_window") and hasattr(self._window, "setXASObj"):
self._window.setXASObj(xas_obj=xas_obj)
elif hasattr(self, "_window") and hasattr(self._window, "xasObjViewer"):
if hasattr(self._window.xasObjViewer, "setXASObj"):
self._window.xasObjViewer.setXASObj(xas_obj=xas_obj)
self.Outputs.xas_obj.send(xas_obj)
......@@ -156,17 +156,6 @@ class FTOW(_ProcessForOrangeMixIn, ewokstaskclass=est.core.process.pymca.ft.PyMc
# kept for compatibility
static_input = dict({"ft": None, "xas_obj": None})
"""Store the configuration of the PyMca XASClass"""
#
# class Inputs:
# xas_obj = Input("xas_obj", XASObject, default=True)
# # simple compatibility for some Orange widget and especialy the
# # 'spectroscopy add-on'
# data_table = Input("Data", Orange.data.Table)
#
# class Outputs:
# xas_obj = Output("xas_obj", XASObject)
# # by default we want to avoid sending 'Orange.data.Table' to avoid
# # loosing the XASObject flow process and results.
def __init__(self):
super().__init__()
......@@ -183,54 +172,24 @@ class FTOW(_ProcessForOrangeMixIn, ewokstaskclass=est.core.process.pymca.ft.PyMc
self._window._pymcaWindow.setParameters(pymca_settings)
# signal / slot connection
# connect signals / slots
self._window._pymcaWindow.sigChanged.connect(self._updateProcess)
# self.handleNewSignals()
def _updateProcess(self, *arv, **kwargs):
self._update_settings()
if self._latest_xas_obj:
self.process(xas_obj=self._latest_xas_obj)
#
# @Inputs.data_table
# def processFrmDataTable(self, data_table):
# if data_table is None:
# return
# self.process(Converter.toXASObject(data_table=data_table))
#
# @Inputs.xas_obj
# def process(self, xas_obj):
# if xas_obj is None:
# return
#
# if not self._canProcess():
# _logger.warning(
# "There is some processing on going already, will"
# "not process the new dataset"
# )
#
# self._latest_xas_obj = xas_obj
# self._startProcess()
#
# # setup the ft process
# process_obj = QPyMca_ft(inputs={"xas_obj": xas_obj})
# process_obj._advancement.sigProgress.connect(self._setProgressValue)
# process_obj.set_properties(
# {"_pymcaSettings": self._window._pymcaWindow.getParameters()}
# )
#
# # update the processing thread
# thread = self.getProcessingThread()
# thread.init(process_obj=process_obj, xas_obj=self._latest_xas_obj)
# self._callback_finish = functools.partial(
# self._endProcess, self._latest_xas_obj
# )
# thread.finished.connect(self._callback_finish)
#
# # start processing
# thread.start(priority=qt.QThread.LowPriority)
def outputsChanged(self):
if "xas_obj" in self.task_inputs:
xas_obj = self.output_variables["xas_obj"].value
if isinstance(xas_obj, dict):
xas_obj = XASObject.from_dict(xas_obj)
if hasattr(self, "_window") and hasattr(self._window, "setXASObj"):
self._window.setXASObj(xas_obj=xas_obj)
elif hasattr(self, "_window") and hasattr(self._window, "xasObjViewer"):
if hasattr(self._window.xasObjViewer, "setXASObj"):
self._window.xasObjViewer.setXASObj(xas_obj=xas_obj)
self.Outputs.xas_obj.send(xas_obj)
def _update_settings(self):
self._pymcaSettings = self._window._pymcaWindow.getParameters()
......@@ -238,31 +197,3 @@ class FTOW(_ProcessForOrangeMixIn, ewokstaskclass=est.core.process.pymca.ft.PyMc
"xas_obj": None,
"ft": self._window._pymcaWindow.getParameters(),
}
#
# def _setProgressValue(self, value):
# self._progress.widget.progressBarSet(value)
#
# class QPyMca_ft(est.core.process.pymca.ft.PyMca_ft):
# """
# Normalization able to give advancement using qt.Signal and QThreadPool
# """
#
# def __init__(self, *args, **kwargs):
# est.core.process.pymca.ft.PyMca_ft.__init__(self, *args, **kwargs)
# self._advancement = QProgress("normalization")
#
# def _pool_process(self, xas_obj):
# self.pool = qt.QThreadPool()
# self.pool.setMaxThreadCount(5)
# for spectrum in xas_obj.spectra:
# runnable = ProcessRunnable(
# fct=est.core.process.pymca.ft.process_spectr_ft,
# spectrum=spectrum,
# configuration=xas_obj.configuration,
# callback=self._advancement.increaseAdvancement,
# )
# self.pool.start(runnable)
# self.pool.waitForDone()
......@@ -89,17 +89,6 @@ class KWeightOW(
# kept for compatibility
static_input = Setting({"xas_obj": None, "k_weight": 3})
"""Store the configuration of the PyMca XASClass"""
#
# class Inputs:
# xas_obj = Input("xas_obj", XASObject, default=True)
# # simple compatibility for some Orange widget and especialy the
# # 'spectroscopy add-on'
# data_table = Input("Data", Orange.data.Table)
#
# class Outputs:
# xas_obj = Output("xas_obj", XASObject)
# # by default we want to avoid sending 'Orange.data.Table' to avoid
# # loosing the XASObject flow process and results.
def __init__(self):
super().__init__()
......@@ -123,47 +112,24 @@ class KWeightOW(
if self._latest_xas_obj:
self.process(self._latest_xas_obj)
#
# @Inputs.data_table
# def processFrmDataTable(self, data_table):
# if data_table is None:
# return
# self.process(Converter.toXASObject(data_table=data_table))
#
# @Inputs.xas_obj
# def process(self, xas_obj):
# if xas_obj is None:
# return
#
# if not self._canProcess():
# _logger.warning(
# "There is some processing on going already, will"
# "not process the new dataset"
# )
#
# self._latest_xas_obj = xas_obj
# self._startProcess()
#
# # setup the k weight process
# process_obj = QPyMca_k_weight(inputs={"xas_obj": xas_obj})
# process_obj._advancement.sigProgress.connect(self._setProgressValue)
# process_obj.set_properties(
# {"_kWeightSetting": self._window._k_spin_box.value()}
# )
#
# # update the processing thread
# thread = self.getProcessingThread()
# thread.init(process_obj=process_obj, xas_obj=self._latest_xas_obj)
# self._callback_finish = functools.partial(
# self._endProcess, self._latest_xas_obj
# )
# thread.finished.connect(self._callback_finish)
# # start processing
# thread.start(priority=qt.QThread.LowPriority)
def _update_settings(self):
self._kWeightSetting = self._window._k_spin_box.value()
self.static_input = {
"xas_obj": None,
"k_weight": self._window._k_spin_box.value(),
}
def inputsChanged(self):
pass