GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit 4b80f524 authored by Wout De Nolf's avatar Wout De Nolf

[writer] fix writer tests (alias and API change)

parent 9779ee5a
Pipeline #17756 failed with stages
in 37 minutes and 45 seconds
......@@ -14,6 +14,7 @@ Register metadata generators for a configurable writer
"""
import enum
from bliss import current_session
from bliss.scanning import scan_meta
from .scan_writers import writer_config_publish
......@@ -54,17 +55,16 @@ def register_metadata_generators(force=False, **kwargs):
scan_meta.CATEGORIES = enum.Enum(
scan_meta.CATEGORIES.__name__, list(categories)
)
generators.clear()
scan_meta.USER_SCAN_META = None
generators = scan_meta.get_user_scan_meta()
# Generators are called at the start of the scan:
# bliss.scanning.scan.Scan.__init__
# and at the end of the scan
# run bliss.scanning.scan.Scan.run (cleanup section)
#
# The generator 'instrument.positioners' is an exception.
# It is only called at the beginning of the scan by
# removing it before calling the generators a second time.
for k, mod in GENERATORS.items():
if kwargs.get(k, False):
mod.register_metadata_generators(generators)
generators = scan_meta.create_user_scan_meta()
current_session.user_scan_meta = generators
# Generators are called at the start of the scan:
# bliss.scanning.scan.Scan.__init__
# and at the end of the scan
# run bliss.scanning.scan.Scan.run (cleanup section)
#
# The generator 'instrument.positioners' is an exception.
# It is only called at the beginning of the scan by
# removing it before calling the generators a second time.
for k, mod in GENERATORS.items():
if kwargs.get(k, False):
mod.register_metadata_generators(generators)
......@@ -236,6 +236,7 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
:returns dict: subscanname:dict(fullname:dict)
"""
ret = {}
config = bool(devices)
for subscan, subscaninfo in scan_info["acquisition_chain"].items():
subdevices = ret[subscan] = {}
# These are the "positioners"
......@@ -252,7 +253,8 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
device["device_type"] = "positioner"
device["master_index"] = master_index
master_index += 1
_add_display_name(device, fullname, aliasmap)
if _allow_alias(device, config):
_add_alias(device, fullname, aliasmap)
# These are the 0D, 1D and 2D "detectors"
dic = subscaninfo
aliasmap = dic.get("display_names", {})
......@@ -263,7 +265,8 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
device = update_device(subdevices, fullname, units)
if fullname.startswith("timer") and key == "scalars":
device["device_type"] = "positionergroup"
_add_display_name(device, fullname, aliasmap)
if _allow_alias(device, config):
_add_alias(device, fullname, aliasmap)
parse_devices(
subdevices,
short_names=short_names,
......@@ -272,7 +275,14 @@ def device_info(devices, scan_info, short_names=True, multivalue_positioners=Fal
return ret
def _add_display_name(device, fullname, display_names):
def _allow_alias(device, config):
"""
Allowing channel aliases without configuration creates a mess
"""
return config or device["device_type"] in ["positioner", "positionergroup"]
def _add_alias(device, fullname, display_names):
"""
:param dict device:
:param str fullname:
......
......@@ -109,7 +109,7 @@ class NexusScanWriterBase(gevent.Greenlet):
# Cache
self._data_nodes = [] # list(bliss.data.node.DataNode)
self._datasets = {} # subscan:dict(bliss.data.node.DataNode.fullname:DatasetProxy)
self._nxroot = defaultdict(lambda: None) # cache for recursive calling
self._nxroot = {} # cache for recursive calling
self._nxentry = None # cache for recursive calling
self._nxroot_locks = locks
self._devices = {} # subscan:dict(fullname:dict)
......@@ -283,7 +283,8 @@ class NexusScanWriterBase(gevent.Greenlet):
Yields the NXroot instance (h5py.File) or None
when information is missing
"""
if self._nxroot[level] is None:
nxroot = self._nxroot.get(level, None)
if nxroot is None:
filename = self._filename(level=level)
if filename:
# REMARK: External readers could have opened
......@@ -306,7 +307,7 @@ class NexusScanWriterBase(gevent.Greenlet):
self._nxroot[level] = None
yield None
else:
yield self._nxroot[level]
yield nxroot
@contextmanager
def nxentry(self, subscan):
......@@ -681,7 +682,7 @@ class NexusScanWriterBase(gevent.Greenlet):
if nxentry is not None:
with self._nxroot_locks.acquire(nxentry.file.filename):
nexus.updated(nxentry, final=True, parents=True)
self.logger.info("Scan marked as DONE in HDF5")
self.logger.info("Scan marked as DONE in HDF5")
@property
def instrument_name(self):
......
......@@ -553,8 +553,8 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
:param str subscan:
"""
filenames = self.filenames
n = len(filenames) - 1
if n <= 0:
n = len(filenames)
if n <= 1:
return
filenames = filenames[1:]
with self.nxentry(subscan) as nxentry:
......@@ -564,7 +564,7 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
prefix, ext = os.path.splitext(os.path.basename(nxentry.file.filename))
link = h5py.ExternalLink(nxentry.file.filename, nxentry.name)
linkname = prefix + ": " + nxentry.name[1:]
for level in range(1, n + 1):
for level in range(1, n):
with self.nxroot(level=level) as nxroot:
if nxroot is None:
continue
......
......@@ -122,13 +122,14 @@ def validate_master_links(scan, subscan=1, config=True):
:param bool config: configurable writer
"""
uri = scan_utils.scan_uri(scan, subscan=subscan, config=config)
uri = nexus.normUri(uri)
for filename in scan_utils.scan_filenames(scan, config=config):
with nexus.nxRoot(filename) as nxroot:
for key in nxroot:
if uri == nexus.getUri(nxroot[key]):
if uri == nexus.normUri(nexus.getUri(nxroot[key])):
break
else:
assert False, filename
assert False, uri
def validate_nxentry(nxentry, config=True, withpolicy=True, technique=None):
......
......@@ -38,7 +38,7 @@ class TimeOutError(Exception):
pass
def wait_scan_data_finished(scans, config=True, timeout=60, writer_stdout=None):
def wait_scan_data_finished(scans, config=True, timeout=20, writer_stdout=None):
"""
:param list(bliss.scanning.scan.Scan) scans:
:param bool config: configurable writer
......@@ -50,13 +50,27 @@ def wait_scan_data_finished(scans, config=True, timeout=60, writer_stdout=None):
with gevent.Timeout(timeout, TimeOutError):
while uris:
uris = [uri for uri in uris if not nexus.nxComplete(uri)]
gevent.sleep(1)
gevent.sleep(0.1)
except TimeOutError:
# _terminate_writer()
if writer_stdout is not None:
print_output(writer_stdout)
assert not uris, uris
def _terminate_writer():
"""
This is a temporary fix when HDF5 files appear not updated.
Not sure yet what causes this.
"""
import psutil
for proc in psutil.process_iter():
if "nexus_writer_" in str(proc.cmdline()):
proc.kill()
# proc.wait()
def wait_scan_data_exists(scans, config=True, timeout=120):
"""
:param list(bliss.scanning.scan.Scan) scans:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment