#!/usr/bin/python # coding: utf8 # /*########################################################################## # # Copyright (c) 2015-2016 European Synchrotron Radiation Facility # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # # ###########################################################################*/ from __future__ import absolute_import, division __authors__ = ["D. Naudet"] __date__ = "01/06/2016" __license__ = "MIT" import logging import functools import multiprocessing import numpy as np from scipy.optimize import leastsq from silx.math.fit import snip1d from ... import config from ...io import QSpaceH5 from ...io.FitH5 import BackgroundTypes from ...util import gaussian, project from .fitresults import FitResult _logger = logging.getLogger(__name__) def background_estimation(mode, data): """Estimates a background of mode kind for data :param BackgroundTypes mode: The kind of background to compute :param numpy.ndarray data: The data array to process :return: The estimated background as an array with same shape as input :rtype: numpy.ndarray :raises ValueError: In case mode is not known """ # Background subtraction if mode == BackgroundTypes.CONSTANT: # Shift data so that smallest value is 0 return np.ones_like(data) * np.nanmin(data) elif mode == BackgroundTypes.LINEAR: # Simple linear background return np.linspace(data[0], data[-1], num=len(data), endpoint=True) elif mode == BackgroundTypes.SNIP: # Using snip background return snip1d(data, snip_width=len(data)) elif mode == BackgroundTypes.NONE: return np.zeros_like(data) else: raise ValueError("Unsupported background mode") class FitTypes(object): """Kind of fit available""" ALLOWED = range(2) GAUSSIAN, CENTROID = ALLOWED class PeakFitter(object): """Class performing fit/com processing :param str qspace_f: path to the HDF5 file containing the qspace cubes :param FitTypes fit_type: :param Union[numpy.ndarray,None] indices: indices of the cubes (in the input HDF5 dataset) for which the dim0/dim1/dim2 peaks coordinates will be computed. E.g : if the array [1, 2, 3] is provided, only those cubes will be fitted. :param Union[int,None] n_proc: Number of process to use. If None, the config value is used. :param Union[List[List[int]],None] roi_indices: QSpace ROI to process :param BackgroundTypes background: The background subtraction to perform """ READY, RUNNING, DONE, ERROR, CANCELED = __STATUSES = range(5) def __init__(self, qspace_f, fit_type=FitTypes.GAUSSIAN, indices=None, n_proc=None, roi_indices=None, background=BackgroundTypes.NONE): super(PeakFitter, self).__init__() self.__results = None self.__status = self.READY self.__qspace_f = qspace_f self.__fit_type = fit_type self.__background = background self.__n_proc = n_proc if n_proc else config.DEFAULT_PROCESS_NUMBER if roi_indices is not None: self.__roi_indices = np.array(roi_indices[:]) else: self.__roi_indices = None if fit_type not in FitTypes.ALLOWED: self.__set_status(self.ERROR) raise ValueError('Unknown fit type: {0}'.format(fit_type)) if background not in BackgroundTypes.ALLOWED: self.__set_status(self.ERROR) raise ValueError('Unknown background type: {}'.format(background)) try: with QSpaceH5.QSpaceH5(qspace_f) as qspace_h5: with qspace_h5.qspace_dset_ctx() as dset: qdata_shape = dset.shape except IOError: self.__set_status(self.ERROR) raise if indices is None: n_points = qdata_shape[0] self.__indices = np.arange(n_points) else: self.__indices = np.array(indices, copy=True) def __set_status(self, status): assert status in self.__STATUSES self.__status = status status = property(lambda self: self.__status) results = property(lambda self: self.__results) def peak_fit(self): """Blocking execution of fit/com processing. It returns the fit/com result """ for _ in self.__peak_fit(): pass return self.results def peak_fit_iterator(self): """Run fit/com processing as a generator. It yields a progress indicator in [0, 1]. """ for progress, _ in enumerate(self.__peak_fit()): yield progress / len(self.__indices) def __peak_fit(self): self.__results = None self.__set_status(self.RUNNING) pool = multiprocessing.Pool(self.__n_proc) fit_results = [] for result in pool.imap( functools.partial(_fit_process, qspace_f=self.__qspace_f, fit_type=self.__fit_type, background_type=self.__background, roiIndices=self.__roi_indices), self.__indices): fit_results.append(result) yield result pool.close() pool.join() # Prepare FitResult object with QSpaceH5.QSpaceH5(self.__qspace_f) as qspace_h5: x_pos = qspace_h5.sample_x[self.__indices] y_pos = qspace_h5.sample_y[self.__indices] q_dim0, q_dim1, q_dim2 = qspace_h5.qspace_dimension_values if self.__roi_indices is not None: q_dim0 = q_dim0[self.__roi_indices[0][0]:self.__roi_indices[0][1]] q_dim1 = q_dim1[self.__roi_indices[1][0]:self.__roi_indices[1][1]] q_dim2 = q_dim2[self.__roi_indices[2][0]:self.__roi_indices[2][1]] if self.__fit_type == FitTypes.GAUSSIAN: fit_name = 'Gaussian' result_name = 'gauss_0' result_dtype = [('Area', np.float64), ('Center', np.float64), ('Sigma', np.float64), ('Status', np.bool_)] elif self.__fit_type == FitTypes.CENTROID: fit_name = 'Centroid' result_name = 'centroid' result_dtype = [('COM', np.float64), ('I_sum', np.float64), ('I_max', np.float64), ('Pos_max', np.float64), ('Status', np.bool_)] else: raise RuntimeError('Unknown Fit Type') results = FitResult(entry=fit_name, sample_x=x_pos, sample_y=y_pos, q_x=q_dim0, q_y=q_dim1, q_z=q_dim2, background_mode=self.__background) fit_results = np.array(fit_results, dtype=result_dtype) # From points x axes to axes x points fit_results = np.transpose(fit_results) for axis_index, array in enumerate(fit_results): for name, _ in result_dtype[:-1]: results._add_axis_result(result_name, axis_index, name, array[name]) results._set_axis_status(axis_index, array['Status']) self.__results = results self.__set_status(self.DONE) # Per process cache for fit process _per_process_cache = None def _fit_process(index, qspace_f, fit_type=FitTypes.GAUSSIAN, background_type=BackgroundTypes.NONE, roiIndices=None): """Run fit processing. It loads a QSpace, extracts a ROI from it, project to axes, and then for each axis, it subtracts a background and performs a fit/COM. This function is run through a multiprocessing.Pool :param int index: The index of the QSpace to process :param str qspace_f: Filename of the hdf5 file containing QSpace :param FitTypes fit_type: The kind of fit to perform :param BackgroundTypes background_type: The kind of background subtraction to perform :param Union[List[List[int]],None] roiIndices: Optional QSpace ROI start:end in the 3 dimensions :return: Fit results as a list of results for dim0, dim1 and dim2 :rtype: List[List[Union[float,bool]]] """ global _per_process_cache if _per_process_cache is None: # Initialize per process cache qspace_h5 = QSpaceH5.QSpaceH5(qspace_f) _per_process_cache = ( qspace_h5, qspace_h5.qspace_dimension_values, qspace_h5.histo) # Retrieve data/file from cache qspace_h5, axes, hits = _per_process_cache # Load qspace qspace = qspace_h5.qspace_slice(index) # apply Qspace ROI if roiIndices is not None: dim0Slice = slice(roiIndices[0][0], roiIndices[0][1], 1) dim1Slice = slice(roiIndices[1][0], roiIndices[1][1], 1) dim2Slice = slice(roiIndices[2][0], roiIndices[2][1], 1) axes = [axis[roi] for axis, roi in zip(axes, (dim0Slice, dim1Slice, dim2Slice))] hits = hits[dim0Slice, dim1Slice, dim2Slice] qspace = qspace[dim0Slice, dim1Slice, dim2Slice] # Normalize with hits and project to axes projections = project(qspace, hits) # Background subtraction if background_type != BackgroundTypes.NONE: for array in projections: array -= background_estimation(background_type, array) # Fit/COM fit = {FitTypes.CENTROID: centroid, FitTypes.GAUSSIAN: gaussian_fit}[fit_type] result = [fit(axis, values) for axis, values in zip(axes, projections)] return result # Center of mass def centroid(axis, signal): """Returns Center of mass and maximum information :param numpy.ndarray axis: 1D x data :param numpy.ndarray signal: 1D y data :return: Center of mass, sum of signal, max of signal, position of max and status ('COM', 'I_sum', 'I_max', 'Pos_max', 'status') :rtype: List[Union[float,bool]] """ signal_sum = signal.sum() if signal_sum == 0: return float('nan'), float('nan'), float('nan'), float('nan'), False else: max_idx = signal.argmax() return (float(np.dot(axis, signal) / signal_sum), float(signal_sum), float(signal[max_idx]), float(axis[max_idx]), True) # Gaussian fit def _gaussian_err(parameters, axis, signal): """Returns difference between signal and given gaussian :param List[float] parameters: area, center, sigma :param numpy.ndarray axis: 1D x data :param numpy.ndarray signal: 1D y data :return: """ area, center, sigma = parameters return gaussian(axis, area, center, sigma) - signal _SQRT_2_PI = np.sqrt(2 * np.pi) def gaussian_fit(axis, signal): """Returns gaussian fitting information parameters: (a, c, s) and f(x) = (a / (sqrt(2 * pi) * s)) * exp(- 0.5 * ((x - c) / s)^2) :param axis: 1D x data :param signal: 1D y data :return: Area, center, sigma, status ('Area', 'Center', 'Sigma', 'status) :rtype: List[Union[float,bool]] """ # compute guess area = signal.sum() * (axis[-1] - axis[0]) / len(axis) center = axis[signal.argmax()] sigma = area / (signal.max() * _SQRT_2_PI) # Fit a gaussian result = leastsq(_gaussian_err, x0=(area, center, sigma), args=(axis, signal), maxfev=100000, full_output=True) if result[4] not in [1, 2, 3, 4]: return float('nan'), float('nan'), float('nan'), False else: area, center, sigma = result[0] return float(area), float(center), float(sigma), True