Commit 6eeedd3b authored by Pierre Paleo's avatar Pierre Paleo
Browse files

[hdf5scan] Replace MockScan with the more recent version from tomwer

parent e6476aac
Pipeline #22947 failed with stages
in 33 seconds
# coding: utf-8
#/*##########################################################################
# Copyright (C) 2016-2020 European Synchrotron Radiation Facility
# Copyright (C) 2016 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
......@@ -27,7 +27,7 @@ Utils to mock scans
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "10/10/2019"
__date__ = "30/09/2019"
import h5py
import numpy
......@@ -35,7 +35,8 @@ import os
from xml.etree import cElementTree
import fabio
import fabio.edfimage
from ..scanfactory import ScanFactory # avoid cyclic import
from tomoscan.esrf.hdf5scan import ImageKey
from tomwer.core.scan.hdf5scan import HDF5TomoScan
class _ScanMock:
......@@ -50,9 +51,7 @@ class _ScanMock:
:param scan_path:
:param n_radio:
:param n_ini_radio: number of radio existing at t time. If None,
will create all the radio and consider the
acquisition as finished
:param n_ini_radio:
:param n_extra_radio:
:param scan_range:
:param n_recons:
......@@ -63,6 +62,8 @@ class _ScanMock:
:param dark_n:
:param str scene: scene type.
* 'noise': generate radios from numpy.random
* `increase value`: first frame value will be0, then second 1...
* `arange`: arange through frames
* 'perfect-sphere: generate a sphere which just fit in the
detector dimensions
TODO: add some differente scene type.
......@@ -73,29 +74,12 @@ class _ScanMock:
self.n_radio = n_radio
self.scene = scene
if not os.path.exists(scan_path):
os.mkdir(scan_path)
self.write_metadata(n_radio=n_radio, scan_range=scan_range,
ref_n=ref_n, dark_n=dark_n)
if n_ini_radio is None:
n_ini_radio = n_radio + n_extra_radio
call_acqui_end = True
else:
call_acqui_end = False
for i_radio in range(n_ini_radio):
self.add_radio(i_radio)
for i_extra_radio in range(n_extra_radio):
self.add_radio(n_radio+i_extra_radio)
for i_recons in range(n_recons):
self.add_reconstruction(i_recons)
for i_recons in range(n_pag_recons):
self.add_pag_reconstruction(i_recons)
if recons_vol is True:
self.add_recons_vol()
if call_acqui_end is True:
self.end_acquisition()
def add_radio(self, index=None):
raise NotImplementedError('Base class')
......@@ -117,6 +101,12 @@ class _ScanMock:
def _get_radio_data(self, index):
if self.scene == 'noise':
return numpy.random.random((self.det_height * self.det_width)).reshape((self.det_width, self.det_height))
elif self.scene == 'increasing value':
return numpy.zeros((self.det_width, self.det_height)) + index
elif self.scene == 'arange':
start = index * (self.det_height * self.det_width)
stop = (index + 1) * (self.det_height * self.det_width)
return numpy.arange(start=start, stop=stop).reshape(self.det_width, self.det_height)
elif self.scene == 'perfect-sphere':
background = numpy.zeros((self.det_height * self.det_width))
radius = min(background.shape)
......@@ -133,50 +123,188 @@ class _ScanMock:
scale = 1
background[radii < radius * scale] = 1.0
return background
else:
raise ValueError('selected scene %s is no managed' % self.scene)
class MockHDF5(_ScanMock):
"""
Mock an acquisition in a hdf5 file
Mock an acquisition in a hdf5 file.
note: for now the Mock class only manage one initial ref and one final
"""
def __init__(self, scan_path, n_radio, n_ini_radio=None, n_extra_radio=0,
_PROJ_COUNT = 1
def __init__(self, scan_path, n_proj, n_ini_proj=None, n_alignement_proj=0,
scan_range=360, n_recons=0, n_pag_recons=0, recons_vol=False,
dim=200):
dim=200, create_ini_dark=True, create_ini_ref=True,
create_final_ref=False, n_refs=10):
"""
:param scan_path: directory of the file containing the hdf5 acquisition
:param n_proj: number of projections (does not contain alignement proj)
:param n_ini_proj: number of projection do add in the constructor
:param n_alignement_proj: number of alignment projection
:param int scan_range:
:param n_recons:
:param n_pag_recons:
:param recons_vol:
:param dim: frame dim - only manage square fame for now
:param create_ini_dark: create one initial dark frame on construction
:param create_ini_ref: create the initial serie of ref (n_ref) on
construction (after creation of the dark)
:param create_final_ref: create the final serie of ref (n_ref) on
construction (after creation of the dark)
:param n_refs: number of refs per serie
"""
self.rotation_angle = numpy.linspace(start=0, stop=scan_range,
num=n_proj + 1)
self.rotation_angle_return = numpy.linspace(start=scan_range, stop=0,
num=n_alignement_proj)
self.scan_master_file = os.path.join(scan_path, os.path.basename((scan_path)) + '.h5')
super(MockHDF5, self).__init__(scan_path=scan_path, n_radio=n_radio,
n_ini_radio=n_ini_radio,
n_extra_radio=n_extra_radio,
self._n_refs = n_refs
self.scan_entry = 'entry'
super(MockHDF5, self).__init__(scan_path=scan_path, n_radio=n_proj,
n_ini_radio=n_ini_proj,
n_extra_radio=n_alignement_proj,
scan_range=scan_range,
n_recons=n_recons,
n_pag_recons=n_pag_recons,
recons_vol=recons_vol, dim=dim)
if create_ini_dark:
self.add_initial_dark()
if create_ini_ref:
self.add_initial_ref()
if n_ini_proj is not None:
for i_radio in range(n_ini_proj):
self.add_radio(index=i_radio)
if create_final_ref:
self.add_final_ref()
self.scan = HDF5TomoScan(scan=self.scan_master_file, entry='entry')
def add_initial_dark(self):
dark = numpy.random.random((self.det_height * self.det_width)).reshape(
(1, self.det_width, self.det_height))
self._append_frame(data_=dark,
rotation_angle=self.rotation_angle[-1],
image_key=ImageKey.DARK_FIELD.value,
image_key_control=ImageKey.DARK_FIELD.value)
def add_initial_ref(self):
for i in range(self._n_refs):
flat = numpy.random.random((self.det_height * self.det_width)).reshape(
(1, self.det_width, self.det_height))
self._append_frame(data_=flat,
rotation_angle=self.rotation_angle[0],
image_key=ImageKey.FLAT_FIELD.value,
image_key_control=ImageKey.FLAT_FIELD.value)
def add_final_ref(self):
for i in range(self._n_refs):
flat = numpy.random.random((self.det_height * self.det_width)).reshape(
(1, self.det_width, self.det_height))
self._append_frame(data_=flat,
rotation_angle=self.rotation_angle[-1],
image_key=ImageKey.FLAT_FIELD.value,
image_key_control=ImageKey.FLAT_FIELD.value)
def add_radio(self, index=None):
radio = self._get_radio_data(index=index)
radio = radio.reshape((1, self.det_height, self.det_width))
self._append_frame(data_=radio,
rotation_angle=self.rotation_angle[index],
image_key=ImageKey.PROJECTION.value,
image_key_control=ImageKey.PROJECTION.value)
def _append_frame(self, data_, rotation_angle, image_key,
image_key_control):
with h5py.File(self.scan_master_file, 'r+') as h5_file:
if '1_tomo/measurement/pcoedge64:image' in h5_file:
current_dataset = h5_file['1_tomo/measurement/pcoedge64:image'][()]
del h5_file['1_tomo/measurement/pcoedge64:image']
new_dataset = numpy.append(current_dataset, radio)
entry_one = h5_file.require_group(self.scan_entry)
instrument_grp = entry_one.require_group('instrument')
detector_grp = instrument_grp.require_group('detector')
sample_grp = entry_one.require_group('sample')
# add data
if 'data' in detector_grp:
# read and remove data
current_dataset = detector_grp['data'][()]
new_dataset = numpy.append(current_dataset, data_)
del detector_grp['data']
shape = list(current_dataset.shape)
shape[0] += 1
new_dataset = new_dataset.reshape(shape)
else:
new_dataset = radio
new_dataset = data_
# add rotation angle
if 'rotation_angle' in sample_grp:
new_rot_angle = sample_grp['rotation_angle'][()]
new_rot_angle = numpy.append(new_rot_angle, rotation_angle)
del sample_grp['rotation_angle']
else:
new_rot_angle = [rotation_angle, ]
# add image_key
if 'image_key' in detector_grp:
new_image_key = detector_grp['image_key'][()]
new_image_key = numpy.append(new_image_key,
image_key)
del detector_grp['image_key']
else:
new_image_key = [image_key, ]
# add image_key_control
if 'image_key_control' in detector_grp:
new_image_key_control = detector_grp['image_key_control'][()]
new_image_key_control = numpy.append(new_image_key_control, image_key_control)
del detector_grp['image_key_control']
else:
new_image_key_control = [image_key_control, ]
# add count_time
if 'count_time' in detector_grp:
new_count_time = detector_grp['count_time'][()]
new_count_time = numpy.append(new_count_time, self._PROJ_COUNT)
del detector_grp['count_time']
else:
new_count_time = [self._PROJ_COUNT, ]
with h5py.File(self.scan_master_file, 'a') as h5_file:
h5_file['1_tomo/measurement/pcoedge64:image'] = new_dataset
entry_one = h5_file.require_group(self.scan_entry)
instrument_grp = entry_one.require_group('instrument')
if 'NX_class' not in instrument_grp.attrs:
instrument_grp.attrs['NX_class'] = 'NXinstrument'
detector_grp = instrument_grp.require_group('detector')
if 'NX_class' not in detector_grp.attrs:
detector_grp.attrs['NX_class'] = 'NXdetector'
sample_grp = entry_one.require_group('sample')
if 'NX_class' not in sample_grp.attrs:
sample_grp.attrs['NX_class'] = 'NXsample'
# write camera information
detector_grp['data'] = new_dataset
detector_grp['image_key'] = new_image_key
detector_grp['image_key_control'] = new_image_key_control
detector_grp['count_time'] = new_count_time
# write sample information
sample_grp['rotation_angle'] = new_rot_angle
def write_metadata(self, n_radio, scan_range, ref_n, dark_n):
with h5py.File(self.scan_master_file, 'a') as h5_file:
h5_file['1_tomo/scan_meta/technique/scan/tomo_n'] = n_radio
h5_file['1_tomo/scan_meta/technique/scan/scan_range'] = scan_range
h5_file['1_tomo/scan_meta/technique/scan/ref_n'] = ref_n
h5_file['1_tomo/scan_meta/technique/scan/dark_n'] = dark_n
h5_file['1_tomo/scan_meta/technique/detector/size'] = (self.det_width, self.det_height)
h5_file['1_tomo/scan_meta/technique/detector/pixel_size'] = _ScanMock.PIXEL_SIZE
h5_file['1_tomo/scan_meta/technique/detector/ref_on'] = str(n_radio)
entry_one = h5_file.require_group(self.scan_entry)
instrument_grp = entry_one.require_group('instrument')
detector_grp = instrument_grp.require_group('detector')
sample_grp = entry_one.require_group('sample')
entry_one.attrs["NX_class"] = u"NXentry"
entry_one.attrs["definition"] = u"NXtomo"
if 'size' not in detector_grp:
detector_grp['size'] = (self.det_width, self.det_height)
if 'x_pixel_size' not in detector_grp:
detector_grp['x_pixel_size'] = _ScanMock.PIXEL_SIZE
if 'y_pixel_size' not in detector_grp:
detector_grp['y_pixel_size'] = _ScanMock.PIXEL_SIZE
def end_acquisition(self):
# no specific operation to do
......@@ -184,23 +312,38 @@ class MockHDF5(_ScanMock):
class MockEDF(_ScanMock):
"""Mock a EDF acquisition"""
"""Mock a EDF acquisition
"""
_RECONS_PATTERN = 'slice_'
_RECONS_PATTERN = '_slice_'
_PAG_RECONS_PATTERN = 'slice_pag_'
_PAG_RECONS_PATTERN = '_slice_pag_'
def __init__(self, scan_path, n_radio, n_ini_radio=None, n_extra_radio=0,
scan_range=360, n_recons=0, n_pag_recons=0, recons_vol=False,
dim=200):
dim=200, scene='noise'):
self._last_radio_index = -1
super(MockEDF, self).__init__(scan_path=scan_path, n_radio=n_radio,
super(MockEDF, self).__init__(scan_path=scan_path,
n_radio=n_radio,
n_ini_radio=n_ini_radio,
n_extra_radio=n_extra_radio,
scan_range=scan_range,
n_recons=n_recons,
n_pag_recons=n_pag_recons,
recons_vol=recons_vol, dim=dim)
recons_vol=recons_vol,
dim=dim,
scene=scene)
if n_ini_radio:
for i_radio in range(n_ini_radio):
self.add_radio(i_radio)
for i_extra_radio in range(n_extra_radio):
self.add_radio(i_extra_radio)
for i_recons in range(n_recons):
self.add_reconstruction(i_recons)
for i_recons in range(n_pag_recons):
self.add_pag_reconstruction(i_recons)
if recons_vol is True:
self.add_recons_vol()
def get_info_file(self):
return os.path.join(self.scan_path, os.path.basename(self.scan_path) + '.info')
......@@ -233,17 +376,17 @@ class MockEDF(_ScanMock):
info_file.write('PixelSize= ' + str(_ScanMock.PIXEL_SIZE) + '\n')
def add_radio(self, index=None):
if index:
if index is not None:
self._last_radio_index = index
index_ = index
else:
self._last_radio_index += 1
index_ = self._last_radio_index
file_name = os.path.basename(self.scan_path) + '_{0:04d}'.format(index_) + ".edf"
f = os.path.join(self.scan_path, file_name)
if not os.path.exists(f):
data = self._get_radio_data(index=index_)
assert data is not None
assert data.shape == (self.det_width, self.det_height)
edf_writer = fabio.edfimage.EdfImage(data=data,
header={"tata": "toto"})
......@@ -374,6 +517,7 @@ class MockEDF(_ScanMock):
assert type(nRadio) is int
assert type(nRecons) is int
assert type(dim) is int
from tomwer.core.scan.scanfactory import ScanFactory # avoid cyclic import
MockEDF.fastMockAcquisition(folder=scanID, n_radio=nRadio,
scan_range=scan_range, n_extra_radio=n_extra_radio)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment