Newer
Older
from ..io.config import NabuConfigParser, validate_nabu_config
from .dataset_analyzer import analyze_dataset, DatasetAnalyzer
from .estimators import COREstimator, DetectorTiltEstimator
from .logger import Logger, PrinterLogger
from .params import radios_rotation_mode
from .utils import extract_parameters
class ProcessConfig:
"""
A class for describing the Nabu process configuration.
"""
def __init__(
self,
conf_fname=None,
conf_dict=None,
dataset_infos=None,
checks=True,
remove_unused_radios=True,
):
"""
Initialize a ProcessConfig class.
Parameters
----------
conf_fname: str
Path to the nabu configuration file. If provided, the parameters
`conf_dict` and `dataset_infos` are ignored.
A dictionary describing the nabu processing steps.
If provided, it should be provided along with `dataset_infos` ; in this
case, the parameter `conf_fname` is ignored.
dataset_infos: DatasetAnalyzer
A `DatasetAnalyzer` class instance.
If provided, it should be provided along with `conf_dict` ; in this
case, the parameter `conf_fname` is ignored.
checks: bool, optional, default is True
Whether to perform checks on configuration and datasets (recommended !)
remove_unused_radios: bool, optional, default is True
Whether to remove unused radios, i.e radios present in the dataset,
but not explicitly listed in the scan metadata.
create_logger: str or bool, optional
Whether to create a Logger object. Default is False, meaning that the logger
object creation is left to the user.
If set to True, a Logger object is created, and logs will be written
to the file "nabu_dataset_name.log".
If set to a string, a Logger object is created, and the logs will be written
to the file specified by this string.
"""
args_error_msg = (
"You must either provide conf_fname, or (conf_dict and dataset_infos)"
)
if conf_fname is not None:
if (conf_dict is not None) or (dataset_infos is not None):
raise ValueError(args_error_msg)
if not (os.path.isfile(conf_fname)):
raise ValueError("No such file: %s" % conf_fname)
conf = NabuConfigParser(conf_fname).conf_dict
self.nabu_config = validate_nabu_config(conf)
self._create_logger(create_logger)
dataset_infos = analyze_dataset(
self.nabu_config["dataset"]["location"],
processes_file=self.nabu_config["preproc"]["processes_file"],
extra_options={
"force_flatfield": self.nabu_config["preproc"]["flatfield_enabled"] == "forced",
"exclude_projections": self.nabu_config["dataset"]["exclude_projections"],
"output_dir": self.nabu_config["output"]["location"],
"hdf5_entry": self.nabu_config["dataset"]["hdf5_entry"],
)
else:
if (conf_dict is None) or (dataset_infos is None):
raise ValueError(args_error_msg)
self.nabu_config = validate_nabu_config(conf_dict)
self._create_logger(create_logger)
if not(isinstance(dataset_infos, DatasetAnalyzer)):
raise ValueError("Expected a DatasetAnalyzer instance for dataset_infos")
self.dataset_infos = dataset_infos
self.checks = checks
self.remove_unused_radios = remove_unused_radios
self.validation_stage2()
self.build_processing_steps()
def _create_logger(self, create_logger):
if create_logger is False:
self.logger = PrinterLogger()
return
elif create_logger is True:
dataset_loc = self.nabu_config["dataset"]["location"]
dataset_fname_rel = os.path.basename(dataset_loc)
logger_filename = os.path.join(
os.path.abspath(os.getcwd()),
os.path.splitext(dataset_fname_rel)[0] + "_nabu.log"
)
logger_filename = os.path.join(
os.path.abspath(os.getcwd()),
dataset_fname_rel + "_nabu.log"
)
elif isinstance(create_logger, str):
logger_filename = create_logger
else:
raise ValueError("Expected bool or str for create_logger")
self.logger = Logger(
"nabu",
level=self.nabu_config["about"]["verbosity"],
logfile=logger_filename
)
def _get_cor(self):
cor = self.nabu_config["reconstruction"]["rotation_axis_position"]
if isinstance(cor, str): # auto-CoR
self.dataset_infos,
halftomo=self.nabu_config["reconstruction"]["enable_halftomo"],
do_flatfield=self.nabu_config["preproc"]["flatfield_enabled"],
cor_options=self.nabu_config["reconstruction"]["cor_options"],
cor = self.corfinder.find_cor(search_method=cor)
self.dataset_infos.axis_position = cor
def _get_tilt(self):
tilt = self.nabu_config["preproc"]["tilt_correction"]
# Deprecate key "rotate_projections" ?
user_rot_projs = self.nabu_config["preproc"]["rotate_projections"]
if user_rot_projs is not None:
msg = "=" * 80 + "\n"
msg += "Both 'detector_tilt' and 'rotate_projections' options were provided. The option 'rotate_projections' will take precedence. This means that the projections will be rotated by %f degrees and the option 'detector_tilt' will be ignored." % user_rot_projs
msg += "\n" + "=" * 80
self.logger.warning(msg)
tilt = user_rot_projs
#
if isinstance(tilt, str): # auto-tilt
self.tilt_estimator = DetectorTiltEstimator(
self.dataset_infos,
logger=self.logger,
autotilt_options=self.nabu_config["preproc"]["autotilt_options"]
)
tilt = self.tilt_estimator.find_tilt(tilt_method=tilt)
self.dataset_infos.detector_tilt = tilt
def validation_stage2(self):
validator = NabuValidator(self.nabu_config, self.dataset_infos)
if self.checks:
validator.perform_all_checks(remove_unused_radios=self.remove_unused_radios)
def get_radios_rotation_mode(self):
"""
Determine whether projections are to be rotated, and if so, when they are to be rotated.
Returns
-------
method: str or None
Rotation method: one of the values of `nabu.resources.params.radios_rotation_mode`
"""
user_rotate_projections = self.nabu_config["preproc"]["rotate_projections"]
tilt = self.dataset_infos.detector_tilt
phase_method = self.nabu_config["phase"]["method"]
do_ctf = phase_method == "CTF"
do_pag = phase_method == "paganin"
do_unsharp = self.nabu_config["phase"]["unsharp_coeff"] > 0
if user_rotate_projections is None and tilt is None:
return None
if do_ctf:
return "full"
# TODO "chunked" rotation is done only when using a "processing margin"
# For now the processing margin is enabled only if phase or unsharp is enabled.
# We can either
# - Enable processing margin if rotating projections is needed (more complicated to implement)
# - Always do "full" rotation (simpler to implement, at the expense of performances)
if do_pag or do_unsharp:
else:
return "full"
def build_processing_steps(self):
"""
Build a list of processing steps from a ProcessConfig instance.
The returned structures are a more compact and ready-to-use representation
of the two main fields of ProcessConfig (dataset_infos and nabu_config).
"""
nabu_config = self.nabu_config
dataset_infos = self.dataset_infos
binning = (nabu_config["dataset"]["binning"], nabu_config["dataset"]["binning_z"])
tasks = []
options = {}
#
# Dataset / Get data
#
# First thing to do is to get the data (radios or sinograms)
# For now data is assumed to be on disk (see issue #66).
tasks.append("read_chunk")
options["read_chunk"] = {
"files": dataset_infos.projections, # TODO handle sinograms
"dataset_subsampling": nabu_config["dataset"]["projections_subsampling"]
#
if nabu_config["preproc"]["flatfield_enabled"]:
tasks.append("flatfield")
options["flatfield"] = {
# ChunkReader handles binning/subsampling by itself,
# but FlatField needs "real" indices (after binning/subsampling)
"projs_indices": dataset_infos._projs_indices_subsampled,
}
if nabu_config["preproc"]["ccd_filter_enabled"]:
tasks.append("ccd_correction")
options["ccd_correction"] = {
"type": "median_clip", # only one available for now
"median_clip_thresh": nabu_config["preproc"]["ccd_filter_threshold"],
}
if nabu_config["preproc"]["double_flatfield_enabled"]:
tasks.append("double_flatfield")
options["double_flatfield"] = {
"sigma": nabu_config["preproc"]["dff_sigma"],
}
#
# Radios rotation (do it here if possible)
if self.get_radios_rotation_mode() == "chunk":
tasks.append("rotate_projections")
options["rotate_projections"] = {
"angle": nabu_config["preproc"]["rotate_projections"] or dataset_infos.detector_tilt,
"center": nabu_config["preproc"]["rotate_projections_center"],
if nabu_config["phase"]["method"] is not None:
tasks.append("phase")
options["phase"] = copy_dict_items(
nabu_config["phase"], ["method", "delta_beta", "margin", "padding_type"]
options["phase"].update({
"energy_kev": dataset_infos.energy,
"distance_cm": dataset_infos.distance * 1e2,
"pixel_size_microns": dataset_infos.pixel_size,
"pixel_size_m": dataset_infos.pixel_size * 1e-6,
if binning != (1, 1):
options["phase"]["delta_beta"] /= (binning[0] * binning[1])
if options["phase"]["method"] == "CTF":
self._get_ctf_parameters(options["phase"])
if nabu_config["phase"]["unsharp_coeff"] > 0:
tasks.append("unsharp_mask")
options["unsharp_mask"] = copy_dict_items(
nabu_config["phase"], ["unsharp_coeff", "unsharp_sigma"]
)
if nabu_config["preproc"]["take_logarithm"]:
tasks.append("take_log")
options["take_log"] = copy_dict_items(nabu_config["preproc"], ["log_min_clip", "log_max_clip"])
# Radios rotation (do it here if mode=="full")
#
if self.get_radios_rotation_mode() == "full":
tasks.append("rotate_projections")
options["rotate_projections"] = {
"angle": nabu_config["preproc"]["rotate_projections"] or dataset_infos.detector_tilt,
"center": nabu_config["preproc"]["rotate_projections_center"],
# Translation movements
#
translations = dataset_infos.translations
if translations is not None:
tasks.append("radios_movements")
options["radios_movements"] = {
"translation_movements": dataset_infos.translations
}
#
# Sinogram normalization (before half-tomo)
#
if nabu_config["preproc"]["sino_normalization"] is not None:
tasks.append("sino_normalization")
options["sino_normalization"] = {
"method": nabu_config["preproc"]["sino_normalization"]
}
#
# Sinogram-based rings artefacts removal
#
if nabu_config["preproc"]["sino_rings_correction"]:
tasks.append("sino_rings_correction")
options["sino_rings_correction"] = {
"user_options": nabu_config["preproc"]["sino_rings_options"],
if nabu_config["reconstruction"]["method"] is not None:
tasks.append("build_sino")
options["build_sino"] = copy_dict_items(
nabu_config["reconstruction"],
["rotation_axis_position", "enable_halftomo", "start_x", "end_x",
"start_y", "end_y", "start_z", "end_z"]
)
options["build_sino"]["axis_correction"] = dataset_infos.axis_correction
tasks.append("reconstruction")
# Iterative is not supported through configuration file for now.
options["reconstruction"] = copy_dict_items(
nabu_config["reconstruction"],
["method", "rotation_axis_position", "fbp_filter_type",
"padding_type", "enable_halftomo",
"start_x", "end_x", "start_y", "end_y", "start_z", "end_z"]
)
rec_options["rotation_axis_position"] = dataset_infos.axis_position
options["build_sino"]["rotation_axis_position"] = dataset_infos.axis_position
rec_options["axis_correction"] = dataset_infos.axis_correction
rec_options["angles"] = dataset_infos.reconstruction_angles
rec_options["radio_dims_y_x"] = dataset_infos.radio_dims[::-1]
rec_options["pixel_size_cm"] = dataset_infos.pixel_size * 1e-4 # pix size is in microns
if rec_options["enable_halftomo"]:
rec_options["angles"] = rec_options["angles"][:rec_options["angles"].size//2]
cor_i = int(round(rec_options["rotation_axis_position"]))
rec_options["rotation_axis_position_halftomo"] = (2*cor_i-1)/2.
rec_options["cor_estimated_auto"] = isinstance(nabu_config["reconstruction"]["rotation_axis_position"], str)
#
# Histogram
#
if nabu_config["postproc"]["output_histogram"]:
tasks.append("histogram")
options["histogram"] = copy_dict_items(
nabu_config["postproc"], ["histogram_bins"]
)
#
# Save
#
if nabu_config["output"]["location"] is not None:
tasks.append("save")
options["save"] = copy_dict_items(
nabu_config["output"], list(nabu_config["output"].keys())
)
options["save"]["overwrite"] = nabu_config["output"]["overwrite_results"]
self.processing_steps = tasks
self.processing_options = options
if set(self.processing_steps) != set(self.processing_options.keys()):
raise ValueError("Something wrong with process_config: options do not correspond to steps")
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def _get_ctf_parameters(self, phase_options):
dataset_info = self.dataset_infos
user_phase_options = self.nabu_config["phase"]
ctf_geom = extract_parameters(user_phase_options["ctf_geometry"])
ctf_advanced_params = extract_parameters(user_phase_options["ctf_advanced_params"])
# z1_vh
z1_v = ctf_geom["z1_v"]
z1_h = ctf_geom["z1_h"]
z1_vh = None
if z1_h is None and z1_v is None:
# parallel beam
z1_vh = None
elif (z1_v is None) ^ (z1_h is None):
# only one is provided: source-sample distance
z1_vh = z1_v or z1_h
if z1_h is not None and z1_v is not None:
# distance of the vertically focused source (horizontal line) and the horizontaly focused source (vertical line)
# for KB mirrors
z1_vh = (z1_v, z1_h)
# pix_size_det
pix_size_det = ctf_geom["detec_pixel_size"] or dataset_info.pixel_size * 1e-6
# wavelength
wavelength = 1.23984199e-9 / dataset_info.energy
phase_options["ctf_geo_pars"] = {
"z1_vh": z1_vh,
"z2": phase_options["distance_m"],
"pix_size_det": pix_size_det,
"wavelength": wavelength,
"magnification": bool(ctf_geom["magnification"]),
"length_scale": ctf_advanced_params["length_scale"]
}
phase_options["ctf_lim1"] = ctf_advanced_params["lim1"]
phase_options["ctf_lim2"] = ctf_advanced_params["lim2"]
if user_phase_options["ctf_correct_flat_distorsion"]:
phase_options["ctf_flat_distorsion_params"] = extract_parameters(user_phase_options["ctf_flat_distorsion_params"])
for what in ["ctf_normalize_by_mean", "ctf_translations", "ctf_correct_flat_distorsion"]:
phase_options[what] = user_phase_options[what]