Commit 1f729dad authored by payno's avatar payno Committed by Henri Payno
Browse files

[orangecontrib] add normalization processing

parent 0374ab47
......@@ -27,7 +27,7 @@ __authors__ = [
"H. Payno",
]
__license__ = "MIT"
__date__ = "22/06/2021"
__date__ = "19/07/2021"
import logging
......@@ -40,13 +40,67 @@ from silx.gui import qt
from tomwer.core.scan.scanbase import TomwerScanBase
from orangecontrib.tomwer.widgets.utils import WidgetLongProcessing
from Orange.widgets.settings import Setting
from Orange.widgets import widget, gui
from Orange.widgets import gui
from Orange.widgets.widget import Input, Output
from ...orange.managedprocess import SuperviseOW
from tomwer.gui.reconstruction.normalization.intensity import (
NormIntensityWindow as _NormIntensityWindow,
)
from tomwer.synctools.stacks.reconstruction.normalization import (
INormalizationProcessStack,
)
from processview.core.manager import ProcessManager
from processview.core.manager import DatasetState
import functools
_logger = logging.getLogger(__name__)
class NormIntensityWindow(_NormIntensityWindow):
"""
implementation of NormIntensityWindow for orange. Add a lock processing
and a processing stack
"""
sigValidate = qt.Signal()
def __init__(self, parent, process_id=None):
assert isinstance(parent, NormIOW)
super().__init__(parent)
self._parentValidate = self.parent()._validate
self._isLocked = self.parent().isLocked
self._processing_stack = INormalizationProcessStack(process_id=process_id)
# connect signal / slot
self._optsWidget.sigProcessingRequested.connect(self._launchProcessing)
def _validated(self):
scan = self.getScan()
self._parentValidate(scan)
def _launchProcessing(self):
scan = self.getScan()
if scan is None:
return
self._processing_stack.add(
scan=scan,
configuration=self.getConfiguration(),
callback=functools.partial(
self._mightUpdateResult,
scan,
self._isLocked(),
),
)
def _mightUpdateResult(self, scan, validate):
extra_info = scan.intensity_normalization.get_extra_infos()
if "value" in extra_info:
self.setResult(result=extra_info["value"])
else:
self.setResult(None)
if validate is True:
self._parentValidate(scan)
class NormIOW(WidgetLongProcessing, SuperviseOW):
"""
A simple widget managing the copy of an incoming folder to an other one
......@@ -59,7 +113,7 @@ class NormIOW(WidgetLongProcessing, SuperviseOW):
id = "orange.widgets.tomwer.reconstruction.NormIOW.NormIOW"
description = "Define normalization on intensity to be applied on projections"
icon = "icons/norm_I.svg"
priority = 40
priority = 28
category = "esrfWidgets"
keywords = [
"tomography",
......@@ -95,7 +149,23 @@ class NormIOW(WidgetLongProcessing, SuperviseOW):
SuperviseOW.__init__(self, parent)
WidgetLongProcessing.__init__(self)
raise NotImplementedError("Missing widget on gui side")
self._window = NormIntensityWindow(self, process_id=self.process_id)
self._layout = gui.vBox(self.mainArea, self.name).layout()
self._layout.setContentsMargins(0, 0, 0, 0)
self._layout.addWidget(self._window)
# connect signal / slot
# self._window.sigConfigurationChanged.connect(self._updateSettings)
self.destroyed.connect(self._window.stop)
self._window._processing_stack.sigComputationStarted.connect(
self._startProcessing
)
self._window._processing_stack.sigComputationEnded.connect(self._endProcessing)
def isLocked(self):
return False
@Inputs.data_in
def process(self, scan: TomwerScanBase):
......@@ -106,4 +176,27 @@ class NormIOW(WidgetLongProcessing, SuperviseOW):
)
if scan is None:
return
raise NotImplementedError("Missing processing")
if self.isLocked():
raise NotImplementedError("Missing processing")
else:
self._window.setScan(scan=scan)
def _validate(self, scan):
self.Outputs.data_out.send(scan)
def _skipCurrentScan(self, new_scan):
scan = self._widget.getScan()
# if the same scan has been run several scan
if scan is None or str(scan) == str(new_scan):
return
current_scan_state = ProcessManager().get_dataset_state(
dataset_id=scan.get_dataset_identifier(), process=self
)
if current_scan_state in (
DatasetState.PENDING,
DatasetState.WAIT_USER_VALIDATION,
):
details = "Was pending and has been replaced by another scan."
self.notify_skip(scan=scan, details=details)
self.Outputs.data_out.send(scan)
......@@ -60,6 +60,11 @@ class NormIntensityWindow(_NormIntensityWindow):
self._optsWidget.sigProcessingRequested.connect(self._launchProcessing)
self._processingThread.finished.connect(self._threadedProcessEnded)
def _validated(self):
raise NotImplementedError("not implemented")
# TODO: should hide / close the window and insure result are saved
# in the .hdf5 file
def _launchProcessing(self):
self.clear()
if self._processingThread.isRunning():
......
......@@ -54,6 +54,8 @@ import functools
import typing
import numpy
import logging
from tomoscan.normalization import Method as TomoScanMethod
_logger = logging.getLogger(__name__)
......@@ -90,11 +92,28 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
return None
params = IntensityNormalizationParams.from_dict(self._settings)
if params.method is Method.MANUAL_ROI:
return self._compute_from_manual_roi(scan)
res = self._compute_from_manual_roi(scan)
need_conversion_to_tomoscan = True
elif params.method is Method.AUTO_ROI:
return self._compute_from_automatic_roi(scan)
res = self._compute_from_automatic_roi(scan)
need_conversion_to_tomoscan = True
elif params.method is Method.DATASET:
return self._compute_from_dataset()
res = self._compute_from_dataset()
need_conversion_to_tomoscan = True
elif params.method.value in TomoScanMethod.values():
need_conversion_to_tomoscan = False
res = None
else:
raise ValueError("method {} is not handled".format(params.method))
if need_conversion_to_tomoscan:
results_to_tomoscan_norm(
scan=scan,
method=params.method,
results=res,
extra_infos=params.extra_infos,
)
return scan
def _compute_from_manual_roi(self, scan):
params = IntensityNormalizationParams.from_dict(self.get_configuration())
......@@ -216,6 +235,10 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
append_url(url)
data = silx.io.get_data(url)
# apply flat field correction
data = scan.flat_field_correction(
projs=(data), proj_indexes=(proj_index,)
)[0]
if data.ndim is 2:
roi_area[current_idx] = data[start_y:end_y, start_x:end_x]
current_idx += 1
......@@ -320,3 +343,32 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
def definition():
"""definition of the process"""
return "Normalize intensity."
def results_to_tomoscan_norm(
scan,
method: Method,
results: typing.Union[numpy.ndarray, float, None],
extra_infos: dict,
):
"""
Util function to copy results from tomwer normalization to tomoscan
normalization parameters
:param TomwerScanBase scan:
:param Method method:
:param dict results:
:param dict extra_infos:
:return:
"""
if method in TomoScanMethod.values():
scan.intensity_normalization = method
scan.intensity_normalization.set_extra_infos(extra_infos)
else:
scan.intensity_normalization = TomoScanMethod.SCALAR
if results is not None:
if isinstance(results, numpy.ndarray):
extra_infos["value"] = tuple(results)
else:
extra_infos["value"] = results
scan.intensity_normalization.set_extra_infos(extra_infos)
......@@ -83,11 +83,15 @@ class NormIntensityWindow(qt.QMainWindow):
# connect signal / slot
self._optsWidget.sigModeChanged.connect(self._modeChanged)
self._optsWidget.sigValueUpdated.connect(self.setResult)
self._crtWidget.sigValidateRequest.connect(self._validated)
# set up
self._centralWidget._updateSinogramROI()
self._modeChanged()
def _validated(self):
pass
def getConfiguration(self) -> dict:
return self._optsWidget.getConfiguration()
......@@ -586,7 +590,7 @@ class _NormIntensityControl(ControlWidget):
def setResult(self, result):
if isinstance(result, tuple):
result = ",".join([str(element) for element in result])
self._resultQLE.setText(result)
self._resultQLE.setText(str(result))
def clear(self):
self._resultQLE.clear()
......
# coding: utf-8
###########################################################################
# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################
__authors__ = ["H.Payno"]
__license__ = "MIT"
__date__ = "19/07/2021"
from tomwer.core.process.reconstruction.normalization import (
IntensityNormalizationProcess,
IntensityNormalizationParams,
)
from processview.core.manager import ProcessManager
from tomwer.core.scan.scanbase import TomwerScanBase
from tomwer.core.settings import get_lbsram_path, isOnLbsram
from tomwer.core.utils import isLowOnMemory
from processview.core.manager import DatasetState
from ..processingstack import FIFO, ProcessingThread
from processview.core.superviseprocess import SuperviseProcess
from silx.gui import qt
import functools
import logging
_logger = logging.getLogger(__name__)
class INormalizationProcessStack(FIFO, qt.QObject):
"""Implementation of the `.AxisProcess` but having a stack for treating
scans and making computation in threads"""
def __init__(self, process_id=None):
qt.QObject.__init__(self)
FIFO.__init__(self, process_id=process_id)
def _process(self, scan, configuration, callback=None):
ProcessManager().notify_dataset_state(
dataset=scan,
process=self,
state=DatasetState.ON_GOING,
)
_logger.processStarted("start saaxis {}" "".format(str(scan)))
assert isinstance(scan, TomwerScanBase)
if isOnLbsram(scan) and isLowOnMemory(get_lbsram_path()) is True:
# if computer is running into low memory on lbsram skip it
mess = "low memory, skip intensity normalization", scan.path
ProcessManager().notify_dataset_state(
dataset=scan, process=self._process_id, state=DatasetState.SKIPPED
)
_logger.processSkipped(mess)
if callback is not None:
callback()
self.scan_ready(scan=scan)
else:
self._scan_currently_computed = scan
self._computationThread.init(scan=scan, i_norm_params=configuration)
# need to manage connect before starting it because
fct_callback = functools.partial(self._end_threaded_computation, callback)
self._computationThread.finished.connect(fct_callback)
self._computationThread.start()
def _end_computation(self, scan, callback):
"""
callback when the computation thread is finished
:param scan: pass if no call to '_computationThread is made'
"""
assert isinstance(scan, TomwerScanBase)
FIFO._end_computation(self, scan=scan, callback=callback)
def _end_threaded_computation(self, callback=None):
assert self._scan_currently_computed is not None
self._computationThread.finished.disconnect()
if callback:
callback()
FIFO._end_threaded_computation(self)
def _create_processing_thread(self, process_id=None) -> qt.QThread:
return _ProcessingThread(process_id=process_id)
class _ProcessingThread(ProcessingThread, SuperviseProcess):
"""
Thread use to execute the processing of intensity normalization
(mostly load io and compute the mean / median on it)
"""
def __init__(self, process_id=None):
SuperviseProcess.__init__(self, process_id=process_id)
ProcessingThread.__init__(self, process_id=process_id)
self.center_of_rotation = None
self._dry_run = False
self._scan = None
self._i_norm_params = None
def init(self, scan, i_norm_params):
self._scan = scan
self._i_norm_params = i_norm_params
def run(self):
self.sigComputationStarted.emit()
norm_process = IntensityNormalizationProcess(process_id=self.process_id)
norm_process.set_configuration(configuration=self._i_norm_params)
norm_process.process(self._scan)
try:
norm_process.process(self._scan)
except Exception as e:
_logger.error(str(e))
mess = "Intensity normalization computation for {} failed.".format(
str(self._scan)
)
state = DatasetState.FAILED
else:
mess = "Intensity normalization computation for {} succeed.".format(
str(self._scan)
)
state = DatasetState.WAIT_USER_VALIDATION
ProcessManager().notify_dataset_state(
dataset=self._scan,
process=self,
state=state,
details=mess,
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment