Skip to content
Snippets Groups Projects
Commit e6376dd0 authored by Jerome Lesaint's avatar Jerome Lesaint
Browse files

Merge branch 'fluo2d' into 'main'

WIP: Add handling of 2D XRFDATA. Angles in 2D still to handle

See merge request !248
parents e4322e68 85392b6c
No related branches found
No related tags found
1 merge request!248WIP: Add handling of 2D XRFDATA. Angles in 2D still to handle
Pipeline #217516 passed
"""module dedicated to esrf scans"""
from .scan.edfscan import EDFTomoScan # noqa F401
from .scan.fluoscan import FluoTomoScan # noqa F401
from .scan.fluoscan import FluoTomoScan3D # noqa F401
from .scan.fluoscan import FluoTomoScan2D # noqa F401
from .scan.nxtomoscan import NXtomoScan # noqa F401
from .scan.nxtomoscan import HDF5TomoScan # noqa F401
from .volume.edfvolume import EDFVolume # noqa F401
......
......@@ -15,6 +15,7 @@ from tomoscan.utils import docstring
from dataclasses import dataclass, field
import numpy as np
from numpy.typing import NDArray, DTypeLike
from silx.io.utils import h5py_read_dataset
......@@ -38,13 +39,11 @@ except ImportError as e:
else:
has_imageio = True
__all__ = [
"FluoTomoScan",
]
__all__ = ["FluoTomoScanBase", "FluoTomoScan3D", "FluoTomoScan2D"]
@dataclass
class FluoTomoScan:
class FluoTomoScanBase:
"""Dataset manipulation class."""
scan: str
......@@ -59,10 +58,12 @@ class FluoTomoScan:
el_lines: dict[str, list[str]] = field(default_factory=dict)
pixel_size: float | None = None
energy: float | None = None
detected_folders: list[str] = field(default_factory=list)
def __post_init__(self):
self.detected_folders = self.detect_folders()
self.detected_detectors = tuple(self.detect_detectors())
self.detect_pixel_size_and_energy()
self.get_metadata_from_h5_file()
_logger.info(f"Detectors: {self.detected_detectors}")
if len(self.detectors) == 0:
self.detectors = self.detected_detectors
......@@ -77,7 +78,11 @@ class FluoTomoScan:
self.skip_angle_inds = numpy.array(self.skip_angle_inds)
if self.angles is None:
self.detect_rot_angles()
self.angles = self.detect_rot_angles()
def detect_folders(self) -> list[str]:
"""List all folders to process."""
raise NotImplementedError("Base class")
@property
def rot_angles_deg(self) -> NDArray:
......@@ -93,71 +98,58 @@ class FluoTomoScan:
def rot_angles_rad(self) -> NDArray:
return numpy.deg2rad(self.rot_angles_deg)
def detect_rot_angles(self):
tmp_angles = []
proj_ind = 0
while True:
proj_ind += 1
prj_dir = os.path.join(
self.scan, "fluofit", self.dataset_basename + "_%03d" % proj_ind
def detect_rot_angles(self) -> None:
"""Build rotation angles list."""
raise NotImplementedError("Base class")
def _check_ready_to_load_data(self, det):
if not has_imageio:
raise RuntimeError("imageio not install. Cannot load data.")
if det not in self.detectors:
raise RuntimeError(
f"The detector {det} is invalid. Valid ones are {self.detectors}"
)
info_file = os.path.join(prj_dir, "info.txt")
if not os.path.exists(info_file):
_logger.debug(
f"{info_file} doesn't exist, while expected to be present in each projection folder."
)
break
with open(info_file, "r") as f:
info_str = f.read()
tmp_angles.append(float(info_str.split(" ")[2]))
_logger.info(
f"Found angle information in info file for {proj_ind} projections."
)
self.angles = numpy.array(tmp_angles, ndmin=1, dtype=numpy.float32)
if self.angles is None:
raise RuntimeError("Rotation angles not initilized")
if self.detectors is None:
raise RuntimeError("Detectors not initialized")
def detect_pixel_size_and_energy(self):
pixel_size_path = os.path.join(self.scan, self.dataset_basename + "_%03d" % 1)
h5_files = glob.glob1(pixel_size_path, "*.h5")
def get_metadata_from_h5_file(self):
if len(self.detected_folders) == 0:
raise ValueError("No folder found, unable to load metadata")
h5_path = os.path.join(self.scan, self.detected_folders[0])
h5_files = glob.glob1(h5_path, "*.h5")
if len(h5_files) > 1:
raise ValueError(
"More than one hdf5 file in scan directory. Expect only ONE to pick pixel size."
)
elif len(h5_files) == 0:
pattern = os.path.join(pixel_size_path, "*.h5")
pattern = os.path.join(h5_path, "*.h5")
raise ValueError(
f"Unable to find the hdf5 file in scan directory to pick pixel size. RegExp used is {pattern}"
)
else:
try:
if "3DXRF" in h5_files[0]:
sample_name = h5_files[0].split("_3DXRF_")[0].split("-")[1]
with h5py.File(os.path.join(h5_path, h5_files[0]), "r") as f:
if len(list(f.keys())) != 1:
raise ValueError(
f"H5 file should contain only one entry, found {len(list(f.keys()))}"
)
else:
sample_name = h5_files[0].split("_XRF_")[0].split("-")[1]
except IndexError:
raise ValueError(
f"unable to deduce sample name from {h5_files[0]}. Expected synthax is 'proposal-samplename_XRF_XXX.h5'"
)
entry_name = (
"entry_0000: "
+ sample_name
+ " - "
+ self.dataset_basename
+ "_%03d" % 1
)
with h5py.File(os.path.join(pixel_size_path, h5_files[0]), "r") as f:
self.pixel_size = (
h5py_read_dataset(f[entry_name]["FLUO"]["pixelSize"]) * 1e-6
)
self.energy = float(
h5py_read_dataset(
f[entry_name]["instrument"]["monochromator"]["energy"]
)
)
entry_name = list(f.keys())[0]
self.pixel_size = (
h5py_read_dataset(f[entry_name]["FLUO"]["pixelSize"]) * 1e-6
)
self.energy = float(
h5py_read_dataset(
f[entry_name]["instrument"]["monochromator"]["energy"]
)
)
def detect_detectors(self):
proj_1_dir = os.path.join(
self.scan, "fluofit", self.dataset_basename + "_%03d" % 1
)
if len(self.detected_folders) == 0:
raise ValueError("No folder found, unable to detect detectors")
proj_1_dir = os.path.join(self.scan, "fluofit", self.detected_folders[0])
detected_detectors = []
file_names = glob.glob1(proj_1_dir, "IMG_*area_density_ngmm2.tif")
for file in file_names:
......@@ -167,9 +159,9 @@ class FluoTomoScan:
return detected_detectors
def detect_elements(self):
proj_1_dir = os.path.join(
self.scan, "fluofit", self.dataset_basename + "_%03d" % 1
)
if len(self.detected_folders) == 0:
raise ValueError("No folder found, unable to detect elements")
proj_1_dir = os.path.join(self.scan, "fluofit", self.detected_folders[0])
detector = self.detectors[0]
file_names = glob.glob1(proj_1_dir, f"IMG_{detector}*area_density_ngmm2.tif")
for file in file_names:
......@@ -182,18 +174,61 @@ class FluoTomoScan:
except KeyError:
self.el_lines[element] = [line]
def load_data(self, det: str, element: str, line_ind: int = 0):
if not has_imageio:
raise RuntimeError("imageio not install. Cannot load data.")
if det not in self.detectors:
raise RuntimeError(
f"The detector {det} is invalid. Valid ones are {self.detectors}"
def load_data(self, det: str, element: str, line_ind: int = 0) -> NDArray:
"""Main function of class to load data."""
raise NotImplementedError("Base class")
@staticmethod
def from_identifier(identifier):
"""Return the Dataset from a identifier"""
raise NotImplementedError("Not implemented for fluo-tomo yet.")
@docstring(TomoScanBase)
def get_identifier(self) -> ScanIdentifier:
raise NotImplementedError("Not implemented for fluo-tomo yet.")
@dataclass
class FluoTomoScan3D(FluoTomoScanBase):
"""Dataset manipulation class."""
def detect_folders(self):
fit_folders = glob.glob1(
os.path.join(self.scan, "fluofit"), rf"{self.dataset_basename}_projection*"
)
if len(fit_folders) == 0:
raise FileNotFoundError(
"No projection was found in the fluofit folder. The searched for pattern is <scan_dir>/fluofit/<dataset_basename>_projection*'."
)
if self.angles is None:
raise RuntimeError("Rotation angles not initilized")
elif not os.path.isdir(os.path.join(self.scan, fit_folders[0])):
raise FileNotFoundError(
"Found fitted data folders but not the corresponding raw data folder."
)
else:
return fit_folders
if self.detectors is None:
raise RuntimeError("Detectors not initialized")
def detect_rot_angles(self):
tmp_angles = []
for f in self.detected_folders:
prj_dir = os.path.join(self.scan, "fluofit", f)
info_file = os.path.join(prj_dir, "info.txt")
if not os.path.exists(info_file):
_logger.debug(
f"{info_file} doesn't exist, while expected to be present in each projection folder."
)
break
with open(info_file, "r") as f:
info_str = f.read()
tmp_angles.append(float(info_str.split(" ")[2]))
_logger.info(
f"Found angle information in info file for {len(tmp_angles)} projections."
)
return numpy.array(tmp_angles, ndmin=1, dtype=numpy.float32)
def load_data(self, det: str, element: str, line_ind: int = 0) -> NDArray:
self._check_ready_to_load_data(det)
line = self.el_lines[element][line_ind]
......@@ -215,7 +250,9 @@ class FluoTomoScan:
continue
proj_dir = os.path.join(
self.scan, "fluofit", self.dataset_basename + "_%03d" % (ii_i + 1)
self.scan,
"fluofit",
self.dataset_basename + "_projection_%03d" % (ii_i + 1),
)
img_path = os.path.join(
proj_dir, f"IMG_{det}_{element}-{line}_area_density_ngmm2.tif"
......@@ -230,11 +267,94 @@ class FluoTomoScan:
data = numpy.array(data_det)
return numpy.ascontiguousarray(data)
@staticmethod
def from_identifier(identifier):
"""Return the Dataset from a identifier"""
raise NotImplementedError("Not implemented for fluo-tomo yet.")
@docstring(TomoScanBase)
def get_identifier(self) -> ScanIdentifier:
raise NotImplementedError("Not implemented for fluo-tomo yet.")
@dataclass
class FluoTomoScan2D(FluoTomoScanBase):
"""Dataset manipulation class."""
def get_metadata_from_h5_file(self):
super().get_metadata_from_h5_file()
h5_path = os.path.join(self.scan, self.detected_folders[0])
h5_files = glob.glob1(h5_path, "*.h5")
if len(h5_files) > 1:
raise ValueError(
"More than one hdf5 file in scan directory. Expect only ONE to pick pixel size."
)
elif len(h5_files) == 0:
pattern = os.path.join(h5_path, "*.h5")
raise ValueError(
f"Unable to find the hdf5 file in scan directory to pick pixel size. RegExp used is {pattern}"
)
else:
with h5py.File(os.path.join(h5_path, h5_files[0]), "r") as f:
if len(list(f.keys())) != 1:
raise ValueError(
f"H5 file should contain only one entry, found {len(list(f.keys()))}"
)
else:
entry_name = list(f.keys())[0]
self.scanRange_2 = float(
h5py_read_dataset(f[entry_name]["FLUO"]["scanRange_2"])
)
self.scanDim_2 = int(
h5py_read_dataset(f[entry_name]["FLUO"]["scanDim_2"])
)
def detect_rot_angles(self):
nb_projs = self.scanDim_2
angular_coverage = self.scanRange_2
return np.linspace(0, angular_coverage, nb_projs, endpoint=True)
def detect_folders(self):
fit_folder = os.path.join(self.scan, "fluofit", self.dataset_basename)
if not os.path.isdir(fit_folder):
raise FileNotFoundError(
f"No folder {fit_folder} was found in the fluofit folder. The searched for pattern is <scan_dir>/fluofit/<dataset_basename>'."
)
elif not os.path.isdir(os.path.join(self.scan, self.dataset_basename)):
raise FileNotFoundError(
"Found fitted data folders but not the corresponding raw data folder."
)
else:
return [
self.dataset_basename,
]
def load_data(self, det: str, element: str, line_ind: int = 0) -> NDArray:
self._check_ready_to_load_data(det)
line = self.el_lines[element][line_ind]
data_det = []
description = f"Loading images of {element}-{line} ({det}): "
if has_tqdm:
slice_iterator = tqdm(
range(len(self.detected_folders)),
disable=self.verbose,
desc=description,
)
else:
slice_iterator = range(len(self.detected_folders))
for ii_i in slice_iterator:
proj_dir = os.path.join(
self.scan,
"fluofit",
self.dataset_basename, # WARNING: dataset_basename is ONE SINGLE sinogram.
)
img_path = os.path.join(
proj_dir, f"IMG_{det}_{element}-{line}_area_density_ngmm2.tif"
)
if self.verbose:
_logger.info(f"Loading {ii_i+1}/{len(self.angles)}: {img_path}")
img = iio.imread(img_path)
data_det.append(numpy.nan_to_num(numpy.array(img, dtype=self.dtype)))
data = numpy.array(data_det)
return numpy.ascontiguousarray(data)
# coding: utf-8
import logging
import pytest
import numpy
from tomoscan.esrf.scan.fluoscan import FluoTomoScan2D
from tomoscan.tests.datasets import GitlabDataset
logging.disable(logging.INFO)
@pytest.fixture(scope="class")
def fluodata2D(request):
cls = request.cls
cls.scan_dir = GitlabDataset.get_dataset("fluo_datasets2D")
cls.dataset_basename = "CONT2_p2_600nm_FT02_slice_0"
cls.scan = FluoTomoScan2D(
scan=cls.scan_dir,
dataset_basename=cls.dataset_basename,
detectors=(),
)
@pytest.mark.usefixtures("fluodata2D")
class TestFluo2D:
def test_all_detectors(self):
assert (
len(self.scan.el_lines) == 2
), f"Number of elements found should be 2 and is {len(self.scan.el_lines)}."
assert set(self.scan.detectors) == set(
["fluo1", "corrweighted"]
), f"There should be 2 'detectors' (fluo1 and corrweighted), {len(self.detectors)} were found."
def test_one_detector(self):
scan = FluoTomoScan2D(
scan=self.scan_dir,
dataset_basename=self.dataset_basename,
detectors=("corrweighted",),
)
assert (
len(scan.el_lines) == 2
), f"Number of elements found should be 2 and is {len(scan.el_lines)}."
assert (
len(scan.detectors) == 1
), f"There should be 1 detector (corrweighted), {len(scan.detectors)} were found."
# One ghost detector (no corresponding files)
# test general section setters
with pytest.raises(ValueError):
scan = FluoTomoScan2D(
scan=self.scan_dir,
dataset_basename=self.dataset_basename,
detectors=("toto",),
)
def test_load_data(self):
data = self.scan.load_data("corrweighted", "Ca", 0)
assert data.shape == (1, 251, 1000)
def test_load_energy_and_pixel_size(self):
assert self.scan.energy == 17.1
assert numpy.allclose(
self.scan.pixel_size, 6e-10, atol=1e-4
) # Tolerance:0.1nm (since pixel_size is expected in um).
......@@ -4,26 +4,26 @@ import logging
import pytest
from tomoscan.esrf.scan.fluoscan import FluoTomoScan
from tomoscan.esrf.scan.fluoscan import FluoTomoScan3D
from tomoscan.tests.datasets import GitlabDataset
logging.disable(logging.INFO)
@pytest.fixture(scope="class")
def fluodata(request):
def fluodata3D(request):
cls = request.cls
cls.scan_dir = GitlabDataset.get_dataset("fluo_datasets")
cls.dataset_basename = "CP1_XRD_insitu_top_ft_100nm_projection"
cls.scan = FluoTomoScan(
cls.dataset_basename = "CP1_XRD_insitu_top_ft_100nm"
cls.scan = FluoTomoScan3D(
scan=cls.scan_dir,
dataset_basename=cls.dataset_basename,
detectors=(),
)
@pytest.mark.usefixtures("fluodata")
class TestFluo:
@pytest.mark.usefixtures("fluodata3D")
class TestFluo3D:
def test_all_detectors(self):
assert (
len(self.scan.el_lines) == 14
......@@ -33,7 +33,7 @@ class TestFluo:
), f"There should be 3 'detectors' (xmap, falcon and weighted), {len(self.detectors)} were found."
def test_one_detector(self):
scan = FluoTomoScan(
scan = FluoTomoScan3D(
scan=self.scan_dir,
dataset_basename=self.dataset_basename,
detectors=("xmap",),
......@@ -48,7 +48,7 @@ class TestFluo:
# One ghost detector (no corresponding files)
# test general section setters
with pytest.raises(ValueError):
scan = FluoTomoScan(
scan = FluoTomoScan3D(
scan=self.scan_dir,
dataset_basename=self.dataset_basename,
detectors=("toto",),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment