Commit 82c09a6f authored by Pierre Paleo

Merge branch 'dff_limited_mem' into 'master'

Double Flat-field for limited-memory case

Closes #119

See merge request !43
parents 55bbe459 a6439337
Pipeline #27123 passed with stages in 2 minutes and 59 seconds
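This merge request makes the double flat-field correction usable when the full radio stack does not fit in GPU memory, by computing it on the host array. As a reminder of the principle, here is a generic numpy sketch (illustrative only, not nabu's DoubleFlatField implementation; the function name and the optional Gaussian smoothing parameter are assumptions):

import numpy as np
from scipy.ndimage import gaussian_filter

def apply_double_flatfield(radios, sigma=None):
    # radios: already flat-fielded projections, shape (n_angles, n_z, n_x).
    # The mean over all angles keeps only the features that do not move with
    # the rotation (residual rings and detector artefacts).
    dff = radios.mean(axis=0)
    if sigma is not None:
        # Optional low-pass filtering so that only slowly varying structures
        # are removed (hypothetical parameter, not nabu's option name).
        dff = gaussian_filter(dff, sigma)
    radios /= dff  # in-place correction of each projection
    return radios

In the limited-memory pipeline changed below, this correction is performed once on the host array after flat-fielding all groups of radios, so the GPU only ever holds one group at a time.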
from os import path
import numpy as np
from .logger import LoggerOrPrint
from .utils import use_options, pipeline_step
from ..utils import check_supported
from ..io.reader import ChunkReader
from ..preproc.ccd import FlatField, Log
@@ -103,47 +104,11 @@ class FullFieldPipeline:
self.z_max = sub_region[-1]
def _get_process_name(self):
# TODO support "incomplete" processing pipeline
return "reconstruction"
#
# Decorators and callback mechanism
#
def use_options(step_name, step_attr):
def decorator(func):
def wrapper(*args, **kwargs):
self = args[0]
if step_name not in self.processing_steps:
self.__setattr__(step_attr, None)
return
self._steps_name2component[step_name] = step_attr
self._steps_component2name[step_attr] = step_name
return func(*args, **kwargs)
return wrapper
return decorator
def pipeline_step(step_attr, step_desc):
def decorator(func):
def wrapper(*args, **kwargs):
self = args[0]
if self.__getattribute__(step_attr) is None:
return
self.logger.info(step_desc)
res = func(*args, **kwargs)
self.logger.debug("End " + step_desc)
callback = self._callbacks.get(self._steps_component2name[step_attr], None)
if callback is not None:
callback()
return res
return wrapper
return decorator
def register_callback(self, step_name, callback):
"""
Register a callback for a pipeline processing step.
@@ -184,6 +149,7 @@ class FullFieldPipeline:
self.radios = self.chunk_reader.files_data
self.radios_shape = self.radios.shape
self._sinobuilder_radios_shape = self.radios_shape
self._dff_radios_shape = self.radios_shape
def _process_finalize(self):
@@ -244,7 +210,7 @@
def _init_double_flatfield(self):
options = self.processing_options["double_flatfield"]
self.double_flatfield = self.DoubleFlatFieldClass(
self.radios_shape,
self._dff_radios_shape,
result_url=None,
input_is_mlog=False,
output_is_mlog=False,
@@ -298,9 +264,12 @@
self.sinos = None
else:
self._sinobuilder_copy = True
self.sinos = self._allocate_array(self.sino_builder.output_shape, "f", name="sinos")
self.sinos = self._allocate_sinobuilder_output()
self._sinobuilder_output = self.sinos
def _allocate_sinobuilder_output(self):
return self._allocate_array(self.sino_builder.output_shape, "f", name="sinos")
@use_options("reconstruction", "reconstruction")
def _prepare_reconstruction(self):
@@ -308,8 +277,11 @@
x_s, x_e = options["start_x"], options["end_x"]+1
y_s, y_e = options["start_y"], options["end_y"]+1
self._rec_roi = (x_s, x_e, y_s, y_e)
self.n_slices = self.chunk_size # TODO modify with vertical shifts
self.recs = self._allocate_array((self.n_slices, y_e - y_s, x_e - x_s), "f", name="recs")
self.n_slices = self.radios_shape[1] # TODO modify with vertical shifts
self._allocate_recs(y_e - y_s, x_e - x_s)
def _allocate_recs(self, ny, nx):
self.recs = self._allocate_array((self.n_slices, ny, nx), "f", name="recs")
@use_options("reconstruction", "reconstruction")
@@ -358,7 +330,7 @@
err = "File already exists: %s" % self.fname
if options["overwrite"]:
if options.get("warn_overwrite", True):
self.logger.warning(err + ". It will be overwritten on user request")
self.logger.warning(err + ". It will be overwritten as requested in configuration")
options["warn_overwrite"] = False
else:
self.logger.fatal(err)
@@ -417,8 +389,10 @@
self.flatfield.normalize_radios(self.radios)
@pipeline_step("double_flatfield", "Applying double flat-field")
def _double_flatfield(self):
self.double_flatfield.apply_double_flatfield(self.radios)
def _double_flatfield(self, radios=None):
if radios is None:
radios = self.radios
self.double_flatfield.apply_double_flatfield(radios)
@pipeline_step("phase_retrieval", "Performing phase retrieval")
def _retrieve_phase(self):
@@ -438,6 +412,8 @@
def _build_sino(self, radios=None):
if radios is None:
radios = self.radios
# Either a new array (previously allocated in "_sinobuilder_output"),
# or a view of "radios"
self.sinos = self.sino_builder.radios_to_sinos(
radios,
output=self._sinobuilder_output,
@@ -458,6 +434,7 @@
if data is None:
data = self.recs
self.writer.write(data, *self._writer_exec_args, **self._writer_exec_kwargs)
self.logger.info("Wrote %s" % self.writer.fname)
def _process_chunk(self):
......
from math import ceil
import numpy as np
from ..preproc.ccd_cuda import CudaFlatField, CudaLog
from ..preproc.double_flatfield import DoubleFlatField
from ..preproc.double_flatfield_cuda import CudaDoubleFlatField
from ..preproc.phase_cuda import CudaPaganinPhaseRetrieval
from ..preproc.sinogram_cuda import CudaSinoProcessing
@@ -104,6 +105,8 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
# In this case, the "build sinogram" step is best done on host to avoid
# extraneous CPU<->GPU copies and to save some GPU memory.
SinoProcessingClass = SinoProcessing
# Same for DoubleFlatField
DoubleFlatFieldClass = DoubleFlatField
def __init__(self, process_config, sub_region, chunk_size, logger=None, extra_options=None, cuda_options=None):
"""
@@ -160,6 +163,15 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
assert (self.delta_z % self.chunk_size) == 0, "delta_z must be a multiple of chunk_size"
self._allocate_radios()
self._h_recs = None
self._old_flatfield = None
# This is a current limitation.
# Things are a bit tricky as chunk_size and delta_z would have to be
# divided by binning_z, but then the group size has to be re-calculated.
# On the other hand, it makes little sense to use this class
# for other use cases than full-resolution reconstruction...
if self.processing_options["read_chunk"]["binning"][-1] > 1:
raise ValueError("Binning in z is not supported with this class")
#
def _init_reader_finalize(self):
@@ -178,7 +190,10 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
self.radios = self.chunk_reader.files_data
# (group_size, delta_z, n_x) - fits in GPU mem
self.radios_group_shape = (self.radios_group_size, ) + self.radios.shape[1:]
self.radios_shape = self.radios_group_shape # passed to CCD processing classes
# passed to CCD processing classes
self.radios_shape = self.radios_group_shape
# DoubleFF is done on host. Here self.radios is still the host array (n_angles, delta_z, width)
self._dff_radios_shape = self.radios.shape
if "flatfield" in self.processing_steps:
# We need to update the projections indices passed to FlatField
# for each group of radios
@@ -198,6 +213,16 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
self.radios = self._d_radios # (radios_group_size, delta_z, width) (fits in GPU mem)
def _allocate_sinobuilder_output(self):
self._allocate_array(self.sino_builder.output_shape, "f", name="sinos")
self._h_sinos = np.zeros(self._d_sinos.shape, "f")
self._sinobuilder_output = self._h_sinos # patch self.sino_builder
return self._h_sinos
def _allocate_recs(self, ny, nx):
self.recs = self._allocate_array((self.chunk_size, ny, nx), "f", name="recs")
def _register_callbacks(self):
# No callbacks are registered for this subclass
pass
@@ -212,6 +237,51 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
# re-allocate _d_radios for processing a new chunk
self.radios = self._h_radios
self._allocate_radios()
self.flatfield = self._old_flatfield
def _reinit_flatfield(self, start_idx, end_idx, transfer_size):
if "flatfield" in self.processing_steps:
# We need to update the projections indices passed to FlatField
# for each group of radios
ff_opts = self.processing_options["flatfield"]
ff_opts["projs_indices"] = self._ff_proj_indices[start_idx:end_idx]
self._init_flatfield(shape=(transfer_size, ) + self.radios_shape[1:])
def _flatfield_radios_group(self, start_idx, end_idx, transfer_size):
self._reinit_flatfield(start_idx, end_idx, transfer_size)
self._flatfield()
def _apply_flatfield_and_dff(self, n_groups, group_size, n_images):
"""
If double flat-field is activated, apply flat-field + double flat-field.
Otherwise, do nothing and leave the flat-field for later "group processing".
"""
if "double_flatfield" not in self.processing_steps:
return
for i in range(n_groups):
self.logger.debug("processing group %d/%d" % (i+1, n_groups))
start_idx = i * group_size
end_idx = min((i + 1) * group_size, n_images)
transfer_size = end_idx - start_idx
# Copy H2D
self._d_radios[:transfer_size, :, :] = self._h_radios[start_idx:end_idx, :, :]
# Process a group of radios (radios_group_size, delta_z, width)
self._old_radios = self.radios
self.radios = self.radios[:transfer_size]
self._flatfield_radios_group(start_idx, end_idx, transfer_size)
self.radios = self._old_radios
# Copy D2H
self._d_radios[:transfer_size, :, :].get(ary=self._h_radios[start_idx:end_idx])
# Here flat-field has been applied on all radios (n_angles, delta_z, n_x).
# Now apply double FF on host.
self._double_flatfield(radios=self._h_radios)
if "flatfield" in self.processing_steps:
self.processing_steps.remove("flatfield")
self._old_flatfield = self.flatfield
self.flatfield = None
def _process_chunk_ccd(self):
@@ -221,6 +291,9 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
n_groups = self._n_radios_groups
group_size = self.radios_group_size
n_images = self._h_radios.shape[0]
self._apply_flatfield_and_dff(n_groups, group_size, n_images)
for i in range(n_groups):
self.logger.debug("processing group %d/%d" % (i+1, n_groups))
start_idx = i * group_size
@@ -229,16 +302,9 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
# Copy H2D
self._d_radios[:transfer_size, :, :] = self._h_radios[start_idx:end_idx, :, :]
# Process a group of radios (radios_group_size, delta_z, width)
if "flatfield" in self.processing_steps:
# We need to update the projections indices passed to FlatField
# for each group of radios
ff_opts = self.processing_options["flatfield"]
ff_opts["projs_indices"] = self._ff_proj_indices[start_idx:end_idx]
self._init_flatfield(shape=(transfer_size, ) + self.radios_shape[1:])
self._old_radios = self.radios
self.radios = self.radios[:transfer_size]
self._flatfield()
self._double_flatfield()
self._flatfield_radios_group(start_idx, end_idx, transfer_size)
self._retrieve_phase()
self._apply_unsharp()
self._take_log()
@@ -246,22 +312,31 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
# Copy D2H
self._d_radios[:transfer_size, :, :].get(ary=self._h_radios[start_idx:end_idx])
self.logger.debug("End of processing steps on radios")
# release cuda memory
replace_array_memory(self._d_radios, (1,))
self._d_radios = None
# Restore original processing steps
self.processing_steps = self._processing_steps
# Initialize sino builder
if "build_sino" in self.processing_steps:
self._init_sino_builder()
# release cuda memory of _d_radios if a new array was allocated for sinograms
if self._sinobuilder_output is not None:
replace_array_memory(self._d_radios, (1,))
self._d_radios = None
# otherwise use the buffer of _d_radios for _d_sinos
else:
self._d_sinos = garray.empty(
(self.chunk_size, ) + self.sino_builder.output_shape[1:],
"f",
gpudata=self._d_radios.gpudata
)
self._d_sinos.fill(0)
def _process_chunk_sinos(self):
"""
Perform the processing in the "sinograms space"
"""
self.processing_steps = self._processing_steps
self.logger.debug("Initializing processing on sinos")
self._init_sino_builder()
self._tmp_h_sinos = np.zeros(self._d_sinos.shape, "f")
self._sinobuilder_output = self._tmp_h_sinos
self._prepare_reconstruction()
self._init_reconstruction()
self._init_writer()
@@ -276,10 +351,17 @@ class CudaFullFieldPipelineLimitedMemory(CudaFullFieldPipeline):
start_idx = i * group_size
end_idx = min((i + 1) * group_size, self.delta_z)
transfer_size = end_idx - start_idx # not useful here as delta_z % chunk_size == 0
# Build sinograms on host. Use a view of _h_radios
# Build sinograms on host using _h_radios
self._build_sino(radios=self._h_radios[:, start_idx:end_idx, :])
# Copy H2D
self._d_sinos[:, :, :] = self._sinobuilder_output[:, :, :]
# pycuda does not support copy where "order" is not the same
# (self.sinos might be a view on self.radios)
if not(self._sinobuilder_copy) and not(self.sinos.flags["C_CONTIGUOUS"]):
sinos = np.ascontiguousarray(self.sinos)
else:
sinos = self._h_sinos
#
self._d_sinos[:, :, :] = sinos[:, :, :]
# Process stack of sinograms (chunk_size, n_angles, width)
self._reconstruct(sinos=self._d_sinos)
# Copy D2H
......
#
# Decorators and callback mechanism
#
def use_options(step_name, step_attr):
def decorator(func):
def wrapper(*args, **kwargs):
self = args[0]
if step_name not in self.processing_steps:
self.__setattr__(step_attr, None)
return
self._steps_name2component[step_name] = step_attr
self._steps_component2name[step_attr] = step_name
return func(*args, **kwargs)
return wrapper
return decorator
def pipeline_step(step_attr, step_desc):
def decorator(func):
def wrapper(*args, **kwargs):
self = args[0]
if self.__getattribute__(step_attr) is None:
return
self.logger.info(step_desc)
res = func(*args, **kwargs)
self.logger.debug("End " + step_desc)
callback = self._callbacks.get(self._steps_component2name[step_attr], None)
if callback is not None:
callback()
return res
return wrapper
return decorator
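These decorators are used throughout the pipeline classes above, e.g. @use_options("reconstruction", "reconstruction") and @pipeline_step("double_flatfield", "Applying double flat-field"). Below is a minimal, self-contained usage sketch, assuming use_options and pipeline_step are in scope; the TinyPipeline class and the _PrintLogger stub are illustrative and not part of nabu.

class _PrintLogger:
    # Stand-in for nabu's LoggerOrPrint, only to keep the sketch self-contained.
    def info(self, msg):
        print(msg)
    def debug(self, msg):
        print(msg)

class TinyPipeline:
    def __init__(self, steps):
        self.processing_steps = steps
        self._steps_name2component = {}
        self._steps_component2name = {}
        self._callbacks = {}
        self.logger = _PrintLogger()
        self._init_double_flatfield()

    @use_options("double_flatfield", "double_flatfield")
    def _init_double_flatfield(self):
        # Runs only if "double_flatfield" is in processing_steps; otherwise
        # use_options sets self.double_flatfield to None and skips the body.
        self.double_flatfield = object()

    @pipeline_step("double_flatfield", "Applying double flat-field")
    def _double_flatfield(self):
        # Skipped when self.double_flatfield is None; otherwise the step is
        # logged, executed, and any callback registered for "double_flatfield"
        # via register_callback() is invoked.
        pass

TinyPipeline(["double_flatfield"])._double_flatfield()  # logs and runs the step
TinyPipeline([])._double_flatfield()                     # silently skipped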