Commit 4a159813 authored by Pierre Paleo's avatar Pierre Paleo
Browse files

Merge branch 'half_tomo_cor_preproc' into 'master'

Half tomo: auto-CoR

See merge request !59
parents d5eb5513 e21b70cc
Pipeline #35076 passed with stages
in 13 minutes and 27 seconds
This diff is collapsed.
......@@ -3,8 +3,12 @@ import numpy as np
import os
import h5py
from silx.resources import ExternalResources
from nabu.testutils import get_data as nabu_get_data
try:
import scipy.ndimage
__has_scipy__ = True
except ImportError:
__has_scipy__ = False
......@@ -12,6 +16,7 @@ except ImportError:
from nabu.preproc import alignment
from nabu.testutils import utilstest, __do_long_tests__
@pytest.fixture(scope="class")
def bootstrap_base(request):
cls = request.cls
......@@ -25,6 +30,7 @@ def bootstrap_cor(request):
cls.data, calib_data = get_cor_data_h5("test_alignment_cor.h5")
cls.cor_gl_pix, cls.cor_hl_pix, cls.tilt_deg = calib_data
cls.ht_im1, cls.ht_im2, cls.ht_cor = get_cor_data_half_tomo()
@pytest.fixture(scope="class")
......@@ -42,8 +48,18 @@ def bootstrap_fcs(request):
cls.abs_tol_dist = 1e-2
cls.abs_tol_tilt = 2.5e-4
cls.data, cls.img_pos, cls.pixel_size, (calib_data_std, calib_data_angle) = get_focus_data("test_alignment_focus.h5")
cls.angle_best_ind, cls.angle_best_pos, cls.angle_tilt_v, cls.angle_tilt_h = calib_data_angle
(
cls.data,
cls.img_pos,
cls.pixel_size,
(calib_data_std, calib_data_angle),
) = get_focus_data("test_alignment_focus.h5")
(
cls.angle_best_ind,
cls.angle_best_pos,
cls.angle_tilt_v,
cls.angle_tilt_h,
) = calib_data_angle
cls.std_best_ind, cls.std_best_pos = calib_data_std
......@@ -58,14 +74,37 @@ def get_cor_data_h5(*dataset_path):
with h5py.File(dataset_downloaded_path, "r") as hf:
data = hf["/entry/instrument/detector/data"][()]
cor_global_pix = hf['/calibration/alignment/global/x_rotation_axis_pixel_position'][()]
cor_global_pix = hf["/calibration/alignment/global/x_rotation_axis_pixel_position"][()]
cor_highlow_pix = hf['/calibration/alignment/highlow/x_rotation_axis_pixel_position'][()]
tilt_deg = hf['/calibration/alignment/highlow/z_camera_tilt'][()]
cor_highlow_pix = hf["/calibration/alignment/highlow/x_rotation_axis_pixel_position"][()]
tilt_deg = hf["/calibration/alignment/highlow/z_camera_tilt"][()]
return data, (cor_global_pix, cor_highlow_pix, tilt_deg)
def get_cor_data_half_tomo():
"""Obtains two weakly overlapping images with features plus spurious noise for a challenging test of cor retrieval for half tomo."""
datasource = ExternalResources(project="nabu", url_base=None)
myfile = os.path.join(datasource.data_home, "data_for_ht_cor.h5")
if not os.path.isfile(myfile):
im1, im2, cor = _get_challenging_ImsCouple_for_halftomo_cor()
f = h5py.File(myfile, "w")
f["im1"] = np.array(im1, "f")
f["im2"] = np.array(im2, "f")
f["cor"] = cor
assert os.path.isfile(myfile)
with h5py.File(myfile, "r") as hf:
im1 = hf["im1"][()]
im2 = hf["im2"][()]
cor = hf["cor"][()]
return im1, im2, cor
def get_alignxc_data(*dataset_path):
"""
Get a dataset file from silx.org/pub/nabu/data
......@@ -79,10 +118,10 @@ def get_alignxc_data(*dataset_path):
img_pos = hf["/entry/instrument/detector/distance"][()]
unit_length_shifts_vh = [
hf['/calibration/alignxc/y_pixel_shift_unit'][()],
hf['/calibration/alignxc/x_pixel_shift_unit'][()]
hf["/calibration/alignxc/y_pixel_shift_unit"][()],
hf["/calibration/alignxc/x_pixel_shift_unit"][()],
]
all_shifts_vh = hf['/calibration/alignxc/yx_pixel_offsets'][()]
all_shifts_vh = hf["/calibration/alignxc/yx_pixel_offsets"][()]
return data, img_pos, (unit_length_shifts_vh, all_shifts_vh)
......@@ -99,24 +138,87 @@ def get_focus_data(*dataset_path):
data = hf["/entry/instrument/detector/data"][()]
img_pos = hf["/entry/instrument/detector/distance"][()]
pixel_size = np.mean([
hf['/entry/instrument/detector/x_pixel_size'][()],
hf['/entry/instrument/detector/y_pixel_size'][()]
])
pixel_size = np.mean(
[
hf["/entry/instrument/detector/x_pixel_size"][()],
hf["/entry/instrument/detector/y_pixel_size"][()],
]
)
angle_best_ind = hf['/calibration/focus/angle/best_img'][()]
angle_best_pos = hf['/calibration/focus/angle/best_pos'][()]
angle_tilt_v = hf['/calibration/focus/angle/tilt_v_rad'][()]
angle_tilt_h = hf['/calibration/focus/angle/tilt_h_rad'][()]
angle_best_ind = hf["/calibration/focus/angle/best_img"][()]
angle_best_pos = hf["/calibration/focus/angle/best_pos"][()]
angle_tilt_v = hf["/calibration/focus/angle/tilt_v_rad"][()]
angle_tilt_h = hf["/calibration/focus/angle/tilt_h_rad"][()]
std_best_ind = hf['/calibration/focus/std/best_img'][()]
std_best_pos = hf['/calibration/focus/std/best_pos'][()]
std_best_ind = hf["/calibration/focus/std/best_img"][()]
std_best_pos = hf["/calibration/focus/std/best_pos"][()]
calib_data_angle = (angle_best_ind, angle_best_pos, angle_tilt_v, angle_tilt_h)
calib_data_std = (std_best_ind, std_best_pos)
return data, img_pos, pixel_size, (calib_data_std, calib_data_angle)
def _peaks2im(poss_y, poss_x, widths, ny, nx):
"""Given a set of positions, widths and the image shape creates an image with spots"""
res = np.zeros([ny, nx], "f")
ys = np.arange(ny)
xs = np.arange(nx)
for py, px, w in zip(poss_y, poss_x, widths):
y = ys - py
x = xs - px
add = np.exp(-((y * y)[:, None] + x * x) / (2.0 * w * w))
res[:] += add
return res
def _get_challenging_ImsCouple_for_halftomo_cor():
"""Creates two weakly overlapping images with features plus spurious noise for a challenging test of cor retrieval for half tomo."""
np.random.seed(0)
n_y = 400
n_x = 500
npoints = 2000
poss_x = -n_x / 2.0 + 2 * n_x * np.random.random(npoints)
poss_y = -n_y / 2.0 + 2 * n_y * np.random.random(npoints)
widths = 3 * np.random.random(npoints)
im1 = _peaks2im(poss_y, poss_x, widths, n_y, n_x)
center_x = 400.123
shift_y = 3.1415
poss_x_2 = 2 * center_x - poss_x
poss_y_2 = shift_y + poss_y
im2 = _peaks2im(poss_y_2, poss_x_2, widths, n_y, n_x)
npoints_spurious = 200
spurious_poss_x = n_x * np.random.random(npoints_spurious)
spurious_poss_y = n_y * np.random.random(npoints_spurious)
widths = [20.0] * npoints_spurious
spurious_im = _peaks2im(spurious_poss_y, spurious_poss_x, widths, n_y, n_x)
im1[:] += spurious_im * 0.1
spurious_poss_x = n_x * np.random.random(npoints_spurious)
spurious_poss_y = n_y * np.random.random(npoints_spurious)
widths = [20.0] * npoints_spurious
spurious_im = _peaks2im(spurious_poss_y, spurious_poss_x, widths, n_y, n_x)
im2[:] += spurious_im * 0.1
noise_level = 0.2
noise_ima1 = np.random.normal(0.0, size=[n_y, n_x]) * noise_level
noise_ima2 = np.random.normal(0.0, size=[n_y, n_x]) * noise_level
im1[:] += noise_ima1
im2[:] += noise_ima2
return im1, im2, (center_x - (n_x - 1) / 2.0)
@pytest.mark.usefixtures("bootstrap_base")
class TestAlignmentBase(object):
def test_peak_fitting_2d_3x3(self):
......@@ -164,7 +266,10 @@ class TestAlignmentBase(object):
cc_coords = np.arange(0, 8)
(found_peaks_val, found_peaks_pos) = alignment.AlignmentBase.extract_peak_regions_1d(img, axis=-1, cc_coords=cc_coords)
(
found_peaks_val,
found_peaks_pos,
) = alignment.AlignmentBase.extract_peak_regions_1d(img, axis=-1, cc_coords=cc_coords)
message = "The found peak positions do not correspond to the expected peak positions:\n Expected: %s\n Found: %s" % (
peaks_pos,
found_peaks_pos[1, :],
......@@ -197,10 +302,9 @@ class TestCor(object):
message = "Computed CoR %f " % cor_position + " and real CoR %f do not coincide" % self.cor_gl_pix
assert np.isclose(self.cor_gl_pix, cor_position, atol=self.abs_tol), message
@pytest.mark.skipif(not(__has_scipy__), reason="need scipy for this test")
@pytest.mark.skipif(not (__has_scipy__), reason="need scipy for this test")
def test_noisyHF_cor_posx(self):
""" test with noise at high frequencies
"""
"""test with noise at high frequencies"""
radio1 = self.data[0, :, :]
radio2 = np.fliplr(self.data[1, :, :])
......@@ -222,6 +326,81 @@ class TestCor(object):
message = "Computed CoR %f " % cor_position + " and real CoR %f do not coincide" % self.cor_gl_pix
assert np.isclose(self.cor_gl_pix, cor_position, atol=self.abs_tol), message
def test_half_tomo_cor_Err(self):
"""test that the test is challenging enough"""
radio1 = self.ht_im1
radio2 = np.fliplr(self.ht_im2)
cor_pos = self.ht_cor
CoR_calc = alignment.CenterOfRotation()
cor_position = CoR_calc.find_shift(radio1, radio2, low_pass=1, high_pass=20.0)
message = (
"Computed CoR %f " % cor_position
+ " and real CoR %f should not coincide when using the standard algorithm with half tomo data" % cor_pos
)
assert not np.isclose(cor_pos, cor_position, atol=self.abs_tol), message
def test_half_tomo_cor(self):
"""test that the half tomo algorithm is precise enough"""
radio1 = self.ht_im1
radio2 = np.fliplr(self.ht_im2)
cor_pos = self.ht_cor
CoR_calc = alignment.CenterOfRotationAdaptiveSearch()
cor_position = CoR_calc.find_shift(radio1, radio2, low_pass=1, high_pass=20)
message = (
"Computed CoR %f " % cor_position
+ " and real CoR %f should coincide when using the halftomo algorithm with hald tomo data" % cor_pos
)
assert np.isclose(cor_pos, cor_position, atol=self.abs_tol), message
def test_half_tomo_cor_exp(self):
"""test the half_tomo algorithm on experimental data """
radios = nabu_get_data("ha_autocor_radios.npz")
radio1 = radios["radio1"]
radio2 = radios["radio2"]
cor_pos = radios["cor_pos"]
radio2 = np.fliplr(radio2)
CoR_calc = alignment.CenterOfRotationAdaptiveSearch()
cor_position = CoR_calc.find_shift(radio1, radio2, low_pass=1, high_pass=20, filtered_cost=True)
print("Found cor_position", cor_position)
message = (
"Computed CoR %f " % cor_position
+ " and real CoR %f should coincide when using the halftomo algorithm with hald tomo data" % cor_pos
)
assert np.isclose(cor_pos, cor_position, atol=self.abs_tol), message
def test_half_tomo_cor_exp_limited(self):
"""test the hal_tomo algorithm on experimental data and global search with limits"""
radios = nabu_get_data("ha_autocor_radios.npz")
radio1 = radios["radio1"]
radio2 = radios["radio2"]
cor_pos = radios["cor_pos"]
radio2 = np.fliplr(radio2)
CoR_calc = alignment.CenterOfRotationAdaptiveSearch()
cor_position = CoR_calc.find_shift(radio1, radio2, low_pass=1, high_pass=20, margins=(100, 10), filtered_cost=False)
print("Found cor_position", cor_position)
message = (
"Computed CoR %f " % cor_position
+ " and real CoR %f should coincide when using the halftomo algorithm with hald tomo data" % cor_pos
)
assert np.isclose(cor_pos, cor_position, atol=self.abs_tol), message
def test_cor_posx_linear(self):
radio1 = self.data[0, :, :]
radio2 = np.fliplr(self.data[1, :, :])
......@@ -297,10 +476,13 @@ class TestDetectorTranslation(object):
)
assert np.all(np.isclose(self.expected_shifts_vh, [shifts_v, shifts_h], atol=self.abs_tol)), message
message = "Computed shifts %s and expected %s do not coincide" % (found_shifts_list, self.all_shifts_vh)
message = "Computed shifts %s and expected %s do not coincide" % (
found_shifts_list,
self.all_shifts_vh,
)
assert np.all(np.isclose(found_shifts_list, self.all_shifts_vh, atol=self.abs_tol)), message
@pytest.mark.skipif(not(__has_scipy__), reason="need scipy for this test")
@pytest.mark.skipif(not (__has_scipy__), reason="need scipy for this test")
def test_alignxc_synth(self):
T_calc = alignment.DetectorTranslationAlongBeam()
......@@ -317,11 +499,14 @@ class TestDetectorTranslation(object):
stack, np.array([0.0, 1, 2, 3]), high_pass=1.0, return_shifts=True
)
message = "Found shifts per units %s and reference %s do not coincide" % ((shifts_v, shifts_h), (-1.234 * 2, -1.234))
message = "Found shifts per units %s and reference %s do not coincide" % (
(shifts_v, shifts_h),
(-1.234 * 2, -1.234),
)
assert np.all(np.isclose((shifts_v, shifts_h), (-1.234 * 2, -1.234), atol=self.abs_tol)), message
@pytest.mark.skipif(not(__do_long_tests__), reason="need environment variable NABU_LONG_TESTS=1")
@pytest.mark.skipif(not (__do_long_tests__), reason="need environment variable NABU_LONG_TESTS=1")
@pytest.mark.usefixtures("bootstrap_fcs")
class TestFocus(object):
def test_find_distance(self):
......@@ -345,8 +530,11 @@ class TestFocus(object):
assert np.isclose(self.angle_best_ind, focus_ind, atol=self.abs_tol_dist), message
expected_tilts_vh = np.squeeze(np.array([self.angle_tilt_v, self.angle_tilt_h]))
computed_tilts_vh = - tilts_vh / (self.pixel_size / 1000)
message = "Computed tilts %s and expected %s do not coincide" % (computed_tilts_vh, expected_tilts_vh)
computed_tilts_vh = -tilts_vh / (self.pixel_size / 1000)
message = "Computed tilts %s and expected %s do not coincide" % (
computed_tilts_vh,
expected_tilts_vh,
)
assert np.all(np.isclose(computed_tilts_vh, expected_tilts_vh, atol=self.abs_tol_tilt)), message
def test_size_determination(self):
......
import numpy as np
from silx.io import get_data
from ..preproc.ccd import FlatField
from ..preproc.alignment import CenterOfRotation
from ..preproc.alignment import CenterOfRotation, CenterOfRotationAdaptiveSearch
class CORFinder:
"""
......@@ -29,23 +29,35 @@ class CORFinder:
self._init_flatfield()
self._apply_flatfield()
self.cor = CenterOfRotation()
self._default_search_method = "centered"
if self.halftomo:
self._default_search_method = "global"
self.cor = CenterOfRotationAdaptiveSearch()
def _get_angles(self, angles):
dataset_angles = self.dataset_info.rotation_angles
if dataset_angles is None:
if angles is None: # should not happen with hdf5
print("Warning: no information on angles was found for this dataset. Using default [0, 180[ range.")
angles = np.linspace(0, np.pi, len(self.dataset_info.projections), False)
theta_min = 0
theta_max = np.pi
msg = "Warning: no information on angles was found for this dataset. Using default range "
endpoint = False
if self.halftomo:
theta_max *= 2
endpoint = True
msg += "[0, 360]"
else:
msg += "[0, 180["
print(msg)
angles = np.linspace(
theta_min, theta_max, len(self.dataset_info.projections),
endpoint=endpoint
)
dataset_angles = angles
self.angles = dataset_angles
def _init_radios(self):
# TODO
if self.halftomo:
raise NotImplementedError("Automatic COR with half tomo is not supported yet")
#
# We take 2 radios. It could be tuned for a 360 degrees scan.
self._n_radios = 2
self._radios_indices = []
......@@ -80,24 +92,37 @@ class CORFinder:
self.flatfield.normalize_radios(self.radios)
def find_cor(self, **cor_kwargs):
def find_cor(self, search_method=None, **cor_kwargs):
"""
Find the center of rotation.
Parameters
----------
This function passes the named parameters to nabu.preproc.alignment.CenterOfRotation.find_shift.
search_method: str, optional
Which CoR search method to use. Default is "auto" (equivalent to "centered").
Returns
-------
cor: float
The estimated center of rotation for the current dataset.
Notes
------
This function passes the named parameters to nabu.preproc.alignment.CenterOfRotation.find_shift.
"""
shift = self.cor.find_shift(
self.radios[0],
np.fliplr(self.radios[1]),
**cor_kwargs
)
search_method = search_method or self._default_search_method
if search_method == "global":
shift = self.cor.find_shift(
self.radios[0],
np.fliplr(self.radios[1]),
low_pass=1, high_pass=20
)
else:
shift = self.cor.find_shift(
self.radios[0],
np.fliplr(self.radios[1]),
**cor_kwargs
)
# find_shift returned a single scalar in 2020.1
# This should be the default after 2020.2 release
if hasattr(shift, "__iter__"):
......
......@@ -58,7 +58,7 @@ class NabuValidator(object):
ny = nx
if self.nabu_config["reconstruction"]["enable_halftomo"]:
if self.dataset_infos.axis_position is None:
raise ValueError("rotation_axis_position should be either a number or 'auto' for half tomo")
raise ValueError("Cannot use rotation axis position in the middle of the detector when half tomo is enabled")
cor = int(round(self.dataset_infos.axis_position))
ny = nx = 2*cor
what = (
......@@ -199,7 +199,7 @@ class NabuValidator(object):
nx, nz = self.dataset_infos.radio_dims
rec_params = self.nabu_config["reconstruction"]
if rec_params["enable_halftomo"]:
cor = int(round(rec_params["rotation_axis_position"]))
cor = int(round(self.dataset_infos.axis_position))
ny = nx = 2*cor
what = (
("start_x", "end_x", nx),
......
......@@ -134,3 +134,17 @@ sino_normalizations = {
"": None,
"chebyshev": "chebyshev",
}
class CorMethods(Enum):
AUTO = "centered"
CENTERED = "centered"
GLOBAL = "global"
cor_methods = {
"auto": "centered",
"centered": "centered",
"global": "global",
}
......@@ -78,12 +78,12 @@ class ProcessConfig:
def _get_cor(self):
cor = self.nabu_config["reconstruction"]["rotation_axis_position"]
if cor == "auto":
if isinstance(cor, str): # auto-CoR
self.corfinder = CORFinder(
self.dataset_infos,
halftomo=self.nabu_config["reconstruction"]["enable_halftomo"],
)
cor = self.corfinder.find_cor()
cor = self.corfinder.find_cor(search_method=cor)
self.dataset_infos.axis_position = cor
......@@ -223,7 +223,7 @@ class ProcessConfig:
# New key
rec_options["rotation_axis_position_halftomo"] = (2*cor_i-1)/2.
# New key
rec_options["cor_estimated_auto"] = (nabu_config["reconstruction"]["rotation_axis_position"] == "auto")
rec_options["cor_estimated_auto"] = isinstance(nabu_config["reconstruction"]["rotation_axis_position"], str)
#
# Histogram
......
......@@ -217,16 +217,18 @@ def optional_float_validator(val):
@validator
def cor_validator(val):
if isinstance(val, float):
return val
elif len(val.strip()) >= 1:
if val.lower() == "auto":
return "auto"
val_float, error = convert_to_float(val)
assert error is None, "Invalid number"
val_float, error = convert_to_float(val)
if error is None:
return val_float
else:
if len(val.strip()) == 0:
return None
val = name_range_checker(
val.lower(),
CorMethods.values(),
"center of rotation estimation method",
replacements=cor_methods
)
return val
@validator
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment