Commit fe571414 authored by Matias Guijarro's avatar Matias Guijarro

Merge branch 'nxw_devicename_bug' into 'master'

NexusWriter devicename bug

See merge request !1997
parents 24b7c2c0 c5d8a6d4
Pipeline #21230 passed with stages
in 4 minutes and 35 seconds
......@@ -276,38 +276,26 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
config = bool(devices)
for subscan, subscaninfo in scan_info["acquisition_chain"].items():
subdevices = ret[subscan] = {}
# These are the "positioners"
masterinfo = subscaninfo["master"]
units = masterinfo.get("scalars_units", {})
aliasmap = masterinfo.get("display_names", {})
master_index = 0
lst = masterinfo.get("scalars", [])
for fullname in lst:
subdevices[fullname] = devices.get(fullname, {})
data_info = {"units": units.get(fullname, None)}
device = update_device(subdevices, fullname, data_info=data_info)
if is_positioner_group(fullname, lst):
device["device_type"] = "positionergroup"
else:
device["device_type"] = "positioner"
device["master_index"] = master_index
master_index += 1
if _allow_alias(device, config):
_add_alias(device, fullname, aliasmap)
# These are the 0D, 1D and 2D "detectors"
aliasmap = subscaninfo.get("display_names", {})
for key in "scalars", "spectra", "images":
units = subscaninfo.get(key + "_units", {})
lst = subscaninfo.get(key, [])
for fullname in lst:
subdevices[fullname] = devices.get(fullname, {})
data_info = {"units": units.get(fullname, None)}
device = update_device(subdevices, fullname, data_info=data_info)
if key == "scalars":
if is_positioner_group(fullname, lst):
device["device_type"] = "positionergroup"
if _allow_alias(device, config):
_add_alias(device, fullname, aliasmap)
_extract_device_info(
devices, subdevices, masterinfo, ["scalars"], config=config, master=True
)
_extract_device_info(
devices,
subdevices,
masterinfo,
["spectra", "images"],
config=config,
master=True,
)
_extract_device_info(
devices,
subdevices,
subscaninfo,
["scalars", "spectra", "images"],
config=config,
master=False,
)
parse_devices(
subdevices,
short_names=short_names,
......@@ -316,6 +304,29 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
return ret
def _extract_device_info(devices, subdevices, info, keys, config=True, master=False):
aliasmap = info.get("display_names", {})
for key in keys:
units = info.get(key + "_units", {})
lst = info.get(key, [])
for i, fullname in enumerate(lst):
subdevices[fullname] = devices.get(fullname, {})
data_info = {"units": units.get(fullname, None)}
device = update_device(subdevices, fullname, data_info=data_info)
if key == "scalars":
if master:
if is_positioner_group(fullname, lst):
device["device_type"] = "positionergroup"
else:
device["device_type"] = "positioner"
device["master_index"] = i
else:
if is_positioner_group(fullname, lst):
device["device_type"] = "positionergroup"
if _allow_alias(device, config):
_add_alias(device, fullname, aliasmap)
def _allow_alias(device, config):
"""
Allowing channel aliases without configuration creates a mess
......
......@@ -228,18 +228,10 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
"""
Executed at the end of the event loop
"""
try:
with self._capture_finalize_exceptions():
self._finalize_hdf5()
except BaseException as e:
self._set_state(self.STATES.FAULT, e)
self.logger.error(
"Not properly finalized due to exception:\n{}".format(
traceback.format_exc()
)
)
finally:
self.log_progress("Finished writing to {}".format(repr(self.filename)))
super()._event_loop_finalize(**kwargs)
self.log_progress("Finished writing to {}".format(repr(self.filename)))
super()._event_loop_finalize(**kwargs)
def _register_event_loop_tasks(self, nxroot=None, **kwargs):
"""
......@@ -263,18 +255,22 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
self.logger.info("Fetch last data")
for node in self._nodes:
self._fetch_data(node, last=True)
with self._capture_finalize_exceptions():
self._fetch_data(node, last=True)
# Skip because fix length scans can have variable data points
# self.logger.info(
# "Ensure all dataset have the same number of points"
# )
#
# for subscan in self._enabled_subscans:
# self._ensure_same_length(subscan)
# with self._capture_finalize_exceptions():
# self._ensure_same_length(subscan)
self.logger.info("Link external data (VDS or raw)")
for node in self._nodes:
self._ensure_dataset_existance(node)
with self._capture_finalize_exceptions():
self._ensure_dataset_existance(node)
self.logger.info("Save detector metadata")
skip = set()
......@@ -285,6 +281,16 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
self._finalize_subscan(subscan)
self._mark_done(subscan)
@contextmanager
def _capture_finalize_exceptions(self):
try:
yield
except BaseException as e:
self._set_state(self.STATES.FAULT, e)
self.logger.error(
"Captured finalization exception:\n{}".format(traceback.format_exc())
)
def get_subscan_info(self, subscan, key, default=None, cache=False):
"""
Get from the subscan's info dictionary.
......@@ -963,10 +969,14 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
:param Subscan subscan:
"""
self._save_positioners(subscan)
self._create_plots(subscan)
self._fetch_subscan_metadata(subscan)
self._fetch_subscan_notes(subscan)
with self._capture_finalize_exceptions():
self._save_positioners(subscan)
with self._capture_finalize_exceptions():
self._create_plots(subscan)
with self._capture_finalize_exceptions():
self._fetch_subscan_metadata(subscan)
with self._capture_finalize_exceptions():
self._fetch_subscan_notes(subscan)
@property
def current_bytes(self):
......
......@@ -156,7 +156,8 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
:param Subscan subscan:
"""
super()._finalize_subscan(subscan)
self._save_applications(subscan)
with self._capture_finalize_exceptions():
self._save_applications(subscan)
def mca_iter(self, subscan):
"""
......
This diff is collapsed.
......@@ -36,14 +36,14 @@ def test_nxw_amesh_base_nopolicy(nexus_writer_base_nopolicy):
@nxw_test_utils.writer_stdout_on_exception
def _test_nxw_amesh(session=None, tmpdir=None, writer=None, **kwargs):
masters = "robx", "roby"
positioners = [["robx"], ["roby"]]
scan_shape = 4, 5
scan = scans.amesh(
session.env_dict[masters[0]],
session.env_dict[positioners[0][0]],
0,
1,
scan_shape[0] - 1,
session.env_dict[masters[1]],
session.env_dict[positioners[1][0]],
0,
1,
scan_shape[1] - 1,
......@@ -53,5 +53,5 @@ def _test_nxw_amesh(session=None, tmpdir=None, writer=None, **kwargs):
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(
scan, masters=masters, scan_shape=scan_shape, **kwargs
scan, positioners=positioners, scan_shape=scan_shape, **kwargs
)
......@@ -36,13 +36,13 @@ def test_nxw_ascan_base_nopolicy(nexus_writer_base_nopolicy):
@nxw_test_utils.writer_stdout_on_exception
def _test_nxw_ascan(session=None, tmpdir=None, writer=None, **kwargs):
masters = ("robx",)
positioners = [["robx"]]
scan_shape = (10,)
scan = scans.ascan(
session.env_dict[masters[0]], 0, 1, scan_shape[0] - 1, .1, run=False
session.env_dict[positioners[0][0]], 0, 1, scan_shape[0] - 1, .1, run=False
)
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(
scan, masters=masters, scan_shape=scan_shape, **kwargs
scan, positioners=positioners, scan_shape=scan_shape, **kwargs
)
......@@ -39,4 +39,6 @@ def _test_nxw_ct(session=None, tmpdir=None, writer=None, **kwargs):
scan = scans.ct(.1, run=False, save=True)
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(scan, **kwargs)
nxw_test_data.assert_scan_data(
scan, positioners=[["elapsed_time", "epoch"]], **kwargs
)
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import nxw_test_utils
import nxw_test_data
def test_nxw_limact(nexus_writer_config):
_test_nxw_limact(**nexus_writer_config)
def test_nxw_limact_alt(nexus_writer_config_alt):
_test_nxw_limact(**nexus_writer_config_alt)
def test_nxw_limact_nopolicy(nexus_writer_config_nopolicy):
_test_nxw_limact(**nexus_writer_config_nopolicy)
def test_nxw_limact_base(nexus_writer_base):
_test_nxw_limact(**nexus_writer_base)
def test_nxw_limact_base_alt(nexus_writer_base_alt):
_test_nxw_limact(**nexus_writer_base_alt)
def test_nxw_limact_base_nopolicy(nexus_writer_base_nopolicy):
_test_nxw_limact(**nexus_writer_base_nopolicy)
def _test_nxw_limact(session=None, tmpdir=None, writer=None, **kwargs):
lima = session.env_dict["lima_simulator"]
scan = session.env_dict["limact"](lima, 0.1)
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
if kwargs["config"]:
detectors = ["^lima_simulator$"]
else:
detectors = ["^lima_simulator_image$"]
nxw_test_data.assert_scan_data(
scan,
scan_shape=(1,),
positioners=[[]],
detectors=detectors,
hastimer=False,
**kwargs
)
......@@ -40,4 +40,6 @@ def _test_nxw_loopscan(session=None, tmpdir=None, writer=None, **kwargs):
scan = scans.loopscan(scan_shape[0], .1, run=False)
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(scan, scan_shape=scan_shape, **kwargs)
nxw_test_data.assert_scan_data(
scan, scan_shape=scan_shape, positioners=[["elapsed_time", "epoch"]], **kwargs
)
......@@ -81,8 +81,8 @@ def _test_aloopscan(session=None, tmpdir=None, writer=None, **kwargs):
nxw_test_data.assert_scan_data(
scan,
subscan=1,
masters=(mot,),
scan_shape=(npoints1,),
positioners=[[mot]],
detectors=detectors1,
master_name="subscan1tmr",
**kwargs
......@@ -91,6 +91,7 @@ def _test_aloopscan(session=None, tmpdir=None, writer=None, **kwargs):
scan,
subscan=2,
scan_shape=(npoints2,),
positioners=[["elapsed_time", "epoch"]],
detectors=detectors2,
master_name="subscan2tmr",
**kwargs
......@@ -136,6 +137,7 @@ def _test_limatimescan(session=None, tmpdir=None, writer=None, **kwargs):
scan,
subscan=1,
scan_shape=(npoints1,),
positioners=[["elapsed_time", "epoch"]],
detectors=detectors1,
master_name="subscan1tmr",
**kwargs
......@@ -144,6 +146,7 @@ def _test_limatimescan(session=None, tmpdir=None, writer=None, **kwargs):
scan,
subscan=2,
scan_shape=(npoints2,),
positioners=[["elapsed_time", "epoch"]],
detectors=detectors2,
master_name="subscan2tmr",
**kwargs
......
......@@ -22,4 +22,6 @@ def _test_nxw_notes(session=None, tmpdir=None, writer=None, **kwargs):
scan.add_comment(note)
nxw_test_utils.run_scan(scan)
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(scan, notes=notes, **kwargs)
nxw_test_data.assert_scan_data(
scan, notes=notes, positioners=[["elapsed_time", "epoch"]], **kwargs
)
......@@ -17,7 +17,6 @@ def test_nxw_parallel(nexus_writer_config):
@nxw_test_utils.writer_stdout_on_exception
def _test_nxw_parallel(session=None, tmpdir=None, writer=None, **kwargs):
session.scan_saving.technique = "none"
detectors = (
"diode2alias",
"diode3",
......@@ -40,5 +39,9 @@ def _test_nxw_parallel(session=None, tmpdir=None, writer=None, **kwargs):
nxw_test_utils.wait_scan_data_finished(lst, writer=writer)
for npoints, (scan, detector) in enumerate(zip(lst, detectors), 10):
nxw_test_data.assert_scan_data(
scan, scan_shape=(npoints,), detectors=[detector], **kwargs
scan,
scan_shape=(npoints,),
positioners=[["elapsed_time", "epoch"]],
detectors=[detector],
**kwargs
)
......@@ -58,7 +58,6 @@ def _test_nxw_readers(
reset_dispatcher = False
try:
detector = "diode3"
session.scan_saving.technique = "none"
if mode == "a" and not enable_file_locking:
# Readers will not crash the scan (not sure why) but corrupt the file
scan_shape = (100,)
......@@ -96,7 +95,11 @@ def _test_nxw_readers(
nxw_test_utils.assert_scan_data_exists([scan])
nxw_test_utils.assert_scan_data_not_corrupt([scan])
nxw_test_data.assert_scan_data(
scan, scan_shape=scan_shape, detectors=[detector], **kwargs
scan,
scan_shape=scan_shape,
positioners=[["elapsed_time", "epoch"]],
detectors=[detector],
**kwargs
)
finally:
if reset_dispatcher:
......
......@@ -69,10 +69,18 @@ def _test_nxw_scangroup(session=None, tmpdir=None, writer=None, **kwargs):
[scan1, scan2, scan_seq.sequence.scan, scan_grp.scan], writer=writer
)
nxw_test_data.assert_scan_data(
scan1, scan_shape=(npoints,), detectors=["diode3"], **kwargs
scan1,
scan_shape=(npoints,),
positioners=[["elapsed_time", "epoch"]],
detectors=["diode3"],
**kwargs
)
nxw_test_data.assert_scan_data(
scan2, scan_shape=(npoints,), detectors=["diode4"], masters=("robx",), **kwargs
scan2,
scan_shape=(npoints,),
positioners=[["robx"]],
detectors=["diode4"],
**kwargs
)
nxw_test_data.assert_scangroup_data(scan_seq.sequence, **kwargs)
nxw_test_data.assert_scangroup_data(scan_grp, **kwargs)
......@@ -93,7 +93,9 @@ def _test_nxw_timescan(session=None, tmpdir=None, writer=None, **kwargs):
# Verify data
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(scan, scan_shape=(0,), **kwargs)
nxw_test_data.assert_scan_data(
scan, scan_shape=(0,), positioners=[["elapsed_time", "epoch"]], **kwargs
)
# TODO: no proper cleanup by Bliss
dispatcher.reset()
......@@ -95,6 +95,13 @@ def limatimescan(lima, npoints1, expo1, detectors1, npoints2, expo2, detectors2)
return Scan(chain, "limatimescan", save=True)
def limact(lima, expo):
"""Simple lima image
"""
chain = _lime_ct_chain(name, lima, expo)
return Scan(chain, "limact", save=True)
def _timer_chain(name, npoints, expo, detectors):
chain = AcquisitionChain()
timer_master = SoftwareTimerMaster(expo, npoints=npoints, name=name + "tmr")
......@@ -137,6 +144,21 @@ def _lima_chain(name, lima, npoints, expo, detectors):
return chain
def _lime_ct_chain(name, lima, expo):
chain = AcquisitionChain()
scan_params = {"npoints": 1, "type": "loopscan"}
acq_params = {
"acq_nb_frames": 10,
"acq_expo_time": expo / 10,
"acq_mode": "SINGLE",
"acq_trigger_mode": "INTERNAL_TRIGGER",
"prepare_once": True,
"start_once": False,
}
_add_detectors(chain, None, [lima.image], scan_params, acq_params)
return chain
def _add_detectors(chain, master, detectors, scan_params, acq_params):
builder = ChainBuilder(detectors)
for top_node in builder.get_top_level_nodes():
......@@ -149,4 +171,7 @@ def _add_detectors(chain, master, detectors, scan_params, acq_params):
scan_params, acq_params
)
cnode.set_parameters(acq_params=node_acq_params, ctrl_params=None)
chain.add(master, top_node)
if master is None:
chain.add(top_node)
else:
chain.add(master, slave=top_node)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment