GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit d32968f5 authored by Wout De Nolf's avatar Wout De Nolf

[writer] nexus writer service

parent d1f8edac
Pipeline #19314 failed with stages
in 32 minutes and 48 seconds
......@@ -293,8 +293,9 @@ class SimulationCounterAcquisitionSlave(AcquisitionSlave):
def stop(self):
log_debug(self, "SIMULATION_COUNTER_ACQ_DEV -- stop()")
if self.distribution == "GAUSSIAN" and not self.is_count_scan():
print(
f"SIMULATION_COUNTER_ACQ_DEV -- (Theorical values) {self.name} mu={self.mu:g} sigma={self.sigma:g} fwhm={self.fwhm:g}"
log_debug(
self,
f"SIMULATION_COUNTER_ACQ_DEV -- (Theorical values) {self.name} mu={self.mu:g} sigma={self.sigma:g} fwhm={self.fwhm:g}",
)
pass
......
......@@ -323,12 +323,7 @@ class LimaImageChannelDataNode(DataNode):
file_path = path_format % file_nb
if file_format == "HDF5":
returned_params.append(
(
file_path,
self._path_in_hdf5(file_path),
image_index_in_file,
file_format,
)
(file_path, "entry0000", image_index_in_file, file_format)
)
else:
returned_params.append(
......@@ -337,7 +332,7 @@ class LimaImageChannelDataNode(DataNode):
return returned_params
@staticmethod
def _path_in_hdf5(filename, signal=True):
def _default_hdf5_dataset(f, signal=True):
# TODO:
# same as nexus_writer_service.io.nexus.getDefault
# handle OSError?
......@@ -351,34 +346,33 @@ class LimaImageChannelDataNode(DataNode):
return v
path = ""
with h5py.File(filename, mode="r") as f:
default = strattr(f, "default", "")
if default and not default.startswith("/"):
default = "/" + default
while default:
try:
node = f[default]
except KeyError:
break
nxclass = strattr(node, "NX_class", "")
if nxclass == "NXdata":
if signal:
name = strattr(node, "signal", "data")
try:
path = node[name].name
except KeyError:
pass
else:
path = node.name
break
default = strattr(f, "default", "")
if default and not default.startswith("/"):
default = "/" + default
while default:
try:
node = f[default]
except KeyError:
break
nxclass = strattr(node, "NX_class", "")
if nxclass == "NXdata":
if signal:
name = strattr(node, "signal", "data")
try:
path = node[name].name
except KeyError:
pass
else:
add = strattr(node, "default", "")
if add.startswith("/"):
default = add
elif add:
default += "/" + add
else:
break
path = node.name
break
else:
add = strattr(node, "default", "")
if add.startswith("/"):
default = add
elif add:
default += "/" + add
else:
break
return path
def _get_from_file(self, image_nb):
......@@ -400,6 +394,7 @@ class LimaImageChannelDataNode(DataNode):
elif file_format == "HDF5":
if h5py is not None:
with h5py.File(filename) as f:
path_in_file = self._default_hdf5_dataset(f)
dataset = f[path_in_file]
return dataset[image_index]
else:
......
......@@ -1220,7 +1220,7 @@ class ChainNode:
def set_parameters(self, acq_params=None, ctrl_params=None, force=False):
""" Store the scan and/or acquisition parameters into the node.
These parameters will be used when the acquisition object is instanciated (see self.create_acquisition_object )
These parameters will be used when the acquisition object is instantiated (see self.create_acquisition_object )
If the parameters have been set already, new parameters will be ignored (except if Force==True).
"""
......
__all__ = ["csv", "hdf5"]
__all__ = ["hdf5", "nexus", "null"]
This diff is collapsed.
......@@ -38,4 +38,4 @@ class Writer(FileWriter):
@property
def filename(self):
return os.path.join(self.root_path, self.data_filename + ".null")
return ""
# External data processing
BLISS offers the possibility to use a separate process (on the system level) for retrieving the acquired data in order to save or process it.
# Example 1: Save data in HDF5
!!! note
BLISS already comes with an external writer. This is just an example.
The example script discussed here is provided in Bliss repository
at [scripts/external_saving_example/external_saving_example.py](https://gitlab.esrf.fr/bliss/bliss/blob/master/scripts/external_saving_example/external_saving_example.py).
To have a minimal working Bliss environment have a look at the [installation notes](index.md#installation-outside-esrf)
and the [test_configuration setup](index.md#use-bliss-without-hardware).
### Listening to a Bliss session
When running the script
python scripts/external_saving_example/external_saving_example.py
it is listening to new scans in the Bliss __test_session__
```python
listen_to_session_wait_for_scans("test_session")
```
Connect to the node 'test_session' in redis:
```python
session_node = get_session_node(session)
```
Using the `walk_on_new_events()` function with `filter="scan"`(limit walk to nodes of type `node.type == "scan"` ) in order to handle new events on scan nodes:
- `NEW_NODE` when a new scan is launched
- `END_SCAN` when a scan terminates.
```python
# wait for new events on scan
for event_type, node in session_node.iterator.walk_on_new_events(
filter="scan", from_next=True):
```
### Receiving events from a scan
In the example script a new instance of the class `HDF5_Writer` is created per scan that is started. Following the
initialisation a _gevent greenlet_ is spawned to run the actual listener in a non-blocking way. Inside `def run(self)`
a second iterator is started walking through all events emitted by the scan
[(see data structure section)](data_structure.md#experiment-and-redis-data-structure):
```python
for event_type, node in self.scan_node.iterator.walk_events():
```
!!!hint
Data generated by scans in BLISS is emitted through a structure called _channel_. Further reading can be found at
- [Overview scan engine](scan_engine.md)
- [Bliss data nodes](scan_data_node.md)
- [Data structure of published data](data_structure.md)
Once an event is received it can be categorized by the event type:
- `NEW_NODE`
- `NEW_DATA_IN_CHANNEL`
and by node type:
- `channel`
- `lima`
Currently 0d and 1d data is directly kept in redis and published through _channels_ (`node.type == "channel"`).
For each new _channel_ a corresponding hdf5 dataset is created and filled with data emitted on a `"NEW_DATA_IN_CHANNEL"` event.
2d data (e.g. lima images) is not saved in redis itself, but can be retrieved through references (method `get_image` of class `LimaDataView`). However, for this example we
do not want to deal with the 2d data itself but only resolve the final saving destination.
In this example data from channels (not lima) is written to the hdf5 file as soon
as the event is received, but references of images are only saved in hdf5 once the scan has ended (nothing is preventing to do this also on the fly).
### Finalizing the scan dataset on "END_SCAN"
Once the `END_SCAN` event is received the `finalize` method of the `HDF5_Writer` instance is called
to
- 1) stop listening to events of the scan in question,
- 2) have a final synchronization for all datasets of the scan and
- 3) to write `instrument` and meta-data entries to hdf5.
### Meta-data and Instrument dataset
Each scan has an attached `scan_info` structure (nested dict) which e.g. contains meta-data entries which also have to be put into the hdf5. In Bliss there is a `dicttoh5`
function which is derived from its pendant in _silx.io.dictdump_, that puts in place correct `h5dataset.attrs["NX_class"]` attributes when converting the python dict
structure into hdf5 datasets.
## Examples of complex scans
In order to have some test cases for more demanding scans when working with the presented api a script file that can be executed inside the Bliss shell is provided:
```python
TEST_SESSION [1]: exec(open("scripts/external_saving_example/some_scans.py").read())
```
The same scans can also be executed using Bliss in a library mode running (_TANGO_HOST_ to be chosen according to the running server...)
```bash
BEACON_HOST=localhost TANGO_HOST=localhost:20000 python scripts/external_saving_example/some_scans_bliss_as_library.py
```
This diff is collapsed.
......@@ -15,13 +15,20 @@
:toctree:
nexus_writer_service
session_writer
nexus_register_writer
session_api
metadata
writers
subscribers
io
utils
tango
"""
# TODO: not sure we need this
from gevent import monkey
monkey.patch_all(thread=False)
import logging
from .utils import logging_utils
......
......@@ -18,36 +18,15 @@ import numpy
import fabio
from fabio.edfimage import EdfImage
from .io_utils import mkdir
def swap_flattening_order(lst, shape, order):
"""
Swap order of flattened list
:param list lst: flattened shape
:param tuple shape: original shape of `lst`
:param str order: flattening order of `lst`
:returns list:
"""
if len(shape) <= 1:
return lst
if order == "C":
ofrom, oto = "C", "F"
elif order == "F":
ofrom, oto = "F", "C"
else:
raise ValueError("Order must be 'C' or 'F'")
idx = numpy.arange(len(lst))
idx = idx.reshape(shape, order=oto)
idx = idx.flatten(order=ofrom)
return [lst[i] for i in idx]
from ..utils.array_order import Order
def add_edf_arguments(filenames, createkwargs=None):
"""
Arguments for `h5py.create_dataset` to link to EDF data frames.
:param list(str or tuple) filenames: file names (str) and optionally image indices (tuple)
:param list(str or tuple) filenames: file names (str) and optionally
image indices (tuple)
:param dict: result of previous call to append to
:returns dict:
:raises RuntimeError: not supported by external datasets
......@@ -67,7 +46,7 @@ def add_edf_arguments(filenames, createkwargs=None):
if not isinstance(indices, (tuple, list)):
indices = [indices]
else:
indices = None
indices = []
if ".edf." in os.path.basename(filename):
raise RuntimeError(
"{}: external datasets with compression not supported".format(
......@@ -76,14 +55,10 @@ def add_edf_arguments(filenames, createkwargs=None):
)
if indices:
img = fabio.open(filename)
# EdfImage.getframe returns an EdfImage, not an EdfFrame
def getframe(img):
return img._frames[img.currentframe]
it = (getframe(img.getframe(i)) for i in indices)
it = (img._frames[i] for i in indices)
else:
it = EdfImage.lazy_iterator(filename)
for frame in it:
if frame.swap_needed():
raise RuntimeError(
......@@ -103,6 +78,9 @@ def add_edf_arguments(filenames, createkwargs=None):
)
)
shapei = frame.shape
if len(shapei) == 1:
# TODO: bug in fabio?
shapei = 1, shapei[0]
dtypei = frame.dtype
start = frame.start
size = frame.size # TODO: need compressed size
......@@ -159,7 +137,11 @@ def resize(createkwargs, enframes, filename, fillvalue):
if os.path.splitext(filename)[-1] != ext:
filename += ext
if ext == ".edf":
mkdir(filename)
mkdir(os.path.dirname(filename))
if not frame_shape:
frame_shape = 1, 1
elif len(frame_shape) == 1:
frame_shape = 1, frame_shape[0]
EdfImage(data=numpy.full(fillvalue, frame_shape)).write(filename)
else:
raise RuntimeError(
......@@ -169,13 +151,14 @@ def resize(createkwargs, enframes, filename, fillvalue):
return nframes - enframes
def finalize(createkwargs, order="C", shape=None):
def finalize(createkwargs, addorder=None, fillorder=None, shape=None):
"""
Finalize external dataset arguments: define shape
:param dict createkwargs:
:param tuple shape: scan shape (default: (nframes,))
:param str order: fill order of shape
:param Order addorder: add order to external
:param Order fillorder: fill order of shape
:raises RuntimeError: scan shape does not match number of frames
"""
nframes = len(createkwargs["external"])
......@@ -184,26 +167,35 @@ def finalize(createkwargs, order="C", shape=None):
raise RuntimeError("The shape of one external frame must be provided")
if shape:
createkwargs["shape"] = shape + frame_shape
if order == "F":
external = swap_flattening_order(createkwargs["external"], shape, "C")
createkwargs["external"] = external
if not isinstance(addorder, Order):
addorder = Order(addorder)
if not isinstance(fillorder, Order):
fillorder = Order(fillorder)
if fillorder.forder:
raise ValueError("External HDF5 datasets are always saved in C-order")
createkwargs["external"] = addorder.swap_list(
createkwargs["external"], shape, fillorder
)
else:
createkwargs["shape"] = (nframes,) + frame_shape
def add_arguments(file_format, filenames, shape=None, createkwargs=None):
def add_arguments(filenames, createkwargs=None):
"""
Arguments for `h5py.create_dataset` to link to data frames.
The resulting shape will be `shape + frame_shape`
:param str file_format:
:param list(str or tuple) filenames: file names (str) and optionally image indices (tuple)
:param dict: result of previous call to append to
:param order(str): refers to the scan dimensions, not the image dimensions
:param dict createkwargs: result of previous call to append to
:returns dict:
"""
if file_format == "edf":
if not filenames:
return
first = filenames[0]
if isinstance(first, (tuple, list)):
first = first[0]
file_format = os.path.splitext(first)[-1].lower()
if file_format == ".edf":
return add_edf_arguments(filenames, createkwargs=createkwargs)
else:
raise ValueError("Unknown external data format " + repr(file_format))
......@@ -50,11 +50,25 @@ def mkdir(path):
:param str path:
"""
try:
if path:
os.makedirs(path)
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
if path:
path = os.path.abspath(path)
os.makedirs(path, exist_ok=True)
def close_files(*fds):
    """Close several OS-level file descriptors, collecting failures.

    ``None`` entries are skipped and already-closed descriptors
    (``EBADF``) are silently ignored.

    :param fds: file descriptors as returned by e.g. ``os.open`` (or None)
    :raises Exception: wraps the list of all errors encountered
    """
    errors = []
    for fd in fds:
        if fd is None:
            continue
        try:
            os.close(fd)
        except OSError as exc:
            # A descriptor that is already closed is not an error here
            if exc.errno != errno.EBADF:
                errors.append(exc)
        except Exception as exc:
            errors.append(exc)
    if errors:
        raise Exception(errors)
This diff is collapsed.
......@@ -15,9 +15,9 @@ Register metadata generators for a configurable writer
import enum
from bliss.scanning import scan_meta
from .scan_writers import writer_config_publish
from .subscribers import scan_writer_publish
GENERATORS = {"writer_config": writer_config_publish}
GENERATORS = {"writer_config": scan_writer_publish}
def register_all_metadata_generators(force=False):
......
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Nexus writer service for Bliss
"""
import os
from bliss.common.tango import DeviceProxy, DevFailed, Database
from tango import DbDevInfo
from .utils import logging_utils
logger = logging_utils.getLogger(__name__, __file__)
WRITER_CLASS = "NexusWriter"
def beamline():
    """Beamline name in lower case.

    Taken from the BEAMLINENAME or BEAMLINE environment variables
    (BEAMLINE wins when both are set), falling back to "id00".

    :returns str:
    """
    values = [os.environ.get(var) for var in ("BEAMLINENAME", "BEAMLINE")]
    defined = [v for v in values if v is not None]
    return (defined[-1] if defined else "id00").lower()
def find_session_writers(session_name, db=None):
    """
    Find all TANGO devices of class NexusWriter listening to a particular BLISS session.

    :param str session_name:
    :param Database db: created when not provided
    :returns list(str): device names
    """
    if db is None:
        db = Database()

    def listens_to_session(dev_name):
        # The "session" device property holds the BLISS session name
        prop = db.get_device_property(dev_name, "session")["session"]
        return bool(prop) and prop[0] == session_name

    writers = [
        dev_name
        for dev_name in db.get_device_name("*", WRITER_CLASS)
        if listens_to_session(dev_name)
    ]
    logger.info(
        "Registered writers for session {}: {}".format(repr(session_name), writers)
    )
    return writers
def find_session_writer(session_name, db=None):
    """
    Find the TANGO device of class NexusWriter listening to a particular BLISS session.

    :param str session_name:
    :param Database db: created by `find_session_writers` when not provided
    :returns str or None: device name (None when no writer is registered)
    :raises ValueError: more than one writer listening to session
    """
    # NOTE(review): the original docstring claimed RuntimeError but the code
    # raises ValueError; the exception type is kept for caller compatibility
    # and the documentation corrected instead.
    writers = find_session_writers(session_name, db=db)
    if not writers:
        return None
    if len(writers) > 1:
        raise ValueError(
            "Found more than one writer for session {}: {}".format(
                repr(session_name), writers
            )
        )
    return writers[0]
def get_uri(p):
    """Full TANGO URI of a device proxy.

    :param DeviceProxy p:
    :returns str:
    """
    host = p.get_db_host()
    port = p.get_db_port()
    device = p.dev_name()
    return "tango://{}:{}/{}".format(host, port, device)
def ensure_existence(
    session_name,
    server="nexuswriter",
    instance="nexuswriters",
    domain=None,
    family="bliss_nxwriter",
    member=None,
    use_existing=True,
):
    """
    Find or register TANGO device of class NexusWriter

    :param str session_name:
    :param str server: device server name
    :param str instance: device server instance
    :param str domain: location of device (default: beamline name)
    :param str family: type of device
    :param str member: device name (Default: `session_name`)
    :param bool use_existing: use an existing writer for this session (if any)
    :returns DeviceProxy:
    """
    db = Database()
    if not member:
        member = session_name
    # BUG FIX: was `if not domain():` — `domain` is a string or None, not a
    # callable, so calling it raised TypeError on every invocation.
    if not domain:
        domain = beamline()
    dev_name = "/".join([domain, family, member])
    if use_existing:
        pdev_name = find_session_writer(session_name, db=db)
        if pdev_name:
            # Reuse the writer already registered for this session, even when
            # its device name differs from the requested one (warn in that case)
            proxy = DeviceProxy(pdev_name)
            msg = "{} already registered".format(repr(get_uri(proxy)))
            if dev_name == pdev_name:
                logger.info(msg)
            else:
                logger.warning(msg)
            return proxy
    return register(session_name, dev_name, server=server, instance=instance)
def register(session_name, dev_name, server="nexuswriter", instance="nexuswriters"):
"""
Register TANGO device of class NexusWriter
:param str session_name:
:param str dev_name: for example id00/bliss_nxwriter/test_session
:param str instance: device server instance
:returns DeviceProxy:
"""
# Register proxy is not already registered
try:
proxy = DeviceProxy(dev_name)
logger.info("{} already registered".format(repr(get_uri(proxy))))
except DevFailed:
db = Database()
dev_info = DbDevInfo()
dev_info.name = dev_name
dev_info._class = WRITER_CLASS
server = "/".join([server, instance])
dev_info.server = server
db.add_device(dev_info)
proxy = DeviceProxy(dev_name)
logger.info("{} registered".format(repr(get_uri(proxy))))
proxy.put_property({"session": session_name})
try:
session = proxy.get_property("session")["session"][0]
except (IndexError, KeyError):
session = ""
if session != session_name:
raise RuntimeError(
"{} is listening to Bliss session {} instead of {}".format(