GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit 0da07c49 authored by Wout De Nolf's avatar Wout De Nolf Committed by Linus Pithan

porting NexusWriter from id21blissutis into bliss project

parent b57989a1
# NeXus compliant external writer process
To start the external writer inside an environment where bliss is installed
To start a session writer as a process inside an environment where bliss is installed
```bash
$NexusWriter -s test_session --log=info
NexusSessionWriter test_session --log=info
```
To start a session writer as a service inside an environment where bliss is installed
```bash
NexusWriterService
```
# Example: using the Bliss data API for HDF5 saving
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
# This file is part of the nexus writer service of the BLISS project.
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""External Nexus writer
......@@ -11,7 +15,15 @@
:toctree:
nexus_writer_service
session_writer
metadata
writers
io
utils
"""
import logging
from .utils import logging_utils

# Package-level logger used by all writer modules.
logger = logging.getLogger(__name__)
# Apply the package's command-line logging configuration to this logger
# (presumably installs handlers / parses a --log option; see
# utils.logging_utils — TODO confirm).
logging_utils.cliconfig(logger)
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
# This file is part of the nexus writer service of the BLISS project.
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from .nexus_writer_service import main
......
......@@ -13,4 +13,4 @@
nexus
io_utils
h5_external
"""
\ No newline at end of file
"""
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
# This file is part of the nexus writer service of the BLISS project.
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
......@@ -27,10 +31,10 @@ def swap_flattening_order(lst, shape, order):
"""
if len(shape) <= 1:
return lst
if order == 'C':
ofrom, oto = 'C', 'F'
elif order == 'F':
ofrom, oto = 'F', 'C'
if order == "C":
ofrom, oto = "C", "F"
elif order == "F":
ofrom, oto = "F", "C"
else:
raise ValueError("Order must be 'C' or 'F'")
idx = numpy.arange(len(lst))
......@@ -56,7 +60,7 @@ def add_edf_arguments(filenames, createkwargs=None):
return createkwargs
else:
filenames = [filenames]
shape0 = createkwargs.get('frame_shape', tuple())
shape0 = createkwargs.get("frame_shape", tuple())
for filename in filenames:
if isinstance(filename, (tuple, list)):
filename, indices = filename
......@@ -64,30 +68,40 @@ def add_edf_arguments(filenames, createkwargs=None):
indices = [indices]
else:
indices = None
if '.edf.' in os.path.basename(filename):
raise RuntimeError('{}: external datasets with compression not supported'
.format(repr(filename)))
if ".edf." in os.path.basename(filename):
raise RuntimeError(
"{}: external datasets with compression not supported".format(
repr(filename)
)
)
if indices:
img = fabio.open(filename)
# EdfImage.getframe returns an EdfImage, not a EdfFrame
def getframe(img):
return img._frames[img.currentframe]
it = (getframe(img.getframe(i)) for i in indices)
else:
it = EdfImage.lazy_iterator(filename)
for frame in it:
if frame.swap_needed():
raise RuntimeError('{} (frame {}): external datasets do not support byte-swap'
.format(repr(filename), frame.iFrame))
raise RuntimeError(
"{} (frame {}): external datasets do not support byte-swap".format(
repr(filename), frame.iFrame
)
)
compressioni = frame._data_compression
if compressioni:
compressioni = compressioni.lower()
if compressioni == "none":
compressioni = None
if compressioni is not None:
raise RuntimeError('{} (frame {}): external datasets with compression not supported'
.format(repr(filename), frame.iFrame))
raise RuntimeError(
"{} (frame {}): external datasets with compression not supported".format(
repr(filename), frame.iFrame
)
)
shapei = frame.shape
dtypei = frame.dtype
start = frame.start
......@@ -96,26 +110,28 @@ def add_edf_arguments(filenames, createkwargs=None):
def assertEqual(key, value, evalue):
if value != evalue:
raise RuntimeError('{} (frame {}): {} = {} instead of {}'
.format(repr(filename), frame.iFrame,
repr(key), value, evalue))
raise RuntimeError(
"{} (frame {}): {} = {} instead of {}".format(
repr(filename), frame.iFrame, repr(key), value, evalue
)
)
if shape0:
assertEqual('shape', shapei, shape0)
assertEqual("shape", shapei, shape0)
else:
createkwargs['frame_shape'] = shape0 = shapei
if 'dtype' in createkwargs:
assertEqual('dtype', dtypei, createkwargs['dtype'])
createkwargs["frame_shape"] = shape0 = shapei
if "dtype" in createkwargs:
assertEqual("dtype", dtypei, createkwargs["dtype"])
else:
createkwargs['dtype'] = dtypei
if 'compression' in createkwargs:
assertEqual('compression', compressioni, createkwargs['compression'])
createkwargs["dtype"] = dtypei
if "compression" in createkwargs:
assertEqual("compression", compressioni, createkwargs["compression"])
else:
createkwargs['compression'] = compressioni
if 'external' in createkwargs:
createkwargs['external'].append(external)
createkwargs["compression"] = compressioni
if "external" in createkwargs:
createkwargs["external"].append(external)
else:
createkwargs['external'] = [external]
createkwargs["external"] = [external]
return createkwargs
......@@ -129,30 +145,31 @@ def resize(createkwargs, enframes, filename, fillvalue):
:param num fillvalue: in case not enough external files
:returns int: number of frames skipped
"""
frame_shape = createkwargs.get('frame_shape', None)
frame_shape = createkwargs.get("frame_shape", None)
if not frame_shape:
raise RuntimeError('The shape of one external frame must be provided')
nframes = len(createkwargs['external'])
raise RuntimeError("The shape of one external frame must be provided")
nframes = len(createkwargs["external"])
if nframes > enframes:
createkwargs['external'] = createkwargs['external'][:enframes]
createkwargs["external"] = createkwargs["external"][:enframes]
elif nframes < enframes:
if nframes:
ext = os.path.splitext(createkwargs['external'][0])[-1]
ext = os.path.splitext(createkwargs["external"][0])[-1]
else:
ext = '.edf'
ext = ".edf"
if os.path.splitext(filename)[-1] != ext:
filename += ext
if ext == '.edf':
if ext == ".edf":
mkdir(filename)
EdfImage(data=numpy.full(fillvalue, frame_shape)).write(filename)
else:
raise RuntimeError('Dummy file with extension {} not supported'
.format(repr(ext)))
createkwargs['external'] += [filename] * (enframes-nframes)
raise RuntimeError(
"Dummy file with extension {} not supported".format(repr(ext))
)
createkwargs["external"] += [filename] * (enframes - nframes)
return nframes - enframes
def finalize(createkwargs, order='C', shape=None):
def finalize(createkwargs, order="C", shape=None):
"""
Finalize external dataset arguments: define shape
......@@ -161,17 +178,17 @@ def finalize(createkwargs, order='C', shape=None):
:param str order: fill order of shape
:raises RuntimeError: scan shape does not match number of frames
"""
nframes = len(createkwargs['external'])
frame_shape = createkwargs.pop('frame_shape', None)
nframes = len(createkwargs["external"])
frame_shape = createkwargs.pop("frame_shape", None)
if not frame_shape:
raise RuntimeError('The shape of one external frame must be provided')
raise RuntimeError("The shape of one external frame must be provided")
if shape:
createkwargs['shape'] = shape + frame_shape
if order == 'F':
external = swap_flattening_order(createkwargs['external'], shape, 'C')
createkwargs['external'] = external
createkwargs["shape"] = shape + frame_shape
if order == "F":
external = swap_flattening_order(createkwargs["external"], shape, "C")
createkwargs["external"] = external
else:
createkwargs['shape'] = (nframes,) + frame_shape
createkwargs["shape"] = (nframes,) + frame_shape
def add_arguments(file_format, filenames, shape=None, createkwargs=None):
......@@ -186,8 +203,7 @@ def add_arguments(file_format, filenames, shape=None, createkwargs=None):
:param order(str): refers to the scan dimensions, not the image dimensions
:returns dict:
"""
if file_format == 'edf':
return add_edf_arguments(filenames,
createkwargs=createkwargs)
if file_format == "edf":
return add_edf_arguments(filenames, createkwargs=createkwargs)
else:
raise ValueError('Unknown external data format ' + repr(file_format))
raise ValueError("Unknown external data format " + repr(file_format))
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
# This file is part of the nexus writer service of the BLISS project.
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import os
import errno
import random
import tempfile
import string
def tempname(
    size=6, chars=string.ascii_lowercase + string.digits, prefix="", suffix=""
):
    """
    Build a random name: ``prefix`` + ``size`` characters drawn from
    ``chars`` + ``suffix``.

    With the defaults there are 62**6 possible core names.
    """
    picked = [random.choice(chars) for _ in range(size)]
    return prefix + "".join(picked) + suffix
def temproot():
    """
    Return the operating system's temporary-file directory.
    """
    return tempfile.gettempdir()
def tempdir(root=None, **kwargs):
    """
    Path of a randomly named directory under ``root``
    (the OS tmp directory when ``root`` is empty/None).

    :param str root: parent directory (optional)
    :param **kwargs: forwarded to `tempname`
    :returns str:
    """
    base = root or temproot()
    return os.path.join(base, tempname(**kwargs))
def mkdir(path):
......
This diff is collapsed.
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Register metadata generators for a configurable writer
"""
import enum
from bliss.scanning import scan_meta
from .scan_writers import writer_config_publish
GENERATORS = {"writer_config": writer_config_publish}
def register_all_metadata_generators(force=False):
    """
    Register every known metadata generator in a bliss session for
    the scan writers (currently only one).

    :param bool force: re-initialize when already done
    """
    enable_all = dict.fromkeys(GENERATORS, True)
    register_metadata_generators(force=force, **enable_all)
def register_metadata_generators(force=False, **kwargs):
    """
    Register metadata generators in a bliss session for
    the scan writers (currently only one).

    :param bool force: re-initialize when already done
    :param **kwargs: any key of `GENERATORS`; a truthy value enables
                     that generator module
    """
    # Add custom categories
    generators = scan_meta.get_user_scan_meta()
    # Start from the categories bliss already knows about ...
    categories = {m.name for m in scan_meta.CATEGORIES}
    # ... and extend with the categories of every enabled generator module.
    for k, mod in GENERATORS.items():
        if kwargs.get(k, False):
            categories |= set(mod.CATEGORIES)
    try:
        if force:
            # Deliberately jump to the re-initialization branch below.
            raise AttributeError
        # Probe the scan-meta object: a missing category attribute means
        # the enum must be rebuilt (AttributeError falls through below).
        for attr in categories:
            getattr(generators, attr.lower())
    except AttributeError:
        # Rebuild the global category enum and reset the cached user
        # scan-meta singleton so it picks up the new categories.
        # NOTE(review): this mutates scan_meta module globals — order matters.
        scan_meta.CATEGORIES = enum.Enum(
            scan_meta.CATEGORIES.__name__, list(categories)
        )
        generators.clear()
        scan_meta.USER_SCAN_META = None
        generators = scan_meta.get_user_scan_meta()
    # Generators are called at the start of the scan:
    #     bliss.scanning.scan.Scan.__init__
    # and at the end of the scan
    #     run bliss.scanning.scan.Scan.run (cleanup section)
    #
    # The generator 'instrument.positioners' is an exception.
    # It is only called at the beginning of the scan by
    # removing it before calling the generators a second time.
    for k, mod in GENERATORS.items():
        if kwargs.get(k, False):
            mod.register_metadata_generators(generators)
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
# This file is part of the nexus writer service of the BLISS project.
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import gevent
import os
import errno
import logging
from bliss.data.node import get_session_node
from .utils import async_utils
from .utils import logging_utils
from .scan_writers import writer_base
from .scan_writers import writer_config
logger = logging.getLogger(__name__)
# Extra CLI save options added by this module on top of the writer's own:
# maps CLI attribute name -> (argparse kwargs, saveoptions key).
cli_saveoptions = {
    "noconfig": (
        {
            "action": "store_true",
            "help": "Do not use extra writer information from Redis",
        },
        "noconfig",
    )
}
def all_cli_saveoptions(noconfig=False):
    """
    CLI save options of the selected writer (generic when ``noconfig``,
    configurable otherwise) merged with this module's own options.

    :param bool noconfig:
    :returns dict:
    """
    base = writer_base.cli_saveoptions if noconfig else writer_config.cli_saveoptions
    merged = dict(base)
    merged.update(cli_saveoptions)
    return merged
def default_saveoptions(noconfig=False):
    """
    Default save options of the generic (``noconfig``) or the
    configurable writer.

    :param bool noconfig:
    """
    if noconfig:
        return writer_base.default_saveoptions
    return writer_config.default_saveoptions
def close_pipe(file_descriptor):
    """
    Close a pipe file descriptor, tolerating an already-closed one
    (EBADF is swallowed; any other OSError propagates).

    :param int file_descriptor:
    """
    try:
        os.close(file_descriptor)
    except OSError as e:
        if e.errno != errno.EBADF:
            raise e
def session_writer(session_name, noconfig=False, **saveoptions):
    """
    Listen to session scan events and spawn a writer for each new scan.
    Blocks until killed (`gevent.GreenletExit`), then shuts down all
    writers and closes their pipes.

    :param str session_name:
    :param str noconfig: generic or configurable writer
    :param **saveoptions: forwarded to the writer class
    """
    if noconfig:
        writerclass = writer_base.NexusScanWriterBase
    else:
        writerclass = writer_config.NexusScanWriterConfigurable
    session_node = get_session_node(session_name)  # bliss.data.node.DataNode
    # db_name -> (writer greenlet, pipe read fd, pipe write fd)
    writers = {}
    default = None, None, None
    locks = async_utils.SharedLockPool()
    sessionlogger = logging_utils.CustomLogger(logger, 'Session ' + repr(session_name))
    try:
        sessionlogger.info('Start listening to scans ...')
        for event_type, node in session_node.iterator.walk_on_new_events(filter='scan'):
            if event_type.name == "NEW_NODE":
                # Scan starts: launch separate writer thread.
                # The pipe is the channel used to signal scan end (below).
                fd_read, fd_write = os.pipe()
                writer = writerclass(node, locks, fd_read,
                                     parentlogger=sessionlogger,
                                     **saveoptions)
                writer.start()
                writers[node.db_name] = writer, fd_read, fd_write
            elif event_type.name == "END_SCAN":
                # Scan ends: trigger EXTERNAL_EVENT on scan node
                writer, fd_read, fd_write = writers.get(node.db_name, default)
                if fd_write is not None:
                    sessionlogger.info('END_SCAN received for scan {}'.format(node.name))
                    os.write(fd_write, b"END_SCAN received")
            # Purge dead writers
            # NOTE(review): `not writer` presumably relies on gevent
            # Greenlet truthiness (falsy once dead) — TODO confirm.
            for node_db_name in list(writers.keys()):
                writer, fd_read, fd_write = writers.get(node_db_name, default)
                if writer is not None:
                    if not writer:
                        writers.pop(node_db_name, None)
                        close_pipe(fd_write)
                        close_pipe(fd_read)
            # Show the active writers
            if writers:
                sessionlogger.info('Running writers: {}'
                                   .format([repr(writer) for writer, _, _ in writers.values()]))
    except gevent.GreenletExit:
        # Listener killed: stop all remaining writers and release the pipes.
        sessionlogger.info('Stop listening to scans ...')
        if writers:
            greenlets = []
            pipes = []
            for writer, fd_read, fd_write in writers.values():
                pipes.append(fd_read)
                pipes.append(fd_write)
                if writer:
                    writer.kill()
                    greenlets.append(writer)
            if greenlets:
                sessionlogger.info('Stop writers {} ...'.format(greenlets))
            else:
                sessionlogger.info('No running writers to kill.')
            # Wait for all killed writers to actually finish.
            gevent.joinall(greenlets)
            for file_descriptor in pipes:
                close_pipe(file_descriptor)
        else:
            sessionlogger.info('No running writers to kill.')
        sessionlogger.info('Listener exits.')
def start_session_writer(session_name, **saveoptions):
    """
    Start the main session writer in a Greenlet.

    :param str session_name: does not need to exist yet
    :param **saveoptions: see `session_writer`
    :returns Greenlet:
    """
    listener = gevent.spawn(session_writer, session_name, **saveoptions)
    # Make sure the listener is killed when the process exits.
    async_utils.kill_on_exit(listener)
    return listener
"""
Nexus writer service for Bliss
"""
def main():
    """
    Parse CLI arguments, start a session writer and block until it exits.
    """
    # Define CLI
    import argparse

    parser = argparse.ArgumentParser(description="Attach writer to Bliss session")
    parser.add_argument("session_name", type=str, help="Session name")
    _cli_saveoptions = all_cli_saveoptions()
    for attr, (okwargs, option) in _cli_saveoptions.items():
        parser.add_argument("--" + attr, **okwargs)
    # Parse CLI arguments (unknown arguments are tolerated on purpose)
    args, unknown = parser.parse_known_args()
    kwargs = {}
    # Re-evaluate the option set: --noconfig selects a different writer
    # with a different set of save options.
    _cli_saveoptions = all_cli_saveoptions(noconfig=args.noconfig)
    for attr, (_, option) in _cli_saveoptions.items():
        try:
            kwargs[option] = getattr(args, attr)
        except AttributeError:
            continue
    # Launch the session writer.
    # BUG FIX: the original called main(args.session_name, **kwargs),
    # i.e. itself (main takes no arguments, so this raised TypeError);
    # the session writer greenlet must be spawned instead.
    logid = "Session " + repr(args.session_name)
    sessionlogger = logging_utils.CustomLogger(logger, logid)
    greenlet = start_session_writer(args.session_name, **kwargs)
    greenlet.join()
    sessionlogger.info("Nexus writer exits")
raise NotImplementedError("Tango server not implemented yet")
if __name__ == "__main__":
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
# This file is part of the nexus writer service of the BLISS project.
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Nexus writers associated to a single scan
......@@ -12,7 +16,7 @@
writer_base
writer_config
writer_config_publish
devices
nx_dataset
scan_utils
"""
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import os
def scan_name(info, subscan=1):
"""
:param bliss.scanning.scan.Scan or dict scan_info:
:returns str: