Commit 7489d880 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

add option to the nexus writer: save each scan in a separate file

parent 450a119d
Pipeline #74057 passed with stages
in 106 minutes and 55 seconds
......@@ -1306,6 +1306,7 @@ class Scan:
# this has to be done when the writer is ready
self._scan_info["filename"] = self.writer.filename
self._scan_info["images_path"] = self.writer.images_path(self)
start_timestamp = time.time()
start_time = datetime.datetime.fromtimestamp(start_timestamp)
......
......@@ -176,11 +176,7 @@ class FileWriter:
master_entry = self.new_master(dev, scan_entry)
self._prepare_callbacks(dev, master_entry, self._master_event_callback)
images_path = self._images_root_path_template.format(
scan_name=scan.name,
img_acq_device=dev.name,
scan_number=scan.scan_number,
)
images_path = self.images_path(scan, img_acq_device=dev.name)
self.prepare_saving(dev, images_path)
for slave in dev.slaves:
......@@ -191,6 +187,13 @@ class FileWriter:
slave, master_entry, self._device_event_callback
)
def images_path(self, scan, img_acq_device: str = "{img_acq_device}"):
return self._images_root_path_template.format(
scan_name=scan.name,
img_acq_device=img_acq_device,
scan_number=scan.scan_number,
)
def close(self):
self._remove_callbacks()
......
......@@ -464,6 +464,22 @@ class Writer(FileWriter):
else:
raise TypeError(value)
@property
def separate_scan_files(self) -> Optional[Integral]:
return self.options.get("separate_scan_files")
@separate_scan_files.setter
def separate_scan_files(self, value):
if value is None or isinstance(value, bool):
self.options["separate_scan_files"] = value
else:
raise TypeError(value)
def writer_options(self) -> dict:
# See nexus_writer_service.io.h5_config.guess_dataset_config
return {"chunk_options": self.options.get_all()}
writer_options = self.options.get_all()
separate_scan_files = writer_options.pop("separate_scan_files", None)
return {
"chunk_options": writer_options,
"separate_scan_files": separate_scan_files,
}
......@@ -13,6 +13,7 @@
Basic Nexus writer listening to Redis events of a scan
"""
import os
from typing import Dict, Optional
import numpy
import logging
......@@ -913,6 +914,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:param Subscan subscan:
"""
self._save_positioners(subscan)
self._create_master_links(subscan)
def _finalize_subscan(self, subscan):
"""
......@@ -1962,3 +1964,43 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
type="text/plain",
date=date,
)
@property
def master_files(self) -> Dict[str, str]:
if not self.filename:
return dict()
filename = scan_utils.scan_filename(self.node, master=True)
if self.filename == filename:
return dict()
return {"dataset": filename}
def _create_master_links(self, subscan):
"""
Links to the scan's NXentry
:param Subscan subscan:
"""
filenames = self.master_files
if not filenames:
return
with self.nxentry(subscan) as nxentry:
if nxentry is None:
return
self.logger.info("Create scan links in masters ...")
prefix, _ = os.path.splitext(os.path.basename(nxentry.file.filename))
prefix += "_"
nxentry_name = nxentry.name[1:] # remove the leading "/"
for level, filename in filenames.items():
if level == "dataset":
linkname = nxentry_name
else:
linkname = prefix + nxentry_name
with self.nxroot(filename=filename) as nxroot:
if nxroot is None:
continue
if linkname in nxroot:
continue
self.logger.info(
"Create link '%s' in master '%s'", linkname, filename
)
nexus.createLink(nxroot, linkname, nxentry)
......@@ -13,17 +13,16 @@
Configurable Nexus writer listening to Redis events of a scan
"""
import os
import re
import datetime
import logging
from contextlib import contextmanager
from typing import Dict
from . import scan_writer_base
from .nxdata_proxy import NXdataProxy
from ..io import nexus
from ..utils import scan_utils
logger = logging.getLogger(__name__)
......@@ -149,16 +148,6 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
)
return nxplot
def _init_subscan(self, subscan):
"""
Things that can already be saved right after
receiving the new subscan event.
:param Subscan subscan:
"""
super()._init_subscan(subscan)
self._create_master_links(subscan)
def _finalize_subscan(self, subscan):
"""
Save final subscan data.
......@@ -557,34 +546,5 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
self.logger.warning(notfoundmsg)
@property
def master_files(self):
"""
:returns list(str):
"""
return list(scan_utils.scan_master_filenames(self.node).values())
def _create_master_links(self, subscan):
"""
Links to the scan's NXentry
:param Subscan subscan:
"""
filenames = self.master_files
if not filenames:
return
with self.nxentry(subscan) as nxentry:
if nxentry is None:
return
self.logger.info("Create scan links in masters ...")
linkname, ext = os.path.splitext(os.path.basename(nxentry.file.filename))
linkname += "_" + nxentry.name[1:]
for filename in filenames:
with self.nxroot(filename=filename) as nxroot:
if nxroot is None:
continue
if linkname in nxroot:
continue
self.logger.info(
"Create link '%s' in master '%s'", linkname, filename
)
nexus.createLink(nxroot, linkname, nxentry)
def master_files(self) -> Dict[str, str]:
return scan_utils.scan_master_filenames(self.node)
......@@ -16,17 +16,20 @@ ESRF data policy unofficial (but still valid)
import os
def masterfile_templates():
def masterfile_templates(data_policy=None, dataset_template=None):
"""
Templates for HDF5 file names relative to the dataset directory
:returns dict(str):
"""
return {
"dataset_collection": os.path.join(
templates = dict()
if dataset_template:
templates["dataset"] = os.path.extsep.join((dataset_template, "h5"))
if data_policy == "ESRF":
templates["dataset_collection"] = os.path.join(
"..", os.path.extsep.join(("{proposal_name}_{collection_name}", "h5"))
),
"proposal": os.path.join(
)
templates["proposal"] = os.path.join(
"..", "..", os.path.extsep.join(("{proposal_name}_{beamline}", "h5"))
),
}
)
return templates
......@@ -88,14 +88,35 @@ def scan_name(scan, subscan=1):
return f"{scan_number}{'.%d_' % subscan-1}{scan_name}"
def scan_filename(scan):
def scan_filename(scan, master=False):
"""
Name of the file that contains the scan data
:param Scan or ScanNode scan:
:returns str or None:
"""
return scan_info_get(scan, "filename")
filename = scan_info_get(scan, "filename")
if master or not filename:
return filename
writer_options = scan_info_get(scan, "writer_options")
if not writer_options:
return filename
if not writer_options.get("separate_scan_files"):
return filename
images_path = scan_info_get(scan, "images_path")
if not images_path:
return filename
root_dir, filename = os.path.split(filename)
_, ext = os.path.splitext(filename)
filename = "bliss_master" + ext
subdir = os.path.relpath(images_path, root_dir)
parts = list()
for s in subdir.split(os.path.sep):
if "{" in s:
break
if s:
parts.append(s)
return os.path.join(root_dir, *parts, filename)
@session_utils.with_scan_saving
......@@ -120,9 +141,9 @@ def scan_master_filenames(scan, config=True):
:returns dict(str):
"""
if not config:
return {}
info = scan_info_get(scan, "nexuswriter", {})
return info.get("masterfiles", {})
return dict()
info = scan_info_get(scan, "nexuswriter", dict())
return info.get("masterfiles", dict())
@session_utils.with_scan_saving
......@@ -134,11 +155,17 @@ def session_master_filenames(scan_saving=None, config=True):
:param bool config: configurable writer is used
:returns dict(str):
"""
if scan_saving.data_policy != "ESRF" or not config or scan_saving.writer != "nexus":
return {}
eval_dict = {}
if not config or scan_saving.writer != "nexus":
return dict()
eval_dict = dict()
root_path = scan_saving.get_cached_property("root_path", eval_dict=eval_dict)
relative_templates = data_policy.masterfile_templates()
if scan_saving.writer_object.separate_scan_files:
dataset_template = scan_saving.data_filename
else:
dataset_template = None
relative_templates = data_policy.masterfile_templates(
dataset_template=dataset_template, data_policy=scan_saving.data_policy
)
return {
name: scan_saving.eval_template(
os.path.abspath(os.path.join(root_path, s)), eval_dict=eval_dict
......@@ -155,13 +182,16 @@ def scan_filenames(scan, config=True):
:pram bool config: writer parses the extra "nexuswriter" info
:returns dict(str):
"""
filenames = {}
filename = scan_info_get(scan, "filename", None)
if filename:
filenames["dataset"] = filename
filenames = dict()
if config:
info = scan_info_get(scan, "nexuswriter", {})
filenames.update(info.get("masterfiles", {}))
info = scan_info_get(scan, "nexuswriter", dict())
filenames.update(info.get("masterfiles", dict()))
filename = scan_filename(scan)
if filename:
if "dataset" in filenames:
filenames["scan"] = filename
else:
filenames["dataset"] = filename
return filenames
......@@ -174,14 +204,14 @@ def session_filenames(scan_saving=None, config=True):
:pram bool config: writer parses the extra "nexuswriter" info
:returns list(str):
"""
filenames = {}
filenames = dict()
filename = session_filename(scan_saving=scan_saving)
if filename:
filenames["dataset"] = filename
if config:
filenames.update(
session_master_filenames(scan_saving=scan_saving, config=config)
)
if filename:
filenames["dataset"] = filename
return filenames
......
......@@ -193,8 +193,8 @@ def prepare_scan_saving(session=None, tmpdir=None, policy=True, **kwargs):
scan_saving.writer = "nexus"
scan_saving.base_path = tmpdir
scan_saving.data_filename = "{a}_{b}"
scan_saving.add("a", "a")
scan_saving.add("b", "b")
scan_saving.add("a", "default")
scan_saving.add("b", "filename")
technique = nxw_test_config.technique["withoutpolicy"]
measurementgroup.set_active_name(technique + "MG")
......
......@@ -250,8 +250,11 @@ def validate_master_links(scan, subscan=1, config=True):
else:
assert False, uri
else:
for filename in scan_utils.scan_master_filenames(scan, config=True).values():
assert not os.path.exists(filename), filename
for name, filename in scan_utils.scan_master_filenames(
scan, config=True
).items():
exists = name == "dataset"
assert os.path.exists(filename) == exists, filename
def validate_scangroup_links(sequence):
......
......@@ -5,47 +5,64 @@
# Copyright (c) 2015-2022 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
from bliss.common import scans
from tests.nexus_writer.helpers import nxw_test_utils
from tests.nexus_writer.helpers import nxw_test_data
def test_nxw_loopscan(nexus_writer_config):
_test_nxw_loopscan(**nexus_writer_config)
def test_nxw_loopscan_noimages(nexus_writer_config):
_test_nxw_loopscan(**nexus_writer_config, save_images=False)
@pytest.mark.parametrize("separate_scan_files", (True, False))
@pytest.mark.parametrize("save_images", (True, False))
def test_nxw_loopscan(separate_scan_files, save_images, nexus_writer_config):
_test_nxw_loopscan(
**nexus_writer_config,
save_images=save_images,
separate_scan_files=separate_scan_files
)
def test_nxw_loopscan_alt(nexus_writer_config_alt):
_test_nxw_loopscan(**nexus_writer_config_alt)
def test_nxw_loopscan_nopolicy(nexus_writer_config_nopolicy):
_test_nxw_loopscan(**nexus_writer_config_nopolicy)
def test_nxw_loopscan_base(nexus_writer_base):
_test_nxw_loopscan(**nexus_writer_base)
@pytest.mark.parametrize("separate_scan_files", (True, False))
def test_nxw_loopscan_nopolicy(separate_scan_files, nexus_writer_config_nopolicy):
_test_nxw_loopscan(
**nexus_writer_config_nopolicy, separate_scan_files=separate_scan_files
)
def test_nxw_loopscan_base_noimages(nexus_writer_base):
_test_nxw_loopscan(**nexus_writer_base, save_images=False)
@pytest.mark.parametrize("separate_scan_files", (True, False))
@pytest.mark.parametrize("save_images", (True, False))
def test_nxw_loopscan_base(separate_scan_files, save_images, nexus_writer_base):
_test_nxw_loopscan(
**nexus_writer_base,
separate_scan_files=separate_scan_files,
save_images=save_images
)
def test_nxw_loopscan_base_alt(nexus_writer_base_alt):
_test_nxw_loopscan(**nexus_writer_base_alt)
def test_nxw_loopscan_base_nopolicy(nexus_writer_base_nopolicy):
_test_nxw_loopscan(**nexus_writer_base_nopolicy)
@pytest.mark.parametrize("separate_scan_files", (True, False))
def test_nxw_loopscan_base_nopolicy(separate_scan_files, nexus_writer_base_nopolicy):
_test_nxw_loopscan(
**nexus_writer_base_nopolicy, separate_scan_files=separate_scan_files
)
@nxw_test_utils.writer_stdout_on_exception
def _test_nxw_loopscan(
session=None, tmpdir=None, writer=None, save_images=True, **kwargs
session=None,
tmpdir=None,
writer=None,
save_images=True,
separate_scan_files=False,
**kwargs
):
session.scan_saving.writer_object.separate_scan_files = separate_scan_files
scan_shape = (10,)
scan = scans.loopscan(scan_shape[0], 0.1, run=False, save_images=save_images)
nxw_test_utils.run_scan(scan)
......
......@@ -13,25 +13,29 @@ from bliss.common import scans
from tests.nexus_writer.helpers import nxw_test_utils
@pytest.mark.parametrize("data_writer", ("nexus", "hdf5", "null"))
WRITER_OPTIONS = ("nexus", "nexus_separate_scan_files", "hdf5", "null")
WRITER_OPTIONS = ("nexus",)
@pytest.mark.parametrize("data_writer", WRITER_OPTIONS)
@pytest.mark.parametrize("save", (True, False))
def test_scan_utils(data_writer, save, nexus_writer_config):
_test_scan_utils(data_writer=data_writer, save=save, **nexus_writer_config)
@pytest.mark.parametrize("data_writer", ("nexus", "hdf5", "null"))
@pytest.mark.parametrize("data_writer", WRITER_OPTIONS)
@pytest.mark.parametrize("save", (True, False))
def test_scan_utils_nopolicy(data_writer, save, nexus_writer_config_nopolicy):
_test_scan_utils(data_writer=data_writer, save=save, **nexus_writer_config_nopolicy)
@pytest.mark.parametrize("data_writer", ("nexus", "hdf5", "null"))
@pytest.mark.parametrize("data_writer", WRITER_OPTIONS)
@pytest.mark.parametrize("save", (True, False))
def test_scan_utils_base(data_writer, save, nexus_writer_config):
_test_scan_utils(data_writer=data_writer, save=save, **nexus_writer_config)
@pytest.mark.parametrize("data_writer", ("nexus", "hdf5", "null"))
@pytest.mark.parametrize("data_writer", WRITER_OPTIONS)
@pytest.mark.parametrize("save", (True, False))
def test_scan_utils_base_nopolicy(data_writer, save, nexus_writer_config_nopolicy):
_test_scan_utils(data_writer=data_writer, save=save, **nexus_writer_config_nopolicy)
......@@ -46,15 +50,34 @@ def _test_scan_utils(
data_writer=None,
save=None,
writer=None,
**kwargs,
**_,
):
separate_scan_files = data_writer == "nexus_separate_scan_files"
if separate_scan_files:
data_writer = "nexus"
session.scan_saving.writer = data_writer
if separate_scan_files:
writer_object = session.scan_saving.writer_object
writer_object.separate_scan_files = separate_scan_files
scan = scans.sct(0.1, session.env_dict["diode3"], save=save)
# Expected file names based in the policy alone (ignore save/writer settings)
master_filenames = {}
if policy:
proposal_session_name = session.scan_saving.proposal_session_name
scan_filename = tmpdir.join(
session.name,
"fs1",
"id00",
"tmp",
"testproposal",
"id00",
"sample",
"sample_0001",
"scan0001",
"bliss_master.h5",
)
dataset_filename = tmpdir.join(
session.name,
"fs1",
......@@ -94,68 +117,80 @@ def _test_scan_utils(
}
filenames = {"dataset": dataset_filename}
else:
dataset_filename = tmpdir.join(session.name, "a_b.h5")
scan_filename = tmpdir.join(session.name, "scan0001", "bliss_master.h5")
dataset_filename = tmpdir.join(session.name, "default_filename.h5")
master_filenames = dict()
filenames = {"dataset": dataset_filename}
if separate_scan_files:
master_filenames["dataset"] = dataset_filename
filenames["scan"] = scan_filename
else:
scan_filename = dataset_filename
filenames.update(master_filenames)
# Check file existence based on policy/save/writer settings
saves_files = save and data_writer != "null"
if saves_files:
nxw_test_utils.wait_scan_data_exists([scan], writer=writer)
saves_masters = saves_files and data_writer == "nexus" and config
assert dataset_filename.check(file=1) == saves_files
saves_extra_masters = saves_files and data_writer == "nexus" and config
assert scan_filename.check(file=1) == saves_files, scan_filename
assert dataset_filename.check(file=1) == saves_files, dataset_filename
for name, f in master_filenames.items():
assert f.check(file=1) == saves_masters
assert f.check(file=1) == saves_extra_masters, f
for name, f in filenames.items():
assert f.check(file=1) == (saves_files and (name == "dataset" or saves_masters))
expected = saves_files and (name in ("dataset", "scan") or saves_extra_masters)
assert f.check(file=1) == expected, f
# Remove unexpected files based on writer settings
saves_files = data_writer != "null"
saves_masters = saves_files and data_writer == "nexus" and config
saves_extra_masters = saves_files and data_writer == "nexus" and config
if not saves_files:
scan_filename = ""
dataset_filename = ""
master_filenames = {}
filenames = {}
elif not saves_masters:
master_filenames = {}
filenames.pop("dataset_collection", None)
filenames.pop("proposal", None)
master_filenames = dict()
filenames = dict()
elif not saves_extra_masters:
for name in ("dataset_collection", "proposal"):
for lst in (filenames, master_filenames):
lst.pop(name, None)
# Check file names from session (save settings are irrelevant)
session_filenames = dict(filenames)
session_filenames.pop("scan", None)
assert scan_utils.session_filename() == dataset_filename
assert scan_utils.session_master_filenames(config=config) == master_filenames
assert scan_utils.session_filenames(config=config) == filenames
assert scan_utils.session_filenames(config=config) == session_filenames
# Check scan uri
if data_writer == "nexus":
dataset_uri = str(dataset_filename) + "::/1.1"
scan_uri = str(scan_filename) + "::/1.1"
elif data_writer == "hdf5":
dataset_uri = str(dataset_filename) + "::/1_ct"
scan_uri = str(scan_filename) + "::/1_ct"
else:
dataset_uri = str(dataset_filename)
if dataset_uri:
assert nexus.exists(dataset_uri) == (save and data_writer != "null")
scan_uri = str(scan_filename)
if scan_uri:
assert nexus.exists(scan_uri) == (save and data_writer != "null")
else:
assert data_writer == "null"