Commit 3ca7c263 authored by payno's avatar payno
Browse files

[test][utils] add a mockup class for zseries.

Split _BlissSample  into two classes: _BlissBasicTomo and _BlissZseriesTomo
parent 7e6b4608
......@@ -36,6 +36,7 @@ import numpy
import os
from nxtomomill import converter
from nxtomomill.test.utils.bliss import MockBlissAcquisition
from glob import glob
class TestH5ToNxConverter(unittest.TestCase):
......@@ -217,6 +218,40 @@ class TestH5ToNxConverter(unittest.TestCase):
raise_error_if_issue=True,
)
def test_z_series_conversion(self):
"""Test conversion of a zseries bliss (mock) acquisition"""
bliss_mock = MockBlissAcquisition(
n_sample=1,
n_sequence=1,
n_scan_per_sequence=10,
n_darks=5,
n_flats=5,
with_nx_detector_attr=True,
output_dir=self.folder,
detector_name="frelon1",
acqui_type="zseries",
z_values=(1, 2, 3),
)
self.assertTrue(len(bliss_mock.samples), 1)
sample = bliss_mock.samples[0]
self.assertTrue(os.path.exists(sample.sample_file))
output_file = sample.sample_file.replace(".h5", ".nx")
res = converter.h5_to_nx(
input_file_path=sample.sample_file,
output_file=output_file,
single_file=False,
request_input=False,
entries=None,
file_extension=None,
raise_error_if_issue=False,
)
# insure the 4 files are generated: master file and one per z
files = glob(os.path.dirname(sample.sample_file) + "/*.nx")
self.assertEqual(len(files), 4)
# try to create HDF5TomoScan from those to insure this is valid
# and check z values for example
class TestDetectorDetection(unittest.TestCase):
"""
......
......@@ -45,6 +45,9 @@ class MockBlissAcquisition:
create one serie of n flats after dark if any
:param str output_dir: will contain the proposal file and one folder per
sequence.
:param str acqui_type: acquisition type. Can be "basic" or "zseries"
:param Iterable z_values: if acqui_type is zseries then users should
provide the serie of values for z (one per stage)
"""
def __init__(
......@@ -57,6 +60,8 @@ class MockBlissAcquisition:
output_dir,
with_nx_detector_attr=True,
detector_name="pcolinux",
acqui_type="basic",
z_values=None,
):
self.__folder = output_dir
if not os.path.exists(output_dir):
......@@ -70,8 +75,8 @@ class MockBlissAcquisition:
sample_dir = os.path.join(self.path, dir_name)
os.mkdir(sample_dir)
sample_file = os.path.join(sample_dir, dir_name + ".h5")
self.__samples.append(
_BlissSample(
if acqui_type == "basic":
acqui_tomo = _BlissBasicTomo(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
......@@ -81,7 +86,23 @@ class MockBlissAcquisition:
with_nx_detector_attr=with_nx_detector_attr,
detector_name=detector_name,
)
)
elif acqui_type == "zseries":
if z_values is None:
raise ValueError("for zseries z_values should be provided")
acqui_tomo = _BlissZseriesTomo(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
with_nx_detector_attr=with_nx_detector_attr,
detector_name=detector_name,
z_values=z_values,
)
else:
raise NotImplementedError("")
self.__samples.append(acqui_tomo)
@property
def samples(self):
......@@ -114,20 +135,20 @@ class _BlissSample:
detector_name,
with_nx_detector_attr=True,
):
self.__with_nx_detector_attr = with_nx_detector_attr
self.__sample_dir = sample_dir
self.__sample_file = sample_file
self.__n_sequence = n_sequence
self.__n_scan_per_seq = n_scan_per_sequence
self.__n_darks = n_darks
self.__n_flats = n_flats
self.__scan_folders = []
self._with_nx_detector_attr = with_nx_detector_attr
self._sample_dir = sample_dir
self._sample_file = sample_file
self._n_sequence = n_sequence
self._n_scan_per_seq = n_scan_per_sequence
self._n_darks = n_darks
self._n_flats = n_flats
self._scan_folders = []
self._index = 1
self.__detector_name = detector_name
self.__det_width = 64
self.__det_height = 64
self.__n_frame_per_scan = 10
self.__energy = 19.0
self._detector_name = detector_name
self._det_width = 64
self._det_height = 64
self._n_frame_per_scan = 10
self._energy = 19.0
for i_sequence in range(n_sequence):
self.add_sequence()
......@@ -136,6 +157,9 @@ class _BlissSample:
self._index += 1
return idx
def get_main_entry_title(self):
raise NotImplementedError("Base class")
@staticmethod
def get_title(scan_type):
if scan_type == "dark":
......@@ -147,108 +171,97 @@ class _BlissSample:
else:
raise ValueError("Not implemented")
def add_sequence(self):
# reserve the index for the 'initialization' sequence. No scan folder
# will be created for this one.
seq_ini_index = self.get_next_free_index()
def create_entry_and_energy(self, seq_ini_index):
# add sequence init information
with h5py.File(self.sample_file, mode="a") as h5f:
seq_node = h5f.require_group(str(seq_ini_index) + ".1")
seq_node.attrs["NX_class"] = u"NXentry"
seq_node["title"] = "tomo:fullturn"
seq_node["title"] = self.get_main_entry_title()
seq_node.require_group("instrument/positioners")
# write energy
seq_node["technique/scan/energy"] = self.__energy
def register_scan_in_parent_seq(parent_index, scan_index):
with h5py.File(self.sample_file, mode="a") as h5f:
# write scan numbers
seq_node = h5f.require_group(str(parent_index) + ".1")
scan_number_node = seq_node.require_group("measurement/scan_numbers")
if "data" in scan_number_node:
res = list(scan_number_node["data"][()])
del scan_number_node["data"]
else:
res = []
res.append(scan_index)
scan_number_node["data"] = res
def add_scan(scan_type):
scan_idx = self.get_next_free_index()
scan_name = str(scan_idx).zfill(4)
scan_path = os.path.join(self.path, scan_name)
self.__scan_folders.append(
_BlissScan(folder=scan_path, scan_type=scan_type)
)
register_scan_in_parent_seq(parent_index=seq_ini_index, scan_index=scan_idx)
# register the scan information
with h5py.File(self.sample_file, mode="a") as h5f:
seq_node = h5f.require_group(str(scan_idx) + ".1")
# write title
title = self.get_title(scan_type=scan_type)
seq_node["title"] = title
# write data
data = (
numpy.random.random(
self.__det_height * self.__det_width * self.__n_frame_per_scan
)
* 256
)
data = data.reshape(
self.__n_frame_per_scan, self.__det_height, self.__det_width
)
data = data.astype(numpy.uint16)
det_path_1 = "/".join(("instrument", self.__detector_name))
det_grp = seq_node.require_group(det_path_1)
det_grp["data"] = data
if self.__with_nx_detector_attr:
det_grp.attrs["NX_class"] = "NXdetector"
acq_grp = det_grp.require_group("acq_parameters")
acq_grp["acq_expo_time"] = 4
det_path_2 = "/".join(("technique", "scan", self.__detector_name))
seq_node[det_path_2] = data
seq_node.attrs["NX_class"] = u"NXentry"
# write rotation angle value and translations
hrsrot_pos = seq_node.require_group(
"/".join(("instrument", "positioners"))
)
hrsrot_pos["hrsrot"] = numpy.random.randint(
low=0.0, high=360, size=self.__n_frame_per_scan
)
hrsrot_pos["sx"] = numpy.array(
numpy.random.random(size=self.__n_frame_per_scan)
)
hrsrot_pos["sy"] = numpy.random.random(size=self.__n_frame_per_scan)
hrsrot_pos["sz"] = numpy.random.random(size=self.__n_frame_per_scan)
seq_node["technique/scan/energy"] = self._energy
if self.n_darks > 0:
add_scan(scan_type="dark")
if self.__n_flats > 0:
add_scan(scan_type="flat")
def register_scan_in_parent_seq(self, parent_index, scan_index):
with h5py.File(self.sample_file, mode="a") as h5f:
# write scan numbers
seq_node = h5f.require_group(str(parent_index) + ".1")
scan_number_node = seq_node.require_group("measurement/scan_numbers")
if "data" in scan_number_node:
res = list(scan_number_node["data"][()])
del scan_number_node["data"]
else:
res = []
res.append(scan_index)
scan_number_node["data"] = res
def add_scan(self, scan_type, seq_ini_index, z_value):
scan_idx = self.get_next_free_index()
scan_name = str(scan_idx).zfill(4)
scan_path = os.path.join(self.path, scan_name)
self._scan_folders.append(_BlissScan(folder=scan_path, scan_type=scan_type))
self.register_scan_in_parent_seq(
parent_index=seq_ini_index, scan_index=scan_idx
)
# register the scan information
with h5py.File(self.sample_file, mode="a") as h5f:
seq_node = h5f.require_group(str(scan_idx) + ".1")
# write title
title = self.get_title(scan_type=scan_type)
seq_node["title"] = title
# write data
data = (
numpy.random.random(
self._det_height * self._det_width * self._n_frame_per_scan
)
* 256
)
data = data.reshape(
self._n_frame_per_scan, self._det_height, self._det_width
)
data = data.astype(numpy.uint16)
det_path_1 = "/".join(("instrument", self._detector_name))
det_grp = seq_node.require_group(det_path_1)
det_grp["data"] = data
if self._with_nx_detector_attr:
det_grp.attrs["NX_class"] = "NXdetector"
acq_grp = det_grp.require_group("acq_parameters")
acq_grp["acq_expo_time"] = 4
det_path_2 = "/".join(("technique", "scan", self._detector_name))
seq_node[det_path_2] = data
seq_node.attrs["NX_class"] = u"NXentry"
# write rotation angle value and translations
hrsrot_pos = seq_node.require_group("/".join(("instrument", "positioners")))
hrsrot_pos["hrsrot"] = numpy.random.randint(
low=0.0, high=360, size=self._n_frame_per_scan
)
hrsrot_pos["sx"] = numpy.array(
numpy.random.random(size=self._n_frame_per_scan)
)
hrsrot_pos["sy"] = numpy.random.random(size=self._n_frame_per_scan)
hrsrot_pos["sz"] = numpy.asarray([z_value] * self._n_frame_per_scan)
for i_proj_seq in range(self.__n_scan_per_seq):
add_scan(scan_type="projection")
def add_sequence(self):
"""Add a sequence to the bliss file"""
raise NotImplementedError("Base class")
@property
def path(self):
return self.__sample_dir
return self._sample_dir
@property
def sample_directory(self):
return self.__sample_dir
return self._sample_dir
@property
def sample_file(self):
return self.__sample_file
return self._sample_file
def scans_folders(self):
return self.__scan_folders
return self._scan_folders
@property
def n_darks(self):
return self.__n_darks
return self._n_darks
class _BlissScan:
......@@ -262,3 +275,78 @@ class _BlissScan:
def path(self):
return self.__path
class _BlissBasicTomo(_BlissSample):
def get_main_entry_title(self):
return "tomo:fullturn"
def add_sequence(self):
# reserve the index for the 'initialization' sequence. No scan folder
# will be created for this one.
seq_ini_index = self.get_next_free_index()
self.create_entry_and_energy(seq_ini_index=seq_ini_index)
if self.n_darks > 0:
self.add_scan(scan_type="dark", seq_ini_index=seq_ini_index, z_value=1)
if self._n_flats > 0:
self.add_scan(scan_type="flat", seq_ini_index=seq_ini_index, z_value=1)
for i_proj_seq in range(self._n_scan_per_seq):
self.add_scan(
scan_type="projection", seq_ini_index=seq_ini_index, z_value=1
)
class _BlissZseriesTomo(_BlissSample):
def __init__(
self,
sample_dir,
sample_file,
n_sequence,
n_scan_per_sequence,
n_darks,
n_flats,
detector_name,
z_values,
with_nx_detector_attr=True,
):
self._z_values = z_values
super().__init__(
sample_dir=sample_dir,
sample_file=sample_file,
n_sequence=n_sequence,
n_scan_per_sequence=n_scan_per_sequence,
n_darks=n_darks,
n_flats=n_flats,
detector_name=detector_name,
with_nx_detector_attr=with_nx_detector_attr,
)
def get_main_entry_title(self):
return "tomo:zseries"
def add_sequence(self):
# reserve the index for the 'initialization' sequence. No scan folder
# will be created for this one.
seq_ini_index = self.get_next_free_index()
self.create_entry_and_energy(seq_ini_index=seq_ini_index)
for z_value in self._z_values:
if self.n_darks > 0:
self.add_scan(
scan_type="dark", seq_ini_index=seq_ini_index, z_value=z_value
)
if self._n_flats > 0:
self.add_scan(
scan_type="flat", seq_ini_index=seq_ini_index, z_value=z_value
)
for i_proj_seq in range(self._n_scan_per_seq):
self.add_scan(
scan_type="projection", seq_ini_index=seq_ini_index, z_value=z_value
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment