Commit e3779397 authored by payno's avatar payno Committed by Henri Payno
Browse files

[orangecontrib] add test on the normalization widget

parent a9980bed
......@@ -54,6 +54,8 @@ from processview.core.manager import DatasetState
from tomwer.core.process.reconstruction.normalization import (
IntensityNormalizationProcess,
)
from tomwer.core import settings
from tomwer.core import utils
import functools
_logger = logging.getLogger(__name__)
......@@ -177,6 +179,12 @@ class NormIOW(WidgetLongProcessing, SuperviseOW):
def setLocked(self, locked):
self._window.setLocked(locked)
def setCurrentMethod(self, mode):
self._window.setCurrentMethod(mode)
def getCurrentMethod(self):
return self._window.getCurrentMethod()
@Inputs.data_in
def process(self, scan: TomwerScanBase):
if not isinstance(scan, (TomwerScanBase, type(None))):
......@@ -187,9 +195,18 @@ class NormIOW(WidgetLongProcessing, SuperviseOW):
if scan is None:
return
self._window.setScan(scan=scan)
if self.isLocked():
self._window._processScan(scan=scan)
self._skipCurrentScan(new_scan=scan)
if settings.isOnLbsram(scan) and utils.isLowOnMemory(
settings.get_lbsram_path()
):
details = "skip {} because low memory on lbsram".format(str(scan))
self.notify_skip(scan=scan, details=details)
self.Outputs.data_out.send(scan)
else:
self._window.setScan(scan=scan)
if self.isLocked():
self._window._processScan(scan=scan)
def _validate(self, scan):
if scan is None:
......@@ -208,10 +225,18 @@ class NormIOW(WidgetLongProcessing, SuperviseOW):
},
process_index=scan.pop_process_index(),
)
if scan.intensity_normalization.tomwer_processing_res is True:
self.notify_succeed(scan=scan)
elif scan.intensity_normalization.tomwer_processing_res is False:
self.notify_failed(scan=scan)
else:
self.notify_skip(scan=scan)
self.Outputs.data_out.send(scan)
def _skipCurrentScan(self, new_scan):
scan = self._widget.getScan()
scan = self._window.getScan()
# if the same scan has been run several scan
if scan is None or str(scan) == str(new_scan):
return
......@@ -226,9 +251,19 @@ class NormIOW(WidgetLongProcessing, SuperviseOW):
self.notify_skip(scan=scan, details=details)
self.Outputs.data_out.send(scan)
def validateCurrentScan(self):
scan = self._window.getScan()
self._validate(scan)
def clear(self):
self._window.clear()
def getConfiguration(self):
return self._window.getConfiguration()
def stop(self):
self._window.stop()
def _updateSettings(self):
self._rpSetting = self._window.getConfiguration()
self._rpSetting["__lock__"] = self.isLocked()
......
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "09/06/2021"
from silx.gui.utils.testutils import TestCaseQt
from silx.gui import qt
from tomwer.core.utils.scanutils import MockHDF5
from orangecontrib.tomwer.widgets.reconstruction.NormIOW import (
NormIOW as _NormIOW,
)
from processview.core.manager import ProcessManager, DatasetState
from tomwer.io.utils.h5pyutils import EntryReader
from tomwer.core.process.reconstruction.normalization import (
IntensityNormalizationProcess,
)
from silx.io.utils import h5py_read_dataset
import tempfile
import shutil
import logging
import os
from tomwer.core import utils, settings
logger = logging.getLogger(__name__)
class NormIOW(_NormIOW):
def __init__(self, parent=None):
self._scans_finished = []
super().__init__(parent)
def processing_finished(self, scan):
self._scans_finished.append(scan)
def wait_processing(self, wait_time):
self._window._processing_stack._computationThread.wait(wait_time)
@property
def scans_finished(self):
return self._scans_finished
def compute(self):
self._window._processCurrentScan()
def setROI(self, start_x, end_x, start_y, end_y):
self._window.setROI(start_x=start_x, end_x=end_x, start_y=start_y, end_y=end_y)
def close(self):
self._scans_finished = {}
super().close()
class TestProcessing(TestCaseQt):
DIM = 100
def setUp(self):
super().setUp()
utils.mockLowMemory(False)
settings.mock_lsbram(False)
self._source_dir = tempfile.mkdtemp()
def create_scan(folder_name):
_dir = os.path.join(self._source_dir, folder_name)
return MockHDF5(
scan_path=_dir,
n_ini_proj=20,
n_proj=20,
n_alignement_proj=2,
create_final_ref=False,
create_ini_dark=True,
create_ini_ref=True,
n_refs=1,
dim=self.DIM,
).scan
# create scans
self.scan_1 = create_scan("scan_1")
self.scan_2 = create_scan("scan_2")
self.scan_3 = create_scan("scan_3")
self._process_manager = ProcessManager()
self.widget = NormIOW()
self.widget.show()
def tearDown(self):
self.widget.setAttribute(qt.Qt.WA_DeleteOnClose)
self.widget.stop()
self.widget.close()
self.widget = None
self.qapp.processEvents()
shutil.rmtree(self._source_dir)
super().tearDown()
def testUnlocked(self):
"""Test result when used with some interaction"""
self.widget.setLocked(False)
def process_scalar_manually():
self.widget.setCurrentMethod("manual ROI")
self.qapp.processEvents()
self.widget.setROI(start_x=0, end_x=10, start_y=0, end_y=10)
self.qapp.processEvents()
self.widget.compute()
self.widget.wait_processing(500000)
self.qapp.processEvents()
self.widget.process(self.scan_1)
process_scalar_manually()
self.assertEqual(
self._process_manager.get_dataset_state(
dataset_id=self.scan_1.get_dataset_identifier(),
process=self.widget,
),
DatasetState.WAIT_USER_VALIDATION,
)
self.widget.process(self.scan_2)
process_scalar_manually()
self.assertEqual(len(self.widget.scans_finished), 0)
self.assertEqual(
self._process_manager.get_dataset_state(
dataset_id=self.scan_1.get_dataset_identifier(),
process=self.widget,
),
DatasetState.SKIPPED,
)
self.assertEqual(
self._process_manager.get_dataset_state(
dataset_id=self.scan_2.get_dataset_identifier(),
process=self.widget,
),
DatasetState.WAIT_USER_VALIDATION,
)
self.widget.process(self.scan_3)
process_scalar_manually()
self.widget.validateCurrentScan()
self.assertEqual(
self._process_manager.get_dataset_state(
dataset_id=self.scan_3.get_dataset_identifier(),
process=self.widget,
),
DatasetState.SUCCEED,
)
# insure a normalization has been registered
self.assertTrue(self.scan_3.intensity_normalization.tomwer_processing_res, True)
def testTestLbsram(self):
"""Test scan are all validated if 'low memory on lbsram' scenario is
activated"""
utils.mockLowMemory(True)
settings.mock_lsbram(True)
for scan in (self.scan_1, self.scan_2, self.scan_3):
self.widget.process(scan)
self.widget.wait_processing(5000)
self.qapp.processEvents()
for scan in (self.scan_1, self.scan_2, self.scan_3):
with self.subTest(scan=str(scan)):
self.assertEqual(
self._process_manager.get_dataset_state(
dataset_id=scan.get_dataset_identifier(),
process=self.widget,
),
DatasetState.SKIPPED,
)
def testLocked(self):
"""Test scan are all validated if the widget is lock"""
self.widget.setLocked(True)
for scan in (self.scan_1, self.scan_2, self.scan_3):
self.widget.process(scan)
self.widget.wait_processing(5000)
self.qapp.processEvents()
for scan in (self.scan_1, self.scan_2, self.scan_3):
# test status is SUCCEED
with self.subTest(scan=str(scan)):
self.assertEqual(
self._process_manager.get_dataset_state(
dataset_id=scan.get_dataset_identifier(),
process=self.widget,
),
DatasetState.SUCCEED,
)
# test process file has been updated
with EntryReader(scan.process_file_url) as entry:
self.assertTrue("tomwer_process_0" in entry)
self.assertEqual(
h5py_read_dataset(entry["tomwer_process_0"]["program"]),
IntensityNormalizationProcess.program_name(),
)
......@@ -90,21 +90,28 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
def process(self, scan: typing.Union[None, TomwerScanBase] = None):
if scan is None:
return None
scan.intensity_normalization.tomwer_processing_res = None
params = IntensityNormalizationParams.from_dict(self._settings)
if params.method is Method.MANUAL_ROI:
res = self._compute_from_manual_roi(scan)
need_conversion_to_tomoscan = True
elif params.method is Method.AUTO_ROI:
res = self._compute_from_automatic_roi(scan)
need_conversion_to_tomoscan = True
elif params.method is Method.DATASET:
res = self._compute_from_dataset()
need_conversion_to_tomoscan = True
elif params.method.value in TomoScanMethod.values():
need_conversion_to_tomoscan = False
res = None
try:
if params.method is Method.MANUAL_ROI:
res = self._compute_from_manual_roi(scan)
need_conversion_to_tomoscan = True
elif params.method is Method.AUTO_ROI:
res = self._compute_from_automatic_roi(scan)
need_conversion_to_tomoscan = True
elif params.method is Method.DATASET:
res = self._compute_from_dataset()
need_conversion_to_tomoscan = True
elif params.method.value in TomoScanMethod.values():
need_conversion_to_tomoscan = False
res = None
else:
raise ValueError("method {} is not handled".format(params.method))
except Exception as e:
_logger.error(e)
scan.intensity_normalization.tomwer_processing_res = False
else:
raise ValueError("method {} is not handled".format(params.method))
scan.intensity_normalization.tomwer_processing_res = True
if need_conversion_to_tomoscan:
results_to_tomoscan_norm(
......@@ -113,6 +120,7 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
results=res,
extra_infos=params.extra_infos,
)
return scan
def _compute_from_manual_roi(self, scan):
......@@ -228,6 +236,7 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
)
current_idx = 0
url_idxs = {v.path(): k for k, v in scan.projections.items()}
for proj_index in proj_indexes:
url = proj_compacted[proj_index]
if url_has_been_treated(url):
......@@ -235,10 +244,52 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
append_url(url)
data = silx.io.get_data(url)
def retrieve_data_proj_indexes(url_):
urls = []
for slice in range(
url_.data_slice().start,
url_.data_slice().stop,
url_.data_slice().step or 1,
):
urls.append(
DataUrl(
file_path=url_.file_path(),
data_path=url_.data_path(),
scheme=url_.scheme(),
data_slice=slice,
)
)
# try to retrieve the index from the projections else
# keep the slice index as the frame index (should be the
# case in most case
res = []
for my_url in urls:
my_url_path = my_url.path()
if my_url_path in url_idxs:
res.append(url_idxs[my_url_path])
else:
_logger.warning(
"unable to retrieve frame index from url {}. "
"Take the slice index as frame index".format(
my_url_path
)
)
return res
data_indexes = retrieve_data_proj_indexes(url)
# apply flat field correction
if data.ndim == 2:
projs = (data,)
else:
projs = list(data)
data = scan.flat_field_correction(
projs=(data), proj_indexes=(proj_index,)
)[0]
projs=projs, proj_indexes=data_indexes
)
if data is None:
continue
data = numpy.asarray(data)
if data.ndim is 2:
roi_area[current_idx] = data[start_y:end_y, start_x:end_x]
current_idx += 1
......@@ -254,7 +305,6 @@ class IntensityNormalizationProcess(SingleProcess, SuperviseProcess):
)
else:
raise ValueError("{} is not handled".format(calc_area))
return IntensityNormalizationProcess.compute_stats(roi_area)
@staticmethod
......@@ -368,7 +418,6 @@ def results_to_tomoscan_norm(
scan.intensity_normalization = TomoScanMethod.SCALAR
if results is not None:
if isinstance(results, numpy.ndarray):
extra_infos["value"] = tuple(results)
else:
extra_infos["value"] = results
results = tuple(results)
extra_infos["value"] = results
scan.intensity_normalization.set_extra_infos(extra_infos)
......@@ -135,7 +135,7 @@ class NormIntensityWindow(qt.QMainWindow):
self._centralWidget.setScan(scan=scan)
self._optsWidget.setScan(scan=scan)
def setMode(self, mode):
def setCurrentMethod(self, mode):
self._optsWidget.setCurrentMethod(mode)
def stop(self):
......@@ -148,6 +148,11 @@ class NormIntensityWindow(qt.QMainWindow):
def getROI(self):
return self._centralWidget.getROI()
def setROI(self, start_x, end_x, start_y, end_y):
self._centralWidget.setROI(
start_x=start_x, end_x=end_x, start_y=start_y, end_y=end_y
)
def setResult(self, result):
# once computed update scan normalization values (
# update scalar value for example)
......@@ -221,6 +226,11 @@ class _Viewer(qt.QTabWidget):
def getROI(self):
return self._projView.getROI()
def setROI(self, start_x, end_x, start_y, end_y):
self._projView.setROI(
start_x=start_x, end_x=end_x, start_y=start_y, end_y=end_y
)
class _ProjPlotWithROI(DataViewer):
"""DataViewer specialized on projections. Embed a RectangleROI"""
......@@ -258,6 +268,10 @@ class _ProjPlotWithROI(DataViewer):
def getROI(self):
return self._roi
def setROI(self, start_x, end_x, start_y, end_y):
self._roi.setOrigin((start_x, start_y))
self._roi.setSize((end_x - start_x, end_y - start_y))
def setManualROIVisible(self, visible):
self._roiVisible = visible
self._updateROIVisibility()
......
......@@ -59,7 +59,7 @@ class INormalizationProcessStack(FIFO, qt.QObject):
process=self,
state=DatasetState.ON_GOING,
)
_logger.processStarted("start saaxis {}" "".format(str(scan)))
_logger.processStarted("start intensity normalization {}".format(str(scan)))
assert isinstance(scan, TomwerScanBase)
if isOnLbsram(scan) and isLowOnMemory(get_lbsram_path()) is True:
# if computer is running into low memory on lbsram skip it
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment