Commit 200d9e6b authored by Wout De Nolf

[writer] save groups and node/scan metadata

parent b763a482
Pipeline #19745 passed with stages in 35 minutes and 40 seconds
@@ -19,13 +19,13 @@ device:
session: test_session
```
Alternatively you can register the device manually, or use this helper script, which ensures that there is only one writer listening per BLISS session
The device class should always be __NexusWriter__ and the __session__ property should be the BLISS session name. If you want to register the device manually with the TANGO database, you can use a helper function to avoid mistakes (correct class name and session property, only one TANGO device per session):
```bash
$ python -m nexus_writer_service.nexus_register_writer test_session --domain id00 --instance nexuswriters
```
In this example we registered a writer for BLISS session __test_session__ which runs under domain __id00__ in TANGO server instance __nexuswriters__. The device family is __bliss_nxwriter__ by default and the device name is equal to the session name. Running multiple session writers in one TANGO server instance (i.e. one process) is allowed but not recommended if the associated BLISS sessions may produce lots of data simultaneously.
In this example we registered a writer for BLISS session __test_session__ which runs under domain __id00__ in TANGO server instance __nexuswriters__. By default the device family is __bliss_nxwriter__ and the device name is equal to the session name. Running multiple session writers in one TANGO server instance (i.e. one process) is allowed but not recommended if the associated BLISS sessions may produce lots of data simultaneously.
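With these defaults the registered TANGO device name is the concatenation __domain/family/member__, i.e. `id00/bliss_nxwriter/test_session` for the example above (the member defaults to the session name, cf. the `ensure_existence` helper further down in this diff).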
### Start the Tango server
@@ -68,7 +68,7 @@ def close_files(*fds):
pass
else:
raise
except Exception as e:
except BaseException as e:
exceptions.append(e)
if exceptions:
raise Exception(exceptions)
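Most of `close_files` is elided by the diff; the pattern is roughly the following sketch (an assumed reconstruction, not the exact source):
```python
def close_files(*fds):
    # Try to close every descriptor; collect failures so that one bad
    # close does not prevent the remaining ones from being attempted.
    exceptions = []
    for fd in fds:
        try:
            if fd is not None:
                fd.close()
        except BaseException as e:
            exceptions.append(e)
    if exceptions:
        # Surface all collected errors at once
        raise Exception(exceptions)
```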
@@ -28,14 +28,36 @@ def register_all_metadata_generators(force=False):
:param bool force: re-initialize when already done
"""
kwargs = {k: True for k in GENERATORS}
register_metadata_generators(force=force, **kwargs)
register_metadata_categories(force=force, **kwargs)
register_metadata_generators(**kwargs)
def register_metadata_generators(force=False, **kwargs):
def register_metadata_generators(**kwargs):
"""
Register metadata generators in a bliss session for
the scan writers (currently only one).
:param **kwargs: any key of `GENERATORS`
"""
# Generators are called at the start of the scan
# (bliss.scanning.scan.Scan.__init__) and at the end of
# the scan (bliss.scanning.scan.Scan.run, cleanup section).
#
# The generator 'instrument.positioners' is an exception:
# it is called only at the start of the scan, because it is
# removed before the generators are called a second time.
generators = scan_meta.get_user_scan_meta()
for k, mod in GENERATORS.items():
if kwargs.get(k, False):
mod.register_metadata_generators(generators)
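As a usage sketch, enabling a subset of generators might look like this (the key names `instrument` and `positioners` are illustrative; valid keys are whatever `GENERATORS` defines):
```python
# Categories must be registered before the generators that fill them
register_metadata_categories(instrument=True, positioners=True)
register_metadata_generators(instrument=True, positioners=True)
```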
def register_metadata_categories(force=False, **kwargs):
"""
Register metadata categories in a bliss session for
the scan writers (currently only one).
:param bool force: re-initialize when already done
:param **kwargs: any key of `GENERATORS`
"""
@@ -54,16 +76,4 @@ def register_metadata_generators(force=False, **kwargs):
scan_meta.CATEGORIES = enum.Enum(
scan_meta.CATEGORIES.__name__, list(categories)
)
generators = scan_meta.scan_meta()
scan_meta.USER_SCAN_META = generators
# Generators are called at the start of the scan
# (bliss.scanning.scan.Scan.__init__) and at the end of
# the scan (bliss.scanning.scan.Scan.run, cleanup section).
#
# The generator 'instrument.positioners' is an exception:
# it is called only at the start of the scan, because it is
# removed before the generators are called a second time.
for k, mod in GENERATORS.items():
if kwargs.get(k, False):
mod.register_metadata_generators(generators)
scan_meta.USER_SCAN_META = scan_meta.scan_meta()
@@ -106,7 +106,7 @@ def ensure_existence(
db = Database()
if not member:
member = session_name
if not domain():
if not domain:
domain = beamline()
dev_name = "/".join([domain, family, member])
if use_existing:
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import os
import abc
import logging
import numpy
from contextlib import contextmanager
from ..utils.logging_utils import CustomLogger
from ..io import nexus
logger = logging.getLogger(__name__)
class BaseProxy(abc.ABC):
"""
Wraps HDF5 creation and growth.
"""
def __init__(self, filename=None, parent=None, filecontext=None, parentlogger=None):
"""
:param str filename: HDF5 file name
:param str parent: path in the HDF5 file
:param filecontext: HDF5 open context manager (callable)
:param parentlogger:
"""
if filecontext is None:
filecontext = self._filecontext
self.filename = filename
self.filecontext = filecontext
self.parent = parent
if parentlogger is None:
parentlogger = logger
self.logger = CustomLogger(parentlogger, self)
self.npoints = 0
def __repr__(self):
if self.name:
return self.path
else:
return os.path.splitext(os.path.basename(self.filename))[0]
@property
def path(self):
if self.name:
return "/".join([self.parent, self.name])
else:
return self.parent
@property
def uri(self):
return self.filename + "::" + self.path
@abc.abstractproperty
def name(self):
pass
@contextmanager
def _filecontext(self):
with nexus.nxRoot(self.filename, mode="a") as nxroot:
yield nxroot
def ensure_existance(self):
with self.filecontext() as nxroot:
if self.exists:
return
self._create(nxroot)
@abc.abstractmethod
def _create(self, nxroot):
pass
@property
def exists(self):
"""
:returns bool:
"""
with self.filecontext() as nxroot:
return self.path in nxroot
@contextmanager
def open(self, ensure_existance=False):
"""
:param bool ensure_existance:
:yields h5py.Dataset or None:
"""
with self.filecontext() as nxroot:
if ensure_existance:
self.ensure_existance()
if self.path in nxroot:
yield nxroot[self.path]
else:
self.logger.warning(repr(self.uri) + " does not exist")
yield None
def add(self, newdata):
"""
Add data
:param sequence newdata:
"""
with self.open(ensure_existance=True) as destination:
try:
self.npoints += self._insert_data(destination, newdata)
except TypeError as e:
self.logger.error(e)
raise
@abc.abstractmethod
def _insert_data(self, destination, newdata):
"""
Insert new data in dataset
:param destination: h5py.Dataset or h5py.Group
:param sequence newdata:
:returns int: number of added points
"""
pass
@property
def npoints_expected(self):
return 0
@property
def complete(self):
"""
Variable length scans are marked complete when we have some data
"""
n, nall = self.npoints, self.npoints_expected
return n and n >= nall
@property
def progress(self):
if self.npoints_expected:
return self.npoints / self.npoints_expected
else:
if self.npoints:
return numpy.nan
else:
return 0
@property
def progress_string(self):
if self.npoints_expected:
sortkey = self.npoints / self.npoints_expected
s = "{:.0f}%".format(sortkey * 100)
else:
sortkey = self.npoints
s = "{:d}pts".format(sortkey)
return s, sortkey
@property
def _progress_log_suffix(self):
return ""
def log_progress(self, expect_complete=False):
"""
:param bool expect_complete:
:returns bool:
"""
npoints_expected = self.npoints_expected
npoints_current = self.npoints
complete = self.complete
if expect_complete:
if complete:
msg = "{}/{} points published{}".format(
npoints_current, npoints_expected, self._progress_log_suffix
)
self.logger.debug(msg)
else:
msg = "only {}/{} points published{}".format(
npoints_current, npoints_expected, self._progress_log_suffix
)
self.logger.warning(msg)
else:
msg = "progress {}/{}{}".format(
npoints_current, npoints_expected, self._progress_log_suffix
)
self.logger.debug(msg)
return complete
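For orientation, a minimal concrete subclass might look like this (a sketch; `ScalarProxy` and its dataset layout are hypothetical, not part of the commit):
```python
class ScalarProxy(BaseProxy):
    """Hypothetical proxy that appends scalars to a growing 1D dataset."""

    @property
    def name(self):
        # Fixed dataset name for this toy example
        return "scalars"

    def _create(self, nxroot):
        # Assumes the parent group already exists;
        # maxshape=(None,) makes the dataset resizable along axis 0
        parent = nxroot[self.parent]
        parent.create_dataset(self.name, shape=(0,), maxshape=(None,), dtype=float)

    def _insert_data(self, destination, newdata):
        # Grow the dataset, append, and return the number of added points
        n = len(newdata)
        destination.resize(destination.shape[0] + n, axis=0)
        destination[-n:] = newdata
        return n


# Hypothetical usage: append points as a scan publishes them
proxy = ScalarProxy(filename="scan.h5", parent="/1.1/measurement")
proxy.add([0.1, 0.2, 0.3])
proxy.log_progress()
```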
@@ -87,12 +87,14 @@ class BaseSubscriber(object):
STATES.FAULT: [],
}
def __init__(self, db_name, node_type=None, parentlogger=None, profiling=False):
def __init__(
self, db_name, node_type=None, parentlogger=None, resource_profiling=False
):
"""
:param str db_name:
:param str node_type:
:param parentlogger:
:param bool profiling: mem and cpu usage
:param bool resource_profiling: log memory and CPU usage
"""
self.state = self.STATES.INIT
self.state_reason = "instantiation"
@@ -103,7 +105,7 @@ class BaseSubscriber(object):
if parentlogger is None:
parentlogger = logger
self.logger = CustomLogger(parentlogger, self)
self.profiling = profiling
self.resource_profiling = resource_profiling
self._greenlet = None
self._log_task_period = 5
@@ -292,7 +294,7 @@ class BaseSubscriber(object):
"""
try:
yield
except Exception as e:
except BaseException as e:
# No need to reraise or set the listener state to FAULT
self.logger.error(
"Exception while {}:\n{}".format(action, traceback.format_exc())
@@ -303,7 +305,7 @@ class BaseSubscriber(object):
Greenlet main function without the resource (de)allocation
"""
try:
if self.profiling:
if self.resource_profiling:
with profiling.profile(
logger=self.logger,
timelimit=50,
@@ -320,7 +322,7 @@ class BaseSubscriber(object):
except KeyboardInterrupt:
self._set_state(self.STATES.FAULT, "KeyboardInterrupt")
self.logger.warning("Stop listening to Redis events (KeyboardInterrupt)")
except Exception as e:
except BaseException as e:
self._set_state(self.STATES.FAULT, e)
self.logger.error(
"Stop listening due to exception:\n{}".format(traceback.format_exc())
@@ -350,7 +352,7 @@ class BaseSubscriber(object):
except KeyboardInterrupt:
self._set_state(self.STATES.FAULT, "KeyboardInterrupt")
raise
except Exception as e:
except BaseException as e:
self._set_state(self.STATES.FAULT, e)
self.logger.warning(
"Processing {} event caused an exception:\n{}".format(
@@ -363,7 +365,7 @@ class BaseSubscriber(object):
finally:
try:
self._event_loop_finalize(**kwargs)
except Exception as e:
except BaseException as e:
self._set_state(self.STATES.FAULT, e)
self.logger.error(
"Not properly finalized due to exception:\n{}".format(
@@ -18,10 +18,10 @@ from gevent import sleep
from collections import OrderedDict
from contextlib import contextmanager
from fabio.edfimage import EdfImage
from .base_proxy import BaseProxy
from ..io import nexus
from ..io import h5_external
from ..io.io_utils import mkdir
from ..utils import logging_utils
from ..utils.array_order import Order
@@ -83,7 +83,7 @@ class FileSizeMonitor:
self.reset()
class DatasetProxy:
class DatasetProxy(BaseProxy):
"""
Wraps HDF5 dataset creation and growth.
"""
@@ -116,12 +116,14 @@ class DatasetProxy:
:param Order publishorder: order in which the scan shape is published
:param parentlogger:
"""
# HDF5 parameters
if filecontext is None:
filecontext = self._filecontext
self.filename = filename
self.filecontext = filecontext
self.parent = parent
if parentlogger is None:
parentlogger = logger
super().__init__(
filename=filename,
parent=parent,
filecontext=filecontext,
parentlogger=parentlogger,
)
# Shape and order
if sum(n == 0 for n in scan_shape) > 1:
@@ -145,33 +147,16 @@ class DatasetProxy:
self.device = device
# Internals
self.npoints = 0
self._external_raw = []
self._external_datasets = []
self._external_names = []
self._external_raw_formats = ["edf"]
if parentlogger is None:
parentlogger = logger
self.logger = logging_utils.CustomLogger(parentlogger, self)
@contextmanager
def _filecontext(self):
with nexus.nxRoot(self.filename, mode="a") as nxroot:
yield nxroot
def __repr__(self):
return "{}: shape = {}, dtype={}".format(
repr(self.path), self.shape, self.dtype.__name__
)
@property
def path(self):
return "/".join([self.parent, self.name])
@property
def uri(self):
return self.filename + "::" + self.path
@property
def name(self):
return normalize_nexus_name(self.device["data_name"])
@@ -339,7 +324,7 @@ class DatasetProxy:
self._external_names += newdata
self.npoints = len(self._external_names)
def add_internal(self, newdata):
def add(self, newdata):
"""
Add data to dataset (copy)
@@ -349,12 +334,9 @@ class DatasetProxy:
if self.is_external:
msg = "{} already has external data".format(self)
raise RuntimeError(msg)
with self.open(ensure_existance=True) as dset:
try:
self.npoints += self._insert_data(dset, newdata)
except TypeError as e:
self.logger.error(e)
raise
super().add(newdata)
add_internal = add
def _insert_data(self, dset, newdata):
"""
@@ -496,6 +478,8 @@ class DatasetProxy:
"""
Value a reader gets for uninitialized elements
"""
if self.dtype in (str, bytes):
return ""
fillvalue = numpy.nan
try:
numpy.array(fillvalue, self.dtype)
@@ -688,90 +672,16 @@ class DatasetProxy:
attrs = {k: v for k, v in attrs.items() if v is not None}
return attrs
def ensure_existance(self):
with self.filecontext() as nxroot:
if self.exists:
return
parent = nxroot[self.parent]
nexus.nxCreateDataSet(parent, self.name, self._dset_value, self._dset_attrs)
@property
def exists(self):
"""
:returns bool:
"""
with self.filecontext() as nxroot:
return self.path in nxroot
@contextmanager
def open(self, ensure_existance=False):
"""
:param bool ensure_existance:
:yields h5py.Dataset or None:
"""
with self.filecontext() as nxroot:
if ensure_existance:
self.ensure_existance()
if self.path in nxroot:
yield nxroot[self.path]
else:
self.logger.warning(repr(self.uri) + " does not exist")
yield None
def _create(self, nxroot):
"""
Create the dataset
"""
parent = nxroot[self.parent]
nexus.nxCreateDataSet(parent, self.name, self._dset_value, self._dset_attrs)
@property
def complete(self):
"""
Variable length scans are marked complete when we have some data
"""
n, nall = self.npoints, self.npoints_expected
return n and n >= nall
@property
def progress(self):
if self.npoints_expected:
return self.npoints / self.npoints_expected
else:
if self.npoints:
return numpy.nan
else:
return 0
@property
def progress_string(self):
if self.npoints_expected:
sortkey = self.npoints / self.npoints_expected
s = "{:.0f}%".format(sortkey * 100)
else:
sortkey = self.npoints
s = "{:d}pts".format(sortkey)
return s, sortkey
def log_progress(self, expect_complete=False):
"""
:param bool expect_complete:
:returns bool:
"""
npoints_expected = self.npoints_expected
npoints_current = self.npoints
complete = self.complete
datasize = format_bytes(self.current_bytes)
if expect_complete:
if complete:
msg = "{}/{} points published ({})".format(
npoints_current, npoints_expected, datasize
)
self.logger.debug(msg)
else:
msg = "only {}/{} points published ({})".format(
npoints_current, npoints_expected, datasize
)
self.logger.warning(msg)
else:
msg = "progress {}/{} ({})".format(
npoints_current, npoints_expected, datasize
)
self.logger.debug(msg)
return complete
@property
def _progress_log_suffix(self):
return " ({})".format(format_bytes(self.current_bytes))
def reshape(self, scan_save_shape, detector_shape=None):
"""
@@ -905,3 +815,16 @@ class DatasetProxy:
"_".join(map(str, self.current_detector_shape)) + "_" + self.dtype.__name__
)
return os.path.join(dirname, "dummy", "dummy_" + name + ext)
def add_metadata(self, treedict, parent=False, **kwargs):
"""
Add datasets/attributes (typically used for metadata)
:param dict treedict:
:param bool parent:
:param kwargs: see `dicttonx`
"""
with self.open(ensure_existance=True) as destination:
if parent:
destination = destination.parent
nexus.dicttonx(treedict, destination, **kwargs)
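A usage sketch (the metadata keys are illustrative; assuming `nexus.dicttonx` follows the silx convention where keys starting with `@` become HDF5 attributes):
```python
# Write metadata next to the dataset (parent=True) instead of inside it
dataset_proxy.add_metadata(
    {"@units": "mm", "sample": {"name": "sample1"}},
    parent=True,
)
```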
@@ -43,9 +43,9 @@ timernamemap = {"elapsed_time": "value", "epoch": "epoch"}
timertypemap = {"elapsed_time": "principal", "epoch": "epoch"}
limanamemap = {"image": "data"}
limanamemap = {"image": "data", "sum": "data"}
limatypemap = {"image": "principal"}
limatypemap = {"image": "principal", "sum": "principal"}
counternamemap = {}
@@ -79,45 +79,52 @@ def shortnamemap(names, separator=":"):
return ret
def fill_device(fullname, device):
def fill_device(fullname, device, device_info=None, data_info=None):
"""
Add missing keys with default values
device_type: type for the writer (not saved), e.g. positioner, mca, lima
device_name: HDF5 group name (measurement or positioners when missing)
device_info: HDF5 group datasets
data_type: "principal" (data of NXdetector or value of NXpositioner) or other
data_name: HDF5 dataset name
data_info: HDF5 dataset attributes
unique_name: Unique name for HDF5 links
master_index: >=0 axis order used for plotting
dependencies: fullnames
:param str fullname:
:param dict device:
:param dict device_info:
:param dict data_info:
"""
device["device_type"] = device.get(
"device_type", ""
) # type for the writer (not saved)
# e.g. positioner, mca
device["device_name"] = device.get("device_name", fullname) # HDF5 group name
# measurement or positioners when missing
device["device_info"] = device.get("device_info", {}) # HDF5 group datasets
device["data_type"] = device.get(
"data_type", "principal"
) # principal value of this HDF5 group
device["data_name"] = device.get("data_name", "data") # HDF5 dataset name
device["data_info"] = device.get("data_info", {}) # HDF5 dataset attributes