Commit c5d8a6d4 authored by Wout De Nolf's avatar Wout De Nolf Committed by Matias Guijarro

[writer] add test with lima master

parent c9555308
Pipeline #21213 canceled with stages
in 1 minute and 9 seconds
......@@ -41,20 +41,20 @@ def assert_scangroup_data(sequence, **kwargs):
def validate_scan_data(
scan,
subscan=1,
masters=None,
positioners=None,
detectors=None,
notes=None,
master_name="timer",
scan_shape=None,
scan_shape=tuple(),
config=True,
policy=True,
alt=False,
hastimer=True,
):
"""
:param bliss.scanning.scan.Scan scan:
:param int subscan:
:param tuple masters: fast axis first by default `master_name` when 1D scan
or None otherwise
:param list positioners: fast axis first
:param list(str) detectors: expected detectors (derived from technique when missing)
:param list(str) notes:
:param str master_name: chain master name
......@@ -62,6 +62,7 @@ def validate_scan_data(
:param bool config: configurable writer
:param bool policy: data policy
:param bool alt: alternative writer options
:param bool hastimer:
"""
# Parse arguments
if config:
......@@ -70,18 +71,9 @@ def validate_scan_data(
save_options = base_options()
if alt:
save_options = {k: not v for k, v in save_options.items()}
if scan_shape is None:
scan_shape = tuple()
variable_length = not all(scan_shape)
if masters is None:
if not scan_shape:
masters = tuple()
elif len(scan_shape) == 1:
if save_options["multivalue_positioners"]:
masters = (master_name,)
else:
masters = ("elapsed_time",)
assert len(scan_shape) == len(masters)
if not positioners:
positioners = []
if policy:
det_technique = nxw_test_config.technique["withpolicy"]
else:
......@@ -91,9 +83,11 @@ def validate_scan_data(
if detectors:
for name, alias in zip(["diode2", "diode9"], ["diode2alias", "diode9alias"]):
if config:
detectors = [alias if d == name else d for d in detectors]
detectors = [
d if alias in d else d.replace(name, alias) for d in detectors
]
else:
detectors = [name if d == alias else d for d in detectors]
detectors = [d.replace(alias, name) for d in detectors]
if config:
scan_technique = scan.scan_info["nexuswriter"]["technique"]["name"]
else:
......@@ -116,22 +110,24 @@ def validate_scan_data(
nxentry["measurement"],
scan_shape=scan_shape,
config=config,
masters=masters,
positioners=positioners,
technique=det_technique,
save_options=save_options,
detectors=detectors,
master_name=master_name,
hastimer=hastimer,
)
validate_instrument(
nxentry["instrument"],
scan_shape=scan_shape,
config=config,
masters=masters,
positioners=positioners,
policy=policy,
technique=det_technique,
save_options=save_options,
detectors=detectors,
master_name=master_name,
hastimer=hastimer,
)
if not variable_length:
validate_plots(
......@@ -141,7 +137,8 @@ def validate_scan_data(
technique=scan_technique,
detectors=detectors,
scan_shape=scan_shape,
masters=masters,
positioners=positioners,
master_name=master_name,
save_options=save_options,
)
validate_applications(
......@@ -231,6 +228,7 @@ def validate_nxentry(
actual = set(nxentry.keys())
expected = {"instrument", "measurement", "title", "start_time", "end_time"}
if variable_length:
# Multiple plots based on
for name in list(actual):
if nxentry[name].attrs.get("NX_class") == "NXdata":
actual.remove(name)
......@@ -250,22 +248,24 @@ def validate_nxentry(
def validate_measurement(
measurement,
scan_shape=None,
masters=None,
positioners=None,
config=True,
technique=None,
save_options=None,
detectors=None,
master_name=None,
hastimer=None,
):
"""
:param h5py.Group nxentry:
:param tuple scan_shape: fast axis first
:param tuple masters: fast axis first
:param list positioners: fast axis first
:param bool config: configurable writer
:param str technique:
:param dict save_options:
:param list(str) detectors:
:param str master_name:
:param bool hastimer:
"""
assert measurement.attrs["NX_class"] == "NXcollection"
# Detectors
......@@ -273,20 +273,13 @@ def validate_measurement(
config=config, technique=technique, detectors=detectors
)
# Positioners
if masters:
datasets[0] |= set(masters)
else:
if save_options["multivalue_positioners"]:
datasets[0] |= {master_name}
else:
datasets[0] |= {"elapsed_time"}
if save_options["multivalue_positioners"]:
datasets[0] |= {
"pos_{}".format(master_name),
"pos_{}_epoch".format(master_name),
}
else:
datasets[0] |= {"pos_elapsed_time", "pos_epoch"}
pos_instrument, pos_meas, pos_pgroup = expected_positioners(
master_name=master_name,
positioners=positioners,
hastimer=hastimer,
save_options=save_options,
)
datasets[0] |= set(pos_meas)
# Check all datasets present
expected = set()
for names in datasets.values():
......@@ -319,23 +312,25 @@ def validate_instrument(
instrument,
scan_shape=None,
config=True,
masters=None,
positioners=None,
policy=True,
technique=None,
save_options=None,
detectors=None,
master_name=None,
hastimer=None,
):
"""
:param h5py.Group nxentry:
:param tuple scan_shape: fast axis first
:param bool config: configurable writer
:param tuple masters:
:param list positioners:
:param bool policy: data policy
:param str technique:
:param dict save_options:
:param list(str) detectors:
:param str master_name:
:param bool hastimer:
"""
# Positioner groups
expected_posg = {
......@@ -350,11 +345,12 @@ def validate_instrument(
config=config, technique=technique, detectors=detectors
)
# Positioners
expected_pos = set(masters)
if save_options["multivalue_positioners"]:
expected_pos |= {master_name}
else:
expected_pos |= {"elapsed_time", "epoch"}
pos_instrument, _, pos_positioners = expected_positioners(
master_name=master_name,
positioners=positioners,
hastimer=hastimer,
save_options=save_options,
)
# Check all subgroups present
if config:
expected = {"title"}
......@@ -362,25 +358,19 @@ def validate_instrument(
expected = set()
expected |= expected_posg
expected |= expected_dets
expected |= expected_pos
expected |= set(pos_instrument.keys())
assert_set_equal(set(instrument.keys()), expected)
# Validate content of positioner NXcollections
for name in expected_posg:
assert instrument[name].attrs["NX_class"] == "NXcollection", name
expected = {"robx", "roby", "robz"}
if name == "positioners":
expected |= expected_pos
if save_options["multivalue_positioners"]:
expected |= {"{}_epoch".format(master_name)}
expected |= set(pos_positioners)
assert_set_equal(set(instrument[name].keys()), expected)
# Validate content of NXpositioner groups
for name in expected_pos:
for name, content in pos_instrument.items():
assert instrument[name].attrs["NX_class"] == "NXpositioner", name
if save_options["multivalue_positioners"] and name == master_name:
expected = {"value", "epoch"}
else:
expected = {"value"}
assert_set_equal(set(instrument[name].keys()), expected, msg=name)
assert_set_equal(set(instrument[name].keys()), set(content), msg=name)
# Validate content of NXdetector groups
variable_length = not all(scan_shape)
for name in expected_dets:
......@@ -402,7 +392,8 @@ def validate_plots(
technique=None,
detectors=None,
scan_shape=None,
masters=None,
positioners=None,
master_name=None,
save_options=None,
):
"""
......@@ -412,7 +403,8 @@ def validate_plots(
:param str technique:
:param list(str) detectors:
:param tuple scan_shape: fast axis first
:param tuple masters: fast axis first
:param list positioners: fast axis first
:param str master_name:
:param dict save_options:
"""
plots = expected_plots(technique, config=config, policy=policy, detectors=detectors)
......@@ -424,7 +416,8 @@ def validate_plots(
info["type"],
info["signals"],
scan_shape=scan_shape,
masters=masters,
positioners=positioners,
master_name=master_name,
save_options=save_options,
)
else:
......@@ -475,7 +468,8 @@ def validate_appxrf(nxsubentry, save_options=None, detectors=None):
expected |= {"i0", "it"}
mcas = [0, 1]
if save_options["stack_mcas"]:
expected |= {"mca", "elapsed_time", "live_time"}
if mcas:
expected |= {"mca", "elapsed_time", "live_time"}
else:
for i in mcas:
expected |= {
......@@ -500,7 +494,8 @@ def validate_nxdata(
ptype,
expected_signals,
scan_shape=None,
masters=None,
positioners=None,
master_name="timer",
save_options=None,
):
"""
......@@ -509,7 +504,8 @@ def validate_nxdata(
:param str ptype:
:param list expected_signals:
:param tuple scan_shape: fast axis first
:param tuple masters: fast axis first
:param list positioners: fast axis first
:param str master_name:
:param dict save_options:
"""
assert nxdata.attrs["NX_class"] == "NXdata", nxdata.name
......@@ -519,10 +515,15 @@ def validate_nxdata(
escan_shape = (numpy.product(scan_shape, dtype=int),)
else:
escan_shape = scan_shape
axes = []
for lst in positioners:
if len(lst) > 1 and save_options["multivalue_positioners"]:
lst = [master_name]
axes.append(lst)
if ptype == "grid":
# C-order: fast axis last
escan_shape = escan_shape[::-1]
masters = masters[::-1]
axes = axes[::-1]
# Validate signal shape and interpretation
scan_ndim = len(scan_shape)
variable_length = not all(scan_shape)
......@@ -541,14 +542,19 @@ def validate_nxdata(
dset, detector_ndim, save_options, variable_length=variable_length
)
# Validate axes
if not scan_shape:
axes = []
if detector_ndim:
if len(scan_shape) > 1 and ptype == "flat":
# Scatter plot of non-scalar detector
masters = tuple()
axes = []
else:
masters += tuple("datadim{}".format(i) for i in range(detector_ndim))
axes = tuple(nxdata.attrs.get("axes", tuple()))
assert axes == masters, (axes, masters, scan_shape, ptype, nxdata.name)
axes += [["datadim{}".format(i)] for i in range(detector_ndim)]
axes = [lst[0] if lst else None for lst in axes]
if None in axes:
axes = []
nxdata_axes = list(nxdata.attrs.get("axes", []))
assert nxdata_axes == axes, (nxdata_axes, axes, scan_shape, ptype, nxdata.name)
def validate_notes(nxentry, notes):
......@@ -711,6 +717,53 @@ def expected_applications(technique, config=True, policy=True):
return apps
def expected_positioners(
master_name=None, positioners=None, hastimer=None, save_options=None
):
"""
Expected positioners
:param str master_name:
:param list(str) positioners:
:param bool hastimer:
:param dict save_options:
:returns dict, set, set: content, measurement, positioners
"""
content = {} # groups under NXinstrument
measurement = set() # links under measurement
pgroup = set() # links under instrument/positioners
for axes in positioners:
if len(axes) > 1:
if save_options["multivalue_positioners"]:
content[master_name] = ["value"] + axes[1:]
measurement.add(master_name)
pgroup.add(master_name)
pgroup |= {master_name + "_" + axis for axis in axes[1:]}
else:
for axis in axes:
content[axis] = ["value"]
measurement.add(axes[0])
pgroup |= set(axes)
elif axes:
content[axes[0]] = ["value"]
measurement.add(axes[0])
pgroup.add(axes[0])
if hastimer:
if save_options["multivalue_positioners"]:
content[master_name] = ["value", "epoch"]
measurement |= {
"pos_{}".format(master_name),
"pos_{}_epoch".format(master_name),
}
pgroup |= {master_name, master_name + "_epoch"}
else:
content["elapsed_time"] = ["value"]
content["epoch"] = ["value"]
measurement |= {"pos_elapsed_time", "pos_epoch"}
pgroup |= {"elapsed_time", "epoch"}
return content, measurement, pgroup
def expected_detectors(config=True, technique=None, detectors=None):
"""
Expected detectors
......@@ -720,31 +773,23 @@ def expected_detectors(config=True, technique=None, detectors=None):
:param list(str) detectors:
:returns set:
"""
if detectors:
detectors = set(detectors)
if config:
# Data channels are grouped per detector
expected = {
"diode2alias",
"diode3",
"diode4",
"diode5",
"diode6",
"diode7",
"diode8",
"diode9alias",
"sim_ct_gauss",
"sim_ct_gauss_noise",
"thermo_sample",
}
if config:
expected |= {"diode2alias", "diode9alias"}
else:
expected |= {"diode2", "diode9"}
if detectors:
expected &= detectors
if "xrf" in technique or "xas" in technique or detectors:
names = {"simu1", "simu2"}
if detectors:
names &= detectors
for name in names:
expected |= {
name + "_det0",
......@@ -755,8 +800,6 @@ def expected_detectors(config=True, technique=None, detectors=None):
}
if "xrd" in technique or detectors:
names = {"lima_simulator", "lima_simulator2"}
if detectors:
names &= detectors
for name in names:
expected |= {
name,
......@@ -766,6 +809,7 @@ def expected_detectors(config=True, technique=None, detectors=None):
name + "_roi4",
name + "_bpm",
}
expected = detectors_filter(expected, detectors)
else:
# Each data channel is a detector
expected = set()
......@@ -777,6 +821,67 @@ def expected_detectors(config=True, technique=None, detectors=None):
return expected
def detectors_filter(expected, detectors, removeprefix=False):
"""
:param sequence expected:
:param list(str) detectors:
:returns set:
"""
if not detectors:
return expected
result = []
for pattern in detector_pattern(detectors):
result += [d for d in expected if re.match(pattern, d)]
if removeprefix:
result = remove_controller_prefix(
result,
["^lima_simulator_", "^lima_simulator2_"],
["lima_simulator", "lima_simulator2"],
["{}_roi_counters_", "{}_bpm_", "{}_"],
)
result = remove_controller_prefix(
result, ["^simu1_", "^simu2_"], ["simu1", "simu2"], ["{}_"]
)
return set(result)
def remove_controller_prefix(names, patterns, connames, replacefmts):
"""
:param list(str) names:
:param list(str) patterns:
:param list(str) connames:
:returns list(str):
"""
connames = [
conname
for conname, pattern in zip(connames, patterns)
if any(re.match(pattern, name) for name in names)
]
if len(connames) == 1:
for fmt in replacefmts:
s = fmt.format(connames[0])
names = [name.replace(s, "") for name in names]
return names
def detector_pattern(detectors):
patterns = {
"diode7": "^diode7.*$",
"diode9": "^diode9.*$",
"diode9alias": "^diode9alias.*$",
"lima_simulator": "^lima_simulator([^2].*|)$",
"lima_simulator2": "^lima_simulator2.*$",
"simu1": "^simu1.*$",
"simu2": "^simu2.*$",
}
return [
d
if d.startswith("^") and d.startswith("$")
else patterns.get(d, "^{}$".format(d))
for d in detectors
]
def expected_detector_content(name, config=True):
"""
:param bool config: configurable writer
......@@ -847,8 +952,6 @@ def expected_channels(config=True, technique=None, detectors=None):
:param list(str) detectors:
:returns dict: key are the unique names (used in plots and measurement)
"""
if detectors:
detectors = set(detectors)
datasets = {0: set(), 1: set(), 2: set()}
# Normal diodes
names = {
......@@ -866,13 +969,9 @@ def expected_channels(config=True, technique=None, detectors=None):
names |= {"diode2alias", "diode9alias"}
else:
names |= {"diode2", "diode9"}
if detectors:
names &= detectors
datasets[0] |= names
# Statistics diodes
names = {"diode7"}
if detectors:
names &= detectors
for name in names:
datasets[0] |= {
name + "_N",
......@@ -887,15 +986,11 @@ def expected_channels(config=True, technique=None, detectors=None):
names = {"diode9alias"}
else:
names = {"diode9"}
if detectors:
names &= detectors
for name in names:
datasets[1] |= {name + "_samples"}
# MCA's with ROI's
if "xrf" in technique or "xas" in technique or detectors:
names = {"simu1", "simu2"}
if detectors:
names &= detectors
if config:
for conname in names:
for detname in ["det0", "det1", "det2", "det3"]:
......@@ -918,10 +1013,7 @@ def expected_channels(config=True, technique=None, detectors=None):
else:
for conname in names:
for detname in ["det0", "det1", "det2", "det3"]:
if len(names) > 1:
prefix = conname + "_"
else:
prefix = ""
prefix = conname + "_"
datasets[1] |= {prefix + "spectrum_" + detname}
datasets[0] |= {
prefix + "triggers_" + detname,
......@@ -939,8 +1031,6 @@ def expected_channels(config=True, technique=None, detectors=None):
# Lima's with ROI's and BPM
if "xrd" in technique or detectors:
names = {"lima_simulator", "lima_simulator2"}
if detectors:
names &= detectors
if config:
for conname in names:
datasets[2] |= {conname}
......@@ -974,14 +1064,9 @@ def expected_channels(config=True, technique=None, detectors=None):
}
else:
for conname in names:
if len(names) > 1:
prefix = conname + "_"
prefix_roi = conname + "_roi_counters_"
prefix_bpm = conname + "_bpm_"
else:
prefix = ""
prefix_roi = ""
prefix_bpm = ""
prefix = conname + "_"
prefix_roi = conname + "_roi_counters_"
prefix_bpm = conname + "_bpm_"
datasets[2] |= {prefix + "image"}
datasets[0] |= {
prefix_roi + "roi1_min",
......@@ -1011,6 +1096,8 @@ def expected_channels(config=True, technique=None, detectors=None):
prefix_bpm + "intensity",
prefix_bpm + "acq_time",
}
for k in datasets:
datasets[k] = detectors_filter(datasets[k], detectors, removeprefix=not config)
return datasets
......
......@@ -36,14 +36,14 @@ def test_nxw_amesh_base_nopolicy(nexus_writer_base_nopolicy):
@nxw_test_utils.writer_stdout_on_exception
def _test_nxw_amesh(session=None, tmpdir=None, writer=None, **kwargs):
masters = "robx", "roby"
positioners = [["robx"], ["roby"]]
scan_shape = 4, 5
scan = scans.amesh(
session.env_dict[masters[0]],
session.env_dict[positioners[0][0]],
0,
1,
scan_shape[0] - 1,
session.env_dict[masters[1]],
session.env_dict[positioners[1][0]],
0,
1,
scan_shape[1] - 1,
......@@ -53,5 +53,5 @@ def _test_nxw_amesh(session=None, tmpdir=None, writer=None, **kwargs):
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(
scan, masters=masters, scan_shape=scan_shape, **kwargs
scan, positioners=positioners, scan_shape=scan_shape, **kwargs
)
......@@ -36,13 +36,13 @@ def test_nxw_ascan_base_nopolicy(nexus_writer_base_nopolicy):
@nxw_test_utils.writer_stdout_on_exception
def _test_nxw_ascan(session=None, tmpdir=None, writer=None, **kwargs):
masters = ("robx",)
positioners = [["robx"]]
scan_shape = (10,)
scan = scans.ascan(
session.env_dict[masters[0]], 0, 1, scan_shape[0] - 1, .1, run=False
session.env_dict[positioners[0][0]], 0, 1, scan_shape[0] - 1, .1, run=False
)
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(
scan, masters=masters, scan_shape=scan_shape, **kwargs