Commit 2bc2c976 authored by payno's avatar payno
Browse files

Merge branch 'master' of gitlab.esrf.fr:tomotools/tomoscan

parents ec0f7456 ff26348e
Pipeline #22669 passed with stages
in 2 minutes and 12 seconds
tomoscan
========
tomoscan aims to provide an unified interface to read tomography data from .edf or .hdf5 (NXtomo) acquisitions.
.. toctree::
:maxdepth: 4
......
......@@ -22,6 +22,8 @@
#
#############################################################################*/
"""contains EDFTomoScan, class to be used with EDF acquisition"""
__authors__ = ["H.Payno"]
__license__ = "MIT"
......@@ -38,7 +40,7 @@ import io
from typing import Union
from ..scanbase import TomoScanBase
from .utils import get_parameters_frm_par_or_info, extract_urls_from_edf
from ..unitsystem import metricsystem
from ..unitsystem.metricsystem import MetricSystem
from ..utils import docstring
import logging
......@@ -84,8 +86,25 @@ class EDFTomoScan(TomoScanBase):
self.__ref_on = None
self.__scan_range = None
self._edf_n_frames = n_frames
self.__distance = None
self.__energy = None
self.update()
@docstring(TomoScanBase.clear_caches)
def clear_caches(self):
self._darks = None
self._flats = None
self._projections = None
self.__tomo_n = None
self.__ref_n = None
self.__dark_n = None
self.__dim1 = None
self.__dim2 = None
self.__pixel_size = None
self.__ref_on = None
self.__scan_range = None
@docstring(TomoScanBase.tomo_n)
@property
def tomo_n(self) -> Union[None, int]:
......@@ -116,7 +135,7 @@ class EDFTomoScan(TomoScanBase):
:rtype: float
"""
if self.__pixel_size is None:
self.__pixel_size = EDFTomoScan.get_pixel_size(scan=self.path)
self.__pixel_size = EDFTomoScan._get_pixel_size(scan=self.path)
return self.__pixel_size
@property
......@@ -190,7 +209,7 @@ class EDFTomoScan(TomoScanBase):
def is_abort(self, **kwargs) -> bool:
abort_file = os.path.basename(self.path) + self.ABORT_FILE
abort_file = os.path.join(self.path, abort_file)
if 'src_pattern' in kwargs and kwargs['src_pattern' is not None]:
if 'src_pattern' in kwargs and kwargs['src_pattern'] is not None:
assert 'dest_pattern' in kwargs
abort_file = abort_file.replace(kwargs['src_pattern'],
kwargs['dest_pattern'])
......@@ -218,9 +237,10 @@ class EDFTomoScan(TomoScanBase):
@docstring(TomoScanBase.update)
def update(self):
self.projections = EDFTomoScan.get_proj_urls(self.path, n_frames=self._edf_n_frames)
self._darks = EDFTomoScan.get_darks_url(self.path)
self._flats = EDFTomoScan.get_refs_url(self.path)
if self.path is not None:
self.projections = EDFTomoScan.get_proj_urls(self.path, n_frames=self._edf_n_frames)
self._darks = EDFTomoScan.get_darks_url(self.path)
self._flats = EDFTomoScan.get_refs_url(self.path)
@docstring(TomoScanBase.load_from_dict)
def load_from_dict(self, desc: Union[dict, io.TextIOWrapper]):
......@@ -228,11 +248,11 @@ class EDFTomoScan(TomoScanBase):
data = json.load(desc)
else:
data = desc
if not (self._DICT_TYPE_KEY in data and data[self._DICT_TYPE_KEY] == self._TYPE):
if not (self.DICT_TYPE_KEY in data and data[self.DICT_TYPE_KEY] == self._TYPE):
raise ValueError('Description is not an EDFScan json description')
assert self._DICT_PATH_KEY in data
self.path = data[self._DICT_PATH_KEY]
assert self.DICT_PATH_KEY in data
self.path = data[self.DICT_PATH_KEY]
return self
@staticmethod
......@@ -303,8 +323,7 @@ class EDFTomoScan(TomoScanBase):
def extract_index(my_str, type_):
res = []
modified_str = copy.copy(my_str)
while modified_str[-1].isdigit():
while modified_str != "" and modified_str[-1].isdigit():
res.append(modified_str[-1])
modified_str = modified_str[:-1]
if len(res) == 0:
......@@ -315,7 +334,7 @@ class EDFTomoScan(TomoScanBase):
return int(''.join(orignalOrder)), modified_str
else:
return float('.'.join(('0', ''.join(orignalOrder)))), modified_str
_file = os.path.basename(_file)
if _file.endswith('.edf'):
name = _file.replace(basename, '', 1)
name = name.rstrip('.edf')
......@@ -330,7 +349,10 @@ class EDFTomoScan(TomoScanBase):
if part_1 is None:
return None
if part_2 is None:
return int(part_1)
if part_1 is None:
return None
else:
return int(part_1)
else:
return float(part_1) + part_2
else:
......@@ -392,8 +414,34 @@ class EDFTomoScan(TomoScanBase):
return d1, d2
@property
@docstring(TomoScanBase.distance)
def distance(self) -> Union[None, float]:
if self.__distance is None:
self.__distance = EDFTomoScan.retrieve_information(self.path,
None, "Distance",
type_=float,
key_aliases=('distance', )
)
if self.__distance is None:
return None
else:
return self.__distance * MetricSystem.MILLIMETER.value
@property
@docstring(TomoScanBase.energy)
def energy(self):
if self.__energy is None:
self.__energy = EDFTomoScan.retrieve_information(self.path,
None,
"Energy",
type_=float,
key_aliases=('energy', )
)
return self.__energy
@staticmethod
def get_pixel_size(scan: str) -> Union[None, int]:
def _get_pixel_size(scan: str) -> Union[None, float]:
if os.path.isdir(scan) is False:
return None
value = EDFTomoScan.retrieve_information(scan=scan,
......@@ -413,7 +461,7 @@ class EDFTomoScan(TomoScanBase):
# for now pixel size are stored in microns.
# We want to return them in meter
if value is not None:
return value * metricsystem.micrometer
return value * MetricSystem.MICROMETER.value
else:
return None
......
......@@ -22,6 +22,8 @@
#
#############################################################################*/
"""contains EDFTomoScan, class to be used with HDF5 acquisition"""
__authors__ = ["H.Payno"]
__license__ = "MIT"
......@@ -35,12 +37,24 @@ import os
import h5py
import numpy
from silx.io.url import DataUrl
from silx.utils.enum import Enum as _Enum
from tomoscan.utils import docstring
from silx.io.utils import get_data
from ..unitsystem import metricsystem
import logging
import typing
_logger = logging.getLogger(__name__)
class ImageKey(_Enum):
ALIGNMENT = -1
PROJECTION = 0
FLAT_FIELD = 1
DARK_FIELD = 2
INVALID = 3
class HDF5TomoScan(TomoScanBase):
"""
This is the implementation of a TomoBase class for an acquisition stored
......@@ -53,79 +67,222 @@ class HDF5TomoScan(TomoScanBase):
replace progressively by properties at the 'TomoBase' level
:param scan: scan directory or scan masterfile.h5
:type: Union[str,None]
:param Union[str, None] entry: name of the NXtomo entry to select. If given
index is ignored.
:param Union[int, None] index: of the NXtomo entry to select. Ignored if
an entry is specified. For consistency
entries are ordered alphabetically
"""
_TYPE = 'hdf5'
_DEFAULT_ENTRY = '1_tomo'
_DICT_ENTRY_KEY = 'entry'
_PROJ_PATH = 'measurement/pcoedge64:image'
_PROJ_PATH = 'instrument/detector/data'
_SCAN_META_PATH = 'scan_meta/technique/scan'
_DET_META_PATH = 'scan_meta/technique/detector'
_ROTATION_ANGLE_PATH = 'sample/rotation_angle'
_IMG_KEY_PATH = 'instrument/detector/image_key'
_IMG_KEY_CONTROL_PATH = 'instrument/detector/image_key_control'
_X_PIXEL_SIZE_PATH = 'instrument/detector/x_pixel_size'
_Y_PIXEL_SIZE_PATH = 'instrument/detector/y_pixel_size'
_X_PIXEL_MAG_SIZE_PATH = 'instrument/detector/x_magnified_pixel_size'
_Y_PIXEL_MAG_SIZE_PATH = 'instrument/detector/y_magnified_pixel_size'
_DISTANCE_PATH = 'instrument/detector/distance'
_ENERGY_PATH = 'beam/incident_energy'
_SCHEME = 'silx'
def __init__(self, scan, entry=None):
_EPSILON_ROT_ANGLE = 0.02
def __init__(self, scan: str, entry: str = None,
index: typing.Union[int, None] = 0):
if entry is not None:
index = None
# if the user give the master file instead of the scan dir...
if scan is not None:
if os.path.isfile(scan):
self.master_file = scan
scan = os.path.dirname(scan)
else:
self.master_file = self.get_master_file(scan)
self.master_file = os.path.join(scan, os.path.basename(scan))
if os.path.exists(self.master_file + '.hdf5'):
if os.path.exists(self.master_file + '.nx'):
self.master_file = self.master_file + '.nx'
elif os.path.exists(self.master_file + '.hdf5'):
self.master_file = self.master_file + '.hdf5'
else:
elif os.path.exists(self.master_file + '.h5'):
self.master_file = self.master_file + '.h5'
else:
self.master_file = self.master_file + '.nx'
else:
self.master_file = None
super(HDF5TomoScan, self).__init__(scan=scan, type_=HDF5TomoScan._TYPE)
self._entry = entry or HDF5TomoScan._DEFAULT_ENTRY
if scan is None:
self._entry = None
else:
self._entry = entry or self._get_entry_at(index=index,
file_path=self.master_file)
if self._entry is None:
raise ValueError('unable to find a valid entry for %s' % self.master_file)
# for now the default entry is 1_tomo but should change with time
# data caches
self._projections = None
self._flats = None
self._darks = None
self._tomo_n = None
# number of projections / radios
self._dark_n = None
# number of dark image made during acquisition
self._ref_n = None
# number of flat field made during acquisition
self._ref_on = None
# when the last flat field is process
self._scan_range = None
# scan range, in degree
self._dim_1, self._dim_2 = None, None
# image dimensions
self._pixel_size = None
self._x_pixel_size = None
self._y_pixel_size = None
# pixel dimensions (tuple)
self._frames = None
self._image_keys = None
self._image_keys_control = None
self._rotation_angles = None
self._distance = None
self._energy = None
@staticmethod
def get_master_file(scan_path):
if os.path.isfile(scan_path):
master_file = scan_path
else:
master_file = os.path.join(scan_path, os.path.basename(scan_path))
if os.path.exists(master_file + '.nx'):
master_file = master_file + '.nx'
elif os.path.exists(master_file + '.hdf5'):
master_file = master_file + '.hdf5'
elif os.path.exists(master_file + '.h5'):
master_file = master_file + '.h5'
else:
master_file = master_file + '.nx'
return master_file
@docstring(TomoScanBase.clear_caches)
def clear_caches(self) -> None:
self._projections = None
self._flats = None
self._darks = None
self._tomo_n = None
self._dark_n = None
self._ref_n = None
self._scan_range = None
self._dim_1, self._dim_2 = None, None
self._x_pixel_size = None
self._y_pixel_size = None
self._x_magnified_pixel_size = None
self._y_magnified_pixel_size = None
self._rotation_angles = None
self._distance = None
self._image_keys_control = None
@staticmethod
def _get_entry_at(index: int, file_path: str) -> str:
"""
:param index:
:param file_path:
:return:
"""
entries = HDF5TomoScan.get_valid_entries(file_path)
if len(entries) == 0:
return None
else:
return entries[index]
@docstring(TomoScanBase.is_tomoscan_dir)
@staticmethod
def is_tomoscan_dir(directory, **kwargs):
master_file_base = os.path.join(directory, os.path.basename(directory))
if os.path.exists(master_file_base + '.hdf5'):
def get_valid_entries(file_path: str) -> tuple:
"""
return the list of 'Nxtomo' entries at the root level
:param str file_path:
:return: list of valid Nxtomo node (ordered alphabetically)
:rtype: tuple
..note: entries are sorted to insure consistency
"""
res = []
res_buf = []
if not os.path.isfile(file_path):
raise ValueError('given file path should be a file')
with h5py.File(file_path, 'r') as h5f:
for root_node in h5f.keys():
node = h5f[root_node]
if HDF5TomoScan.node_is_nxtomo(node) is True:
res_buf.append(root_node) # cannnot be node because of sym links
[res.append(node) for node in res_buf]
res.sort()
return tuple(res)
@staticmethod
def node_is_nxtomo(node: h5py.Group) -> bool:
"""check if the given h5py node is an nxtomo node or not"""
if 'NX_class' in node.attrs or 'NXclass' in node.attrs:
_logger.info(node.name + ' is recognized as an nx class.')
else:
_logger.info(node.name + ' is node an nx class.')
return False
if 'definition' in node.attrs and node.attrs['definition'].lower() == 'nxtomo':
_logger.info(node.name + ' is recognized as an NXtomo class.')
return True
else:
return os.path.exists(master_file_base + '.h5')
return False
@docstring(TomoScanBase.is_tomoscan_dir)
@staticmethod
def is_tomoscan_dir(directory: str, **kwargs) -> bool:
if os.path.isfile(directory):
master_file = directory
else:
master_file = HDF5TomoScan.get_master_file(scan_path=directory)
if master_file:
entries = HDF5TomoScan.get_valid_entries(file_path=master_file)
return len(entries) > 0
@docstring(TomoScanBase.is_abort)
def is_abort(self, **kwargs):
# for now there is no abort definition in .hdf5
return False
@docstring(TomoScanBase.to_dict)
def to_dict(self) -> dict:
res = super().to_dict()
res[self._DICT_ENTRY_KEY] = self.entry
return res
@staticmethod
def from_dict(_dict):
def from_dict(_dict: dict):
scan = HDF5TomoScan(scan=None)
scan.load_from_dict(_dict=_dict)
return scan
@docstring(TomoScanBase.load_from_dict)
def load_from_dict(self, _dict):
def load_from_dict(self, _dict: dict) -> TomoScanBase:
"""
:param _dict:
......@@ -135,171 +292,400 @@ class HDF5TomoScan(TomoScanBase):
data = json.load(_dict)
else:
data = _dict
if not (self._DICT_TYPE_KEY in data and data[self._DICT_TYPE_KEY] == self._TYPE):
raise ValueError('Description is not an EDFScan json description')
assert self._DICT_PATH_KEY in data
self.path = data[self._DICT_PATH_KEY]
if not (self.DICT_TYPE_KEY in data and data[self.DICT_TYPE_KEY] == self._TYPE):
raise ValueError('Description is not an HDF5Scan json description')
if HDF5TomoScan._DICT_ENTRY_KEY not in data:
raise ValueError('No hdf5 entry specified')
assert self.DICT_PATH_KEY in data
self.path = data[self.DICT_PATH_KEY]
self._entry = data[self._DICT_ENTRY_KEY]
return self
@property
def entry(self) -> str:
return self._entry
@property
@docstring(TomoScanBase.projections)
def projections(self):
if self._projections is None or len(self._projections) != self.tomo_n:
self.update()
def projections(self) -> typing.Union[dict, None]:
"""projections / radio, does not include the return projections"""
if self._projections is None:
if self.frames:
proj_frames = tuple(filter(lambda x: x.image_key == ImageKey.PROJECTION and x.is_control == False, self.frames))
self._projections = {}
for proj_frame in proj_frames:
self._projections[proj_frame.index] = proj_frame.url
return self._projections
@projections.setter
def projections(self, projections):
def projections(self, projections: dict):
self._projections = projections
@property
@docstring(TomoScanBase.darks)
def darks(self):
raise NotImplementedError('ref path not defined yet for hdf5')
def darks(self) -> typing.Union[dict, None]:
if self._darks is None:
if self.frames:
dark_frames = tuple(filter(lambda x: x.image_key == ImageKey.DARK_FIELD, self.frames))
self._darks = {}
for dark_frame in dark_frames:
self._darks[dark_frame.index] = dark_frame.url
return self._darks
@property
@docstring(TomoScanBase.flats)
def flats(self):
raise NotImplementedError('ref path not defined yet for hdf5')
def flats(self) -> typing.Union[dict, None]:
if self._flats is None:
if self.frames:
flat_frames = tuple(filter(lambda x: x.image_key == ImageKey.FLAT_FIELD, self.frames))
self._flats = {}
for flat_frame in flat_frames:
self._flats[flat_frame.index] = flat_frame.url
return self._flats
@docstring(TomoScanBase.update)
def update(self):
def update(self) -> None:
"""update list of radio and reconstruction by parsing the scan folder
"""
if not os.path.exists(self.master_file):
if self.master_file is None or not os.path.exists(self.master_file):
return
self.projections = self._get_projections_url()
# TODO: update darks and flats too
@docstring(TomoScanBase.get_proj_angle_url)
def _get_projections_url(self):
"""
:param path:
:return: list of url
"""
if self.master_file is None or not os.path.exists(self.master_file):
return
with h5py.File(self.master_file, 'r') as h5_file:
frames = self.frames
if frames is not None:
urls = {}
if (self._entry in h5_file and
HDF5TomoScan._PROJ_PATH in h5_file[self._entry]):
image = h5_file[self._entry][HDF5TomoScan._PROJ_PATH]
def get_reader(extension):
extension = extension.lower()
if extension == 'edf':
return 'fabio'
elif extension == 'hdf5':
return 'silx'
else:
mess = ' '.join(('extension', extension,
'unrecognized to define a reader'))
_logger.warning(mess)
return None
for i_slice, slice_data in enumerate(image):
# if fit hdf5 mock
if isinstance(slice_data, numpy.ndarray) and slice_data.ndim == 2:
data_path = '/'.join((self._entry, self._PROJ_PATH))
silx_url = DataUrl(file_path=self.master_file,
data_path=data_path,
data_slice=(i_slice,),
scheme='silx')
else:
# if fit the bcu prototype
scheme = get_reader(slice_data[1])
file_path = slice_data[4]
data_path = '/'.join((slice_data[3], 'measurement', 'pcoedge64', 'data'))
slice_number = slice_data[2]
silx_url = DataUrl(file_path=file_path, data_path=data_path,
data_slice=(slice_number,), scheme=scheme)
urls[i_slice] = silx_url
for frame in frames:
if frame.image_key is ImageKey.PROJECTION:
urls[frame.index] = frame.url
return urls
else:
return None
@docstring(TomoScanBase.tomo_n)
@property
def tomo_n(self):
if self._tomo_n is None and self.master_file and os.path.exists(self.master_file):
def tomo_n(self) -> typing.Union[None, int]:
"""we are making two asumptions for computing tomo_n:
- if a rotation = scan_range +/- EPSILON this is a return projection
- The delta between each projections is constant
"""
if (self._tomo_n is None and self.master_file and
os.path.exists(self.master_file)):
if self.projections:
return len(self.projections)
else:
return None
else:
return None
@property
def return_projs(self) -> typing.Union[None, list]:
""""""
frames = self.frames
if frames:
return_frames = list(filter(lambda x: x.is_control == True, frames))
return return_frames
else:
return None
@property
def rotation_angle(self) -> typing.Union[None, list]: