Commit a4b26cf4 authored by Carsten Richter's avatar Carsten Richter

Merge branch 'tests' into 'master'

Add tests for center-of-mass/max processing

Closes #69

See merge request !107
parents 4f1a40c2 c678b364
Pipeline #6973 passed with stages
in 4 minutes and 15 seconds
......@@ -162,7 +162,6 @@ class FitH5(XsocsH5Base):
return [self._get_array_data('/'.join((base_path, name)))
for name in self.get_qspace_dimension_names(entry)]
def get_axis_result(self, entry, process, result, axis):
"""Returns the results for the given entry/process/result name/axis.
......@@ -369,7 +368,6 @@ class FitH5Writer(FitH5):
self._set_array_data(
'/'.join((base_path, self.__axis_name(index))), values)
def set_roi_indices(self, roi0, roi1, roi2):
"""Write qspace axes coordinates for each dimension
......@@ -382,5 +380,3 @@ class FitH5Writer(FitH5):
values = numpy.array(values)
self._set_array_data(
'/'.join((base_path, self.__axis_name(index))), values)
......@@ -31,5 +31,5 @@ __authors__ = ["D. Naudet"]
__license__ = "MIT"
__date__ = "01/05/2017"
from .peak_fit import FitTypes, FitStatus, PeakFitter # noqa
from .peak_fit import FitTypes, FitStatus, PeakFitter, FitResult # noqa
from .peak_fit import BackgroundTypes, background_estimation # noqa
......@@ -41,7 +41,7 @@ from silx.math.fit import snip1d
from ... import config
from ...io import QSpaceH5
from ...io.FitH5 import BackgroundTypes, FitH5Writer
from ...io.FitH5 import BackgroundTypes, FitH5, FitH5Writer
from ...util import gaussian, project
......@@ -112,7 +112,7 @@ class FitResult(object):
"""QSpace axis names (List[str])"""
self.roi_indices = roi_indices
"""Roi indices (List[List[str]])"""
"""Roi indices (List[List[int]])"""
self.fit_mode = fit_mode
"""Fit type (FitTypes)"""
......@@ -145,20 +145,22 @@ class FitResult(object):
"""
return numpy.array(self._fit_results[dimension][parameter], copy=copy)
_FIT_ENTRY_NAMES = {
FitTypes.GAUSSIAN: 'Gaussian',
FitTypes.CENTROID: 'Centroid'}
_FIT_PROCESS_NAMES = {
FitTypes.GAUSSIAN: 'gauss_0',
FitTypes.CENTROID: 'centroid'}
def to_fit_h5(self, fit_h5, mode=None):
"""Write fit results to an HDF5 file
:param str fit_h5: Filename where to save fit results
:param Union[None,str] mode: HDF5 file opening mode
"""
if self.fit_mode == FitTypes.GAUSSIAN:
fit_name = 'Gaussian'
result_name = 'gauss_0'
elif self.fit_mode == FitTypes.CENTROID:
fit_name = 'Centroid'
result_name = 'centroid'
else:
raise RuntimeError('Unknown Fit Type')
fit_name = self._FIT_ENTRY_NAMES[self.fit_mode]
result_name = self._FIT_PROCESS_NAMES[self.fit_mode]
with FitH5Writer(fit_h5,
entry=fit_name,
......@@ -179,6 +181,90 @@ class FitResult(object):
else:
fitH5.set_result(result_name, dimension, name, results)
@classmethod
def from_fit_h5(cls, filename):
"""Create a FitResult from content of a HDF5 fit file
:param str filename: HDF% fit results file name
:rtype: FitResult
"""
with FitH5(filename) as fith5:
# Retrieve entry
entries = fith5.entries()
if len(entries) != 1:
raise RuntimeError("Only one entry in fit result is supported")
entry = fith5.entries()[0]
# Get fit mode corresponding to entry name
for key, value in cls._FIT_ENTRY_NAMES.items():
if value == entry:
fit_mode = key
break
else:
raise RuntimeError("Unsupported fit entry name: %s" % entry)
# Get corresponding NXProcess name
process = cls._FIT_PROCESS_NAMES[fit_mode]
if process not in fith5.processes(entry):
raise RuntimeError("Cannot find relevant NXProcess group in file")
# Retrieve fit results
parameters = list(fith5.get_result_names(entry, process))
parameters.append('Status')
fit_results = []
for axis in range(len(fith5.get_qspace_dimension_names(entry))):
result = [fith5.get_axis_result(entry, process, param, axis)
for param in parameters[:-1]] # All but Status
result.append(fith5.get_status(entry, axis))
fit_results.append(result)
# Get dtype from result arrays
result_dtype = [(name, array.dtype)
for name, array in zip(parameters, fit_results[0])]
# Convert from list by axis of list by param of list by sample point
# To list by sample point of list by axis of list by param
nb_axes = len(fit_results)
nb_params = len(fit_results[0])
nb_points = len(fit_results[0][0])
fit_results = [[tuple([fit_results[axis][param][point]
for param in range(nb_params)])
for axis in range(nb_axes)]
for point in range(nb_points)]
# Convert to record array
fit_results = numpy.array(fit_results, dtype=result_dtype)
# Return FitResult object
sample_x, sample_y = fith5.sample_positions(entry)
return FitResult(
sample_x=sample_x,
sample_y=sample_y,
q_dim_values=fith5.get_qspace_dimension_values(entry),
q_dim_names=fith5.get_qspace_dimension_names(entry),
roi_indices=fith5.get_roi_indices(entry),
fit_mode=fit_mode,
background_mode=fith5.get_background_mode(entry),
fit_results=fit_results)
def __eq__(self, other):
"""Implement equality, useful for tests"""
return (
isinstance(other, FitResult) and
numpy.array_equal(self.sample_x, other.sample_x) and
numpy.array_equal(self.sample_y, other.sample_y) and
numpy.all([numpy.array_equal(a1, a2)
for a1, a2 in zip(self.qspace_dimension_values,
other.qspace_dimension_values)]) and
(tuple(self.qspace_dimension_names) ==
tuple(other.qspace_dimension_names)) and
numpy.array_equal(self.roi_indices, self.roi_indices) and
self.fit_mode == other.fit_mode and
self.background_mode == self.background_mode and
numpy.array_equal(self._fit_results, other._fit_results))
class PeakFitter(object):
"""Class performing fit/com processing
......@@ -378,8 +464,7 @@ def _fit_process(index,
if roi_indices is not None:
qslice = numpy.s_[roi_indices[0][0]:roi_indices[0][1],
roi_indices[1][0]:roi_indices[1][1],
roi_indices[2][0]:roi_indices[2][1],
]
roi_indices[2][0]:roi_indices[2][1]]
axes = [axis[roi] for axis, roi in zip(axes, qslice)]
qspace = qspace_h5.qspace_slice((index,) + qslice)
......
......@@ -33,102 +33,27 @@ __date__ = "05/01/2016"
import os
import shutil
import sys
import tempfile
import unittest
import numpy as np
import numpy
from silx.utils.testutils import ParametricTestCase
from xsocs import config
from xsocs.test.utils import test_resources
from xsocs.process.fit import PeakFitter
from xsocs.io.FitH5 import FitH5
def _cmp_fit_h5_files(test_case, ref_h5, this_h5):
"""Compares two FitH5 files.
:param test_case:
:param ref_h5:
:param this_h5:
:return:
"""
ref_entries = ref_h5.entries()
this_entries = this_h5.entries()
test_case.assertEqual(ref_entries, this_entries)
for entry in ref_entries:
_cmp_fit_h5_processes(test_case, ref_h5, this_h5, entry)
def _cmp_fit_h5_processes(test_case, ref_h5, this_h5, entry):
"""Compares two FitH5 files
:param test_case:
:param ref_h5:
:param this_h5:
:param entry:
:return:
"""
ref_processes = ref_h5.processes(entry)
this_processes = this_h5.processes(entry)
test_case.assertEqual(ref_processes, this_processes)
for process in ref_processes:
_cmp_fit_h5_results(test_case,
ref_h5,
this_h5,
entry,
process)
def _cmp_fit_h5_results(test_case,
ref_h5,
this_h5,
entry,
process):
"""Compares two FitH5 files
:param test_case:
:param ref_h5:
:param this_h5:
:param entry:
:param process:
:return:
"""
ref_results = ref_h5.get_result_names(
entry, process)
this_results = this_h5.get_result_names(
entry, process)
test_case.assertEqual(ref_results, this_results)
if sys.platform == 'win32': # Relax tests on Windows
compare_arrays = np.allclose
else:
compare_arrays = np.array_equal
for result in ref_results:
ref_qx = ref_h5.get_axis_result(entry, process, result, 0)
this_qx = this_h5.get_axis_result(entry, process, result, 0)
test_case.assertTrue(compare_arrays(ref_qx, this_qx))
ref_qy = ref_h5.get_axis_result(entry, process, result, 1)
this_qy = this_h5.get_axis_result(entry, process, result, 1)
test_case.assertTrue(compare_arrays(ref_qy, this_qy))
ref_qz = ref_h5.get_axis_result(entry, process, result, 2)
this_qz = this_h5.get_axis_result(entry, process, result, 2)
test_case.assertTrue(compare_arrays(ref_qz, this_qz))
from xsocs.process.fit import PeakFitter, FitResult, FitTypes, BackgroundTypes
from xsocs.io.QSpaceH5 import QSpaceH5
class TestPeakFitter(ParametricTestCase):
"""Unit tests of the qspace converter class."""
_QSPACE_FILES = 'qspace_1.h5', 'qspace_2.h5', 'qspace_3.h5', 'qspace_4.h5'
_GAUSSIAN_FILES = ('gaussian_1.h5', 'gaussian_2.h5',
'gaussian_3.h5', 'gaussian_4.h5')
@classmethod
def setUpClass(cls):
config.DEFAULT_PROCESS_NUMBER = 2 # Limit number of processes
......@@ -143,43 +68,97 @@ class TestPeakFitter(ParametricTestCase):
shutil.rmtree(self._tmpTestDir)
self._tmpTestDir = None
def test_nominal(self):
keys = ['fit_f', 'qspace_f']
parameters = [
('gaussian_1.h5', 'qspace_1.h5'),
('gaussian_2.h5', 'qspace_2.h5'),
('gaussian_3.h5', 'qspace_3.h5'),
('gaussian_4.h5', 'qspace_4.h5')
]
param_dicts = [dict(zip(keys, params)) for params in parameters]
for params in param_dicts:
with self.subTest(**params):
qspace_f = test_resources.getfile(
'qspace/{0}'.format(params['qspace_f']))
fit_out = os.path.join(self._tmpTestDir, params['fit_f'])
fitter = PeakFitter(qspace_f)
def test_gaussian(self):
"""Test gaussian fit"""
for fit_f, qspace_f in zip(self._GAUSSIAN_FILES, self._QSPACE_FILES):
with self.subTest(fit_f=fit_f, qspace_f=qspace_f):
# Configure fitting
fitter = PeakFitter(
qspace_f=test_resources.getfile('qspace/' + qspace_f),
fit_type=FitTypes.GAUSSIAN,
background=BackgroundTypes.NONE)
self.assertEqual(fitter.status, fitter.READY)
# Run processing
fitter.peak_fit()
self.assertEqual(fitter.status, fitter.DONE)
results = fitter.results
results.to_fit_h5(fit_out)
fit_ref = test_resources.getfile(
'fit_2018_12/{0}'.format(params['fit_f']))
fit_ref_h5 = FitH5(fit_ref)
fit_out_h5 = FitH5(fit_out)
_cmp_fit_h5_files(self, fit_ref_h5, fit_out_h5)
# Compare results
ref = FitResult.from_fit_h5(
test_resources.getfile('fit_2018_12/' + fit_f))
self.assertEqual(ref, fitter.results)
# Save as HDF5 and compare
fit_out = os.path.join(self._tmpTestDir, fit_f)
fitter.results.to_fit_h5(fit_out)
self.assertEqual(ref, FitResult.from_fit_h5(fit_out))
def test_com(self):
"""Test Center-of-mass and Max reduction"""
for qspace_file in self._QSPACE_FILES:
qspace_f = test_resources.getfile('qspace/' + qspace_file)
# Configure reduction
fitter = PeakFitter(
qspace_f=qspace_f,
fit_type=FitTypes.CENTROID,
background=BackgroundTypes.NONE)
self.assertEqual(fitter.status, fitter.READY)
# Run processing
fitter.peak_fit()
self.assertEqual(fitter.status, fitter.DONE)
# Test results with reference implementation
for key, ref in self._qspace_com_result(qspace_f).items():
for dim in range(3):
with self.subTest(qspace_file=qspace_file,
dimension=dim,
parameter=key):
self.assertTrue(numpy.array_equal(
ref[dim], fitter.results.get_results(dim, key)))
# Save as HDF5 and check saved result
fit_out = os.path.join(
self._tmpTestDir, 'com_result_' + qspace_file + '.h5')
fitter.results.to_fit_h5(fit_out)
self.assertEqual(fitter.results, FitResult.from_fit_h5(fit_out))
@staticmethod
def _qspace_com_result(qspace_f):
"""Test implementation of QSpace COM results
:param str qspace_f: HDF5 file name
:return: Dict of results for each axis for each sample point
:rtype: Dict[List[List[float]]]
"""
result = {'COM': [[], [], []],
'I_sum': [[], [], []],
'I_max': [[], [], []],
'Pos_max': [[], [], []]}
with QSpaceH5(qspace_f) as qspaceh5:
axes = qspaceh5.qspace_dimension_values
hits = qspaceh5.histo
hits = [hits.sum(2).sum(1),
hits.sum(2).sum(0),
hits.sum(1).sum(0)]
for qspace in qspaceh5.qspace:
projs = [qspace.sum(2).sum(1) / hits[0],
qspace.sum(2).sum(0) / hits[1],
qspace.sum(1).sum(0) / hits[2]]
for dim in range(3):
result['I_max'][dim].append(numpy.max(projs[dim]))
q_sum = numpy.sum(projs[dim])
result['I_sum'][dim].append(q_sum)
result['COM'][dim].append(
numpy.dot(projs[dim], axes[dim]) / q_sum)
result['Pos_max'][dim].append(
axes[dim][numpy.argmax(projs[dim])])
return result
def suite():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment