Commit e290ee50 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

[writer] change handling of timer channels (elapsed_time and epoch) and other...

[writer] change handling of timer channels (elapsed_time and epoch) and other scans with multi-channel masters
parent 3248ebd7
......@@ -39,10 +39,6 @@ mcatypemap = {
mcaunitmap = {"icr": "hertz", "ocr": "hertz", "livetime": "s", "realtime": "s"}
timernamemap = {"elapsed_time": "value", "epoch": "epoch"}
timertypemap = {"elapsed_time": "principal", "epoch": "epoch"}
limanamemap = {"image": "data", "sum": "data"}
limatypemap = {"image": "principal", "sum": "principal"}
......@@ -93,11 +89,14 @@ def fill_device(fullname, device, device_info=None, data_info=None):
device_type: type for the writer (not saved), e.g. positioner, mca, lima
device_name: HDF5 group name (measurement or positioners when missing)
device_info: HDF5 group datasets
data_type: "principal" (data of NXdetector or value of NXpositioner) or other
data_type: maximal one device dataset has type "principal"
a principal's data_name is either "data" (detector) or "value" (positioner)
data_name: HDF5 dataset name
data_info: HDF5 dataset attributes
unique_name: Unique name for HDF5 links
master_index: >=0 axis order used for plotting
single: only data of this device
a single's data_name is either "data" (detector) or "value" (positioner)
dependencies: fullnames
:param str fulname:
......@@ -117,6 +116,7 @@ def fill_device(fullname, device, device_info=None, data_info=None):
device["data_info"] = device.get("data_info", data_info)
device["unique_name"] = device.get("unique_name", fullname)
device["master_index"] = -1
device["single"] = None
device["dependencies"] = {}
device["metadata_keys"] = {}
......@@ -209,62 +209,42 @@ def parse_devices(devices, short_names=True, multivalue_positioners=False):
device["data_type"] = countertypemap.get(datatype, datatype)
device["data_name"] = counternamemap.get(datatype, datatype)
elif device["device_type"] == "positionergroup":
# TODO: currently only timers, no other masters exist like this (yet!!!)
# 'timer1:xxxxxx' -> 'xxxxxx'
# device_name = 'timer1'
# data_type = timertypemap('xxxxxx')
# data_name = timernamemap('xxxxxx')
parts = fullname.split(":")
timertype = parts[-1]
device["device_type"] = "positioner"
if device["data_type"] != "principal":
device["data_type"] = parts[-1]
if multivalue_positioners:
# All of them are masters but only one of them
# is a principle value
device["device_name"] = ":".join(parts[:-1])
device["data_type"] = timertypemap.get(timertype, device["data_type"])
device["data_name"] = timernamemap.get(timertype, device["data_name"])
# What to do here?
# if device['data_type'] != 'principal':
# device['master_index'] = -1
if device["data_type"] == "principal":
device["data_name"] = "value"
else:
device["data_name"] = device["data_type"]
else:
# All of them are principal values but only one of them
# is a master
device["data_type"] = timertypemap.get(timertype, device["data_type"])
if device["data_type"] != "principal":
device["master_index"] = -1
device["data_type"] = "principal"
device["data_name"] = "value"
device["single"] = True
elif device["device_type"] == "positioner":
device["data_name"] = "value"
device["data_type"] = "principal"
else:
device["data_name"] = "data"
device["data_type"] = "principal"
if device["data_type"] == "principal":
if device["single"] is None:
device["single"] = device["data_type"] == "principal"
if device["single"]:
device["unique_name"] = device["device_name"]
else:
device["unique_name"] = device["device_name"] + ":" + device["data_name"]
def is_positioner_group(fullname, all_fullnames):
"""
A positioner group is a master which publishes more than one channel.
This is currently only a timer.
"""
parts = fullname.split(":")
if len(parts) == 2:
if parts[1] in ["elapsed_time", "epoch"]:
return all(parts[0] + ":" + name for name in ["elapsed_time", "epoch"])
return False
def device_info(devices, scan_info, short_names=True, multivalue_positioners=False):
def device_info(
devices, scan_info, ndim=1, short_names=True, multivalue_positioners=False
):
"""
Merge device information from `writer_config_publish.device_info`
and from the scan info published by the Bliss core library.
:param dict devices: as provided by `writer_config_publish.device_info`
:param dict scan_info:
:param int ndim: number of scan dimensions
:param bool short_names:
:param bool multivalue_positioners:
:returns dict: subscanname:dict(fullname:dict)
......@@ -277,13 +257,20 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
subdevices = ret[subscan] = {}
masterinfo = subscaninfo["master"]
_extract_device_info(
devices, subdevices, masterinfo, ["scalars"], config=config, master=True
devices,
subdevices,
masterinfo,
["scalars"],
ndim=ndim,
config=config,
master=True,
)
_extract_device_info(
devices,
subdevices,
masterinfo,
["spectra", "images"],
ndim=ndim,
config=config,
master=True,
)
......@@ -292,6 +279,7 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
subdevices,
subscaninfo,
["scalars", "spectra", "images"],
ndim=ndim,
config=config,
master=False,
)
......@@ -303,7 +291,9 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
return ret
def _extract_device_info(devices, subdevices, info, keys, config=True, master=False):
def _extract_device_info(
devices, subdevices, info, keys, ndim=1, config=True, master=False
):
for key in keys:
units = info.get(key + "_units", {})
lst = info.get(key, [])
......@@ -313,11 +303,13 @@ def _extract_device_info(devices, subdevices, info, keys, config=True, master=Fa
device = update_device(subdevices, fullname, data_info=data_info)
if key == "scalars":
if master:
if is_positioner_group(fullname, lst):
if len(lst) > 1 and ndim <= 1:
device["device_type"] = "positionergroup"
device["master_index"] = 0
else:
device["device_type"] = "positioner"
device["master_index"] = i
else:
if is_positioner_group(fullname, lst):
device["device_type"] = "positionergroup"
device["master_index"] = i
if i == 0:
device["data_type"] = "principal"
else:
device["data_type"] = ""
......@@ -304,7 +304,10 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:param default:
:param bool cache:
"""
return subscan.get_info(key, default=default, cache=cache)
if subscan is None:
return self.get_info(key, default=default, cache=cache)
else:
return subscan.get_info(key, default=default, cache=cache)
@property
def save(self):
......@@ -352,6 +355,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
self._devices = devices.device_info(
self.config_devices,
self.info,
self.scan_ndim(None),
short_names=self.saveoptions["short_names"],
multivalue_positioners=self.saveoptions["multivalue_positioners"],
)
......@@ -397,7 +401,10 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
device = devices.update_device(subdevices, fullname)
if not device["device_type"]:
device["device_type"] = self._device_type(node)
if self.is_scan_group and device["device_type"] == "positioner":
if self.is_scan_group and device["device_type"] in (
"positioner",
"positionergroup",
):
device["device_type"] = "groupinfo"
if device["data_name"] == "value":
device["data_name"] = "data"
......@@ -1245,7 +1252,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
# Create parent: NXdetector, NXpositioner or measurement
device = self.device(subscan, node)
parentname = dataset_proxy.normalize_nexus_name(device["device_name"])
if device["device_type"] == "positioner":
if device["device_type"] in ("positioner", "positionergroup"):
# Add as separate positioner group
parentcontext = self.nxpositioner
parentcontextargs = subscan, parentname
......@@ -1362,6 +1369,8 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:param Subscan subscan:
:returns int:
"""
if subscan is None:
return self.get_info("npoints", cache=True)
# TODO: currently subscans always give 0 (npoints is not published in Redis)
return self.get_subscan_info(subscan, "npoints", default=0, cache=True)
......@@ -1702,7 +1711,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:returns str, DatasetProxy: fullname and dataset handles
"""
for fullname, dproxy in subscan.datasets.items():
if dproxy.device_type == "positioner":
if dproxy.device_type in ("positioner", "positionergroup"):
if onlyprincipals and dproxy.data_type != "principal":
continue
if onlymasters and dproxy.master_index < 0:
......@@ -1717,18 +1726,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:returns str, DatasetProxy: fullname and dataset handle
"""
for fullname, dproxy in subscan.datasets.items():
if dproxy.device_type != "positioner":
yield fullname, dproxy
def principal_iter(self, subscan):
"""
Yields all principal dataset handles
:param Subscan subscan:
:returns str, DatasetProxy: fullname and dataset handle
"""
for fullname, dproxy in subscan.datasets.items():
if dproxy.data_type != "principal":
if dproxy.device_type not in ("positioner", "positionergroup"):
yield fullname, dproxy
def _save_positioners(self, subscan):
......@@ -1805,7 +1803,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:param DatasetProxy dproxy:
"""
self._add_to_measurement_group(subscan, dproxy)
if dproxy.device_type == "positioner":
if dproxy.device_type in ("positioner", "positionergroup"):
self._add_to_positioners_group(subscan, dproxy)
def _add_to_measurement_group(self, subscan, dproxy):
......@@ -1815,7 +1813,6 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:param Subscan subscan:
:param DatasetProxy dproxy:
"""
posprefix = "pstn_"
with self.nxmeasurement(subscan) as measurement:
if measurement is None:
return
......@@ -1825,23 +1822,11 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
if not linkname:
dproxy.logger.warning("cannot be linked too")
return
if dproxy.device_type == "positioner":
linknames = []
# Positioners should always be there under
# a different name when not in positioners
# snapshot
if linkname not in self.motors:
linknames.append(posprefix + linkname)
# Principle positioners which are masters should
# be there under their normal name
if dproxy.master_index >= 0 and dproxy.data_type == "principal":
linknames.append(linkname)
else:
linknames = [linkname]
linknames = [linkname]
for linkname in linknames:
if linkname in measurement:
self.logger.warning(
f"Duplicate name '{linkname}' in the measurement group. Rename this detector or positioner (avoid prefix '{posprefix}')."
f"Duplicate name '{linkname}' in the measurement group. Rename this detector or positioner."
)
else:
nexus.createLink(measurement, linkname, dproxy.path)
......
......@@ -49,7 +49,7 @@ def validate_scan_data(
config=True,
policy=True,
alt=False,
hastimer=True,
softtimer="master",
):
"""
:param bliss.scanning.scan.Scan scan:
......@@ -62,7 +62,7 @@ def validate_scan_data(
:param bool config: configurable writer
:param bool policy: data policy
:param bool alt: alternative writer options
:param bool hastimer:
:param str softtimer: "detector", "master" or neither
"""
# Parse arguments
if config:
......@@ -100,6 +100,8 @@ def validate_scan_data(
detectors=detectors,
notes=notes,
variable_length=variable_length,
master_name=master_name,
softtimer=softtimer,
)
validate_measurement(
nxentry["measurement"],
......@@ -110,7 +112,7 @@ def validate_scan_data(
save_options=save_options,
detectors=detectors,
master_name=master_name,
hastimer=hastimer,
softtimer=softtimer,
)
validate_instrument(
nxentry["instrument"],
......@@ -122,7 +124,7 @@ def validate_scan_data(
save_options=save_options,
detectors=detectors,
master_name=master_name,
hastimer=hastimer,
softtimer=softtimer,
)
if not variable_length:
validate_plots(
......@@ -134,6 +136,7 @@ def validate_scan_data(
scan_shape=scan_shape,
positioners=positioners,
master_name=master_name,
softtimer=softtimer,
save_options=save_options,
)
validate_applications(
......@@ -209,6 +212,8 @@ def validate_nxentry(
detectors=None,
notes=None,
variable_length=None,
master_name=None,
softtimer=None,
):
"""
:param h5py.Group nxentry:
......@@ -216,7 +221,10 @@ def validate_nxentry(
:param bool policy: data policy
:param str technique:
:param list(str) detectors:
:param bool notes:
:param bool variable_length: e.g. timescan
:param str master_name:
:param str softtimer:
"""
assert nxentry.parent.attrs["NX_class"] == "NXroot"
assert nxentry.attrs["NX_class"] == "NXentry"
......@@ -229,7 +237,12 @@ def validate_nxentry(
actual.remove(name)
else:
plots = expected_plots(
technique, config=config, policy=policy, detectors=detectors
technique,
config=config,
policy=policy,
detectors=detectors,
master_name=master_name,
softtimer=softtimer,
)
for name, info in plots.items():
if info["signals"]:
......@@ -249,7 +262,7 @@ def validate_measurement(
save_options=None,
detectors=None,
master_name=None,
hastimer=None,
softtimer=None,
):
"""
:param h5py.Group nxentry:
......@@ -260,19 +273,20 @@ def validate_measurement(
:param dict save_options:
:param list(str) detectors:
:param str master_name:
:param bool hastimer:
:param str softtimer:
"""
assert measurement.attrs["NX_class"] == "NXcollection"
# Detectors
datasets = expected_channels(
config=config, technique=technique, detectors=detectors
config=config,
technique=technique,
detectors=detectors,
master_name=master_name,
softtimer=softtimer,
)
# Positioners
pos_instrument, pos_meas, pos_pgroup = expected_positioners(
master_name=master_name,
positioners=positioners,
hastimer=hastimer,
save_options=save_options,
master_name=master_name, positioners=positioners, save_options=save_options
)
datasets[0] |= set(pos_meas)
# Check all datasets present
......@@ -313,7 +327,7 @@ def validate_instrument(
save_options=None,
detectors=None,
master_name=None,
hastimer=None,
softtimer=None,
):
"""
:param h5py.Group nxentry:
......@@ -325,7 +339,7 @@ def validate_instrument(
:param dict save_options:
:param list(str) detectors:
:param str master_name:
:param bool hastimer:
:param str softtimer:
"""
# Positioner groups
expected_posg = {
......@@ -337,14 +351,15 @@ def validate_instrument(
}
# Detectors
expected_dets = expected_detectors(
config=config, technique=technique, detectors=detectors
config=config,
technique=technique,
detectors=detectors,
master_name=master_name,
softtimer=softtimer,
)
# Positioners
pos_instrument, _, pos_positioners = expected_positioners(
master_name=master_name,
positioners=positioners,
hastimer=hastimer,
save_options=save_options,
master_name=master_name, positioners=positioners, save_options=save_options
)
# Check all subgroups present
if config:
......@@ -389,6 +404,7 @@ def validate_plots(
scan_shape=None,
positioners=None,
master_name=None,
softtimer=None,
save_options=None,
):
"""
......@@ -400,9 +416,17 @@ def validate_plots(
:param tuple scan_shape: fast axis first
:param list positioners: fast axis first
:param str master_name:
:param str softtimer:
:param dict save_options:
"""
plots = expected_plots(technique, config=config, policy=policy, detectors=detectors)
plots = expected_plots(
technique,
config=config,
policy=policy,
detectors=detectors,
master_name=master_name,
softtimer=softtimer,
)
for name, info in plots.items():
if info["signals"]:
validate_nxdata(
......@@ -570,7 +594,14 @@ def validate_notes(nxentry, notes):
assert subgroup["type"][()] == "text/plain"
def expected_plots(technique, config=True, policy=True, detectors=None):
def expected_plots(
technique,
config=True,
policy=True,
detectors=None,
master_name=None,
softtimer=None,
):
"""
All expected plots for this technique (see nexus_definitions.yml)
......@@ -578,11 +609,17 @@ def expected_plots(technique, config=True, policy=True, detectors=None):
:param bool config: configurable writer
:param bool policy: data policy
:param list(str) detectors:
:param str master_name:
:param str softtimer:
:returns dict: grouped by detector dimension and flat/grid
"""
plots = dict()
channels = expected_channels(
config=config, technique=technique, detectors=detectors
config=config,
technique=technique,
detectors=detectors,
master_name=master_name,
softtimer=softtimer,
)
def ismca(name):
......@@ -713,14 +750,14 @@ def expected_applications(technique, config=True, policy=True):
def expected_positioners(
master_name=None, positioners=None, hastimer=None, save_options=None
master_name=None, positioners=None, softtimer=None, save_options=None
):
"""
Expected positioners
:param str master_name:
:param list(str) positioners:
:param bool hastimer:
:param str softtimer:
:param dict save_options:
:returns dict, set, set: content, measurement, positioners
"""
......@@ -734,36 +771,40 @@ def expected_positioners(
measurement.add(master_name)
pgroup.add(master_name)
pgroup |= {master_name + "_" + axis for axis in axes[1:]}
measurement |= {master_name + "_" + axis for axis in axes[1:]}
else:
for axis in axes:
content[axis] = ["value"]
measurement.add(axes[0])
measurement |= set(axes)
pgroup |= set(axes)
elif axes:
content[axes[0]] = ["value"]
measurement.add(axes[0])
pgroup.add(axes[0])
if hastimer:
posprefix = "pstn_"
if softtimer == "master":
if save_options["multivalue_positioners"]:
content[master_name] = ["value", "epoch"]
measurement |= {posprefix + master_name, posprefix + master_name + "_epoch"}
measurement |= {master_name, master_name + "_epoch"}
pgroup |= {master_name, master_name + "_epoch"}
else:
content["elapsed_time"] = ["value"]
content["epoch"] = ["value"]
measurement |= {posprefix + "elapsed_time", posprefix + "epoch"}
measurement |= {"elapsed_time", "epoch"}
pgroup |= {"elapsed_time", "epoch"}
return content, measurement, pgroup
def expected_detectors(config=True, technique=None, detectors=None):
def expected_detectors(
config=True, technique=None, detectors=None, master_name=None, softtimer=None
):
"""
Expected detectors
:param bool config: configurable writer
:param str technique:
:param list(str) detectors:
:param str master_name:
:param str softtimer:
:returns set:
"""
if config:
......@@ -803,11 +844,17 @@ def expected_detectors(config=True, technique=None, detectors=None):
name + "_bpm",
}
expected = detectors_filter(expected, detectors)
if softtimer == "detector":
expected |= {"elapsed_time", "epoch"}
else:
# Each data channel is a detector
expected = set()
channels = expected_channels(
config=config, technique=technique, detectors=detectors
config=config,
technique=technique,
detectors=detectors,
master_name=master_name,
softtimer=softtimer,
)
for names in channels.values():
expected |= names
......@@ -936,13 +983,17 @@ def expected_detector_content(name, config=True):
return datasets
def expected_channels(config=True, technique=None, detectors=None):
def expected_channels(
config=True, technique=None, detectors=None, master_name=None, softtimer=None
):
"""
Expected channels grouped per dimension
:param bool config: configurable writer
:param str technique:
:param list(str) detectors:
:param str master_name:
:param str softtimer:
:returns dict: key are the unique names (used in plots and measurement)
"""
datasets = {0: set(), 1: set(), 2: set()}
......@@ -1085,6 +1136,8 @@ def expected_channels(config=True, technique=None, detectors=None):
}
for k in datasets:
datasets[k] = detectors_filter(datasets[k], detectors, removeprefix=not config)
if softtimer == "detector":
datasets[0] |= {"elapsed_time", "epoch"}