Commit 19534422 authored by Wout De Nolf, committed by Linus Pithan

NexusWriter: copy basics from id21blissutils

parent becc53e2
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""External Nexus writer
.. autosummary::
:toctree:
nexus_writer_service
writers
io
utils
"""
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from .nexus_writer_service import main
main()
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Nexus writer I/O utilities
.. autosummary::
:toctree:
nexus
io_utils
h5_external
"""
\ No newline at end of file
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Utilities for raw external data wrapping in HDF5
"""
import os
import numpy
import fabio
from fabio.edfimage import EdfImage
from .io_utils import mkdir
def swap_flattening_order(lst, shape, order):
"""
Swap order of flattened list
:param list lst: flattened list
:param tuple shape: original shape of `lst`
:param str order: flattening order of `lst`
:returns list:
"""
if len(shape) <= 1:
return lst
if order == 'C':
ofrom, oto = 'C', 'F'
elif order == 'F':
ofrom, oto = 'F', 'C'
else:
raise ValueError("Order must be 'C' or 'F'")
idx = numpy.arange(len(lst))
idx = idx.reshape(shape, order=oto)
idx = idx.flatten(order=ofrom)
return [lst[i] for i in idx]
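# Illustration (not from the original source): the reordering returned for a
# (2, 3)-shaped list that was flattened in C order.
#
#   swap_flattening_order([0, 1, 2, 3, 4, 5], (2, 3), 'C')
#   # -> [0, 2, 4, 1, 3, 5]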
def add_edf_arguments(filenames, createkwargs=None):
"""
Arguments for `h5py.create_dataset` to link to EDF data frames.
:param list(str or tuple) filenames: file names (str) and optionally image indices (tuple)
:param dict createkwargs: result of a previous call to append to
:returns dict:
:raises RuntimeError: not supported by external datasets
"""
if not isinstance(createkwargs, dict):
createkwargs = {}
stack = isinstance(filenames, list)
if stack:
if not filenames:
return createkwargs
else:
filenames = [filenames]
shape0 = createkwargs.get('frame_shape', tuple())
for filename in filenames:
if isinstance(filename, (tuple, list)):
filename, indices = filename
if not isinstance(indices, (tuple, list)):
indices = [indices]
else:
indices = None
if '.edf.' in os.path.basename(filename):
raise RuntimeError('{}: external datasets with compression not supported'
.format(repr(filename)))
if indices:
img = fabio.open(filename)
# EdfImage.getframe returns an EdfImage, not an EdfFrame
def getframe(img):
return img._frames[img.currentframe]
it = (getframe(img.getframe(i)) for i in indices)
else:
it = EdfImage.lazy_iterator(filename)
for frame in it:
if frame.swap_needed():
raise RuntimeError('{} (frame {}): external datasets do not support byte-swap'
.format(repr(filename), frame.iFrame))
compressioni = frame._data_compression
if compressioni:
compressioni = compressioni.lower()
if compressioni == "none":
compressioni = None
if compressioni is not None:
raise RuntimeError('{} (frame {}): external datasets with compression not supported'
.format(repr(filename), frame.iFrame))
shapei = frame.shape
dtypei = frame.dtype
start = frame.start
size = frame.size # TODO: need compressed size
external = filename, start, size
def assertEqual(key, value, evalue):
if value != evalue:
raise RuntimeError('{} (frame {}): {} = {} instead of {}'
.format(repr(filename), frame.iFrame,
repr(key), value, evalue))
if shape0:
assertEqual('shape', shapei, shape0)
else:
createkwargs['frame_shape'] = shape0 = shapei
if 'dtype' in createkwargs:
assertEqual('dtype', dtypei, createkwargs['dtype'])
else:
createkwargs['dtype'] = dtypei
if 'compression' in createkwargs:
assertEqual('compression', compressioni, createkwargs['compression'])
else:
createkwargs['compression'] = compressioni
if 'external' in createkwargs:
createkwargs['external'].append(external)
else:
createkwargs['external'] = [external]
return createkwargs
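# Sketch (not from the original source) of the dict returned for two hypothetical
# uncompressed single-frame EDF files; offsets and sizes are illustrative only:
#
#   {'frame_shape': (2048, 2048),
#    'dtype': numpy.dtype('uint16'),
#    'compression': None,
#    'external': [('img_0000.edf', 1024, 8388608),   # (filename, offset, size)
#                 ('img_0001.edf', 1024, 8388608)]}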
def resize(createkwargs, enframes, filename, fillvalue):
"""
Add/remove external files (before finalization).
:param dict createkwargs:
:param int enframes: expected number of frames
:param str filename: dummy file to create when there are not enough external frames
:param num fillvalue: fill value for the dummy frames
:returns int: number of frames dropped (negative when dummy frames were added)
"""
frame_shape = createkwargs.get('frame_shape', None)
if not frame_shape:
raise RuntimeError('The shape of one external frame must be provided')
nframes = len(createkwargs['external'])
if nframes > enframes:
createkwargs['external'] = createkwargs['external'][:enframes]
elif nframes < enframes:
if nframes:
ext = os.path.splitext(createkwargs['external'][0][0])[-1]  # entries are (filename, offset, size)
else:
ext = '.edf'
if os.path.splitext(filename)[-1] != ext:
filename += ext
if ext == '.edf':
mkdir(filename)
EdfImage(data=numpy.full(frame_shape, fillvalue)).write(filename)
else:
raise RuntimeError('Dummy file with extension {} not supported'
.format(repr(ext)))
createkwargs['external'] += [filename] * (enframes-nframes)
return nframes - enframes
def finalize(createkwargs, order='C', shape=None):
"""
Finalize external dataset arguments: define shape
:param dict createkwargs:
:param tuple shape: scan shape (default: (nframes,))
:param str order: fill order of shape
:raises RuntimeError: scan shape does not match number of frames
"""
nframes = len(createkwargs['external'])
frame_shape = createkwargs.pop('frame_shape', None)
if not frame_shape:
raise RuntimeError('The shape of one external frame must be provided')
if shape:
createkwargs['shape'] = shape + frame_shape
if order == 'F':
external = swap_flattening_order(createkwargs['external'], shape, 'C')
createkwargs['external'] = external
else:
createkwargs['shape'] = (nframes,) + frame_shape
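# Minimal usage sketch (not part of the original file): link two hypothetical
# uncompressed EDF files into an HDF5 dataset via h5py's `external` storage.
#
#   import h5py
#   kwargs = add_edf_arguments(['img_0000.edf', 'img_0001.edf'])
#   finalize(kwargs)                            # sets kwargs['shape'] = (2,) + frame_shape
#   with h5py.File('master.h5', 'w') as f:
#       f.create_dataset('data', **kwargs)      # no data copy, frames stay in the EDF files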
def add_arguments(file_format, filenames, shape=None, createkwargs=None):
"""
Arguments for `h5py.create_dataset` to link to data frames.
The resulting shape will be `shape + frame_shape`
:param str file_format:
:param list(str or tuple) filenames: file names (str) and optionally image indices (tuple)
:param tuple shape: scan shape (refers to the scan dimensions, not the image dimensions)
:param dict createkwargs: result of a previous call to append to
:returns dict:
"""
if file_format == 'edf':
return add_edf_arguments(filenames,
createkwargs=createkwargs)
else:
raise ValueError('Unknown external data format ' + repr(file_format))
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import os
import errno
def mkdir(path):
"""
Create directory recursively and silent when already existing
:param str path:
"""
try:
if path:
os.makedirs(path)
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import gevent
import os
import errno
import logging
from bliss.data.node import get_session_node
from .utils import async_utils
from .utils import logging_utils
from .scan_writers import writer_base
from .scan_writers import writer_config
logger = logging.getLogger(__name__)
cli_saveoptions = {'noconfig': ({'action': 'store_true',
'help': 'Do not use extra writer information from Redis'},
'noconfig')}
def all_cli_saveoptions(noconfig=False):
if noconfig:
ret = dict(writer_base.cli_saveoptions)
else:
ret = dict(writer_config.cli_saveoptions)
ret.update(cli_saveoptions)
return ret
def default_saveoptions(noconfig=False):
if noconfig:
return writer_base.default_saveoptions
else:
return writer_config.default_saveoptions
def close_pipe(file_descriptor):
try:
os.close(file_descriptor)
except OSError as e:
if e.errno == errno.EBADF:
pass
else:
raise e
def session_writer(session_name, noconfig=False, **saveoptions):
"""
Listen to session scan events and spawn a writer for each new scan.
:param str session_name:
:param bool noconfig: use the generic writer instead of the configurable one
:param **saveoptions:
"""
if noconfig:
writerclass = writer_base.NexusScanWriterBase
else:
writerclass = writer_config.NexusScanWriterConfigurable
session_node = get_session_node(session_name) # bliss.data.node.DataNode
writers = {}
default = None, None, None
locks = async_utils.SharedLockPool()
sessionlogger = logging_utils.CustomLogger(logger, 'Session ' + repr(session_name))
try:
sessionlogger.info('Start listening to scans ...')
for event_type, node in session_node.iterator.walk_on_new_events(filter='scan'):
if event_type.name == "NEW_NODE":
# Scan starts: launch separate writer thread
fd_read, fd_write = os.pipe()
writer = writerclass(node, locks, fd_read,
parentlogger=sessionlogger,
**saveoptions)
writer.start()
writers[node.db_name] = writer, fd_read, fd_write
elif event_type.name == "END_SCAN":
# Scan ends: trigger EXTERNAL_EVENT on scan node
writer, fd_read, fd_write = writers.get(node.db_name, default)
if fd_write is not None:
sessionlogger.info('END_SCAN received for scan {}'.format(node.name))
os.write(fd_write, b"END_SCAN received")
# Purge dead writers
for node_db_name in list(writers.keys()):
writer, fd_read, fd_write = writers.get(node_db_name, default)
if writer is not None:
if not writer:
writers.pop(node_db_name, None)
close_pipe(fd_write)
close_pipe(fd_read)
# Show the active writers
if writers:
sessionlogger.info('Running writers: {}'
.format([repr(writer) for writer, _, _ in writers.values()]))
except gevent.GreenletExit:
sessionlogger.info('Stop listening to scans ...')
if writers:
greenlets = []
pipes = []
for writer, fd_read, fd_write in writers.values():
pipes.append(fd_read)
pipes.append(fd_write)
if writer:
writer.kill()
greenlets.append(writer)
if greenlets:
sessionlogger.info('Stop writers {} ...'.format(greenlets))
else:
sessionlogger.info('No running writers to kill.')
gevent.joinall(greenlets)
for file_descriptor in pipes:
close_pipe(file_descriptor)
else:
sessionlogger.info('No running writers to kill.')
sessionlogger.info('Listener exits.')
def start_session_writer(session_name, **saveoptions):
"""
This starts the main session writer in a Greenlet.
:param str session_name: does not need to exist yet
:param **saveoptions: see `session_writer`
:returns Greenlet:
"""
greenlet = gevent.spawn(session_writer, session_name, **saveoptions)
async_utils.kill_on_exit(greenlet)
return greenlet
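# Usage sketch (assuming a Bliss session named 'demo_session'):
#
#   greenlet = start_session_writer('demo_session', noconfig=True)
#   greenlet.join()   # block until the listener greenlet is killed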
def main():
"""
Parse CLI arguments, start a session writer and block.
"""
# Define CLI
import argparse
parser = argparse.ArgumentParser(description='Attach writer to Bliss session')
parser.add_argument('session_name', type=str,
help='Session name')
_cli_saveoptions = all_cli_saveoptions()
for attr, (okwargs, option) in _cli_saveoptions.items():
parser.add_argument('--' + attr, **okwargs)
# Parse CLI arguments
args, unknown = parser.parse_known_args()
kwargs = {}
_cli_saveoptions = all_cli_saveoptions(noconfig=args.noconfig)
for attr, (_, option) in _cli_saveoptions.items():
try:
kwargs[option] = getattr(args, attr)
except AttributeError:
continue
# Launch the session writer
logid = 'Session ' + repr(args.session_name)
sessionlogger = logging_utils.CustomLogger(logger, logid)
greenlet = start_session_writer(args.session_name, **kwargs)
greenlet.join()
sessionlogger.info('Nexus writer exits')
if __name__ == "__main__":
main()
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Nexus writers associated to a single scan
.. autosummary::
:toctree:
writer_base
writer_config
devices
nx_dataset
scan_utils
"""
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Compile device information before and after Redis publication
"""
mcanamemap = {'spectrum': 'data',
'icr': 'input_rate',
'ocr': 'output_rate',
'triggers': 'input_counts',
'events': 'output_counts',
'deadtime': 'dead_time',
'livetime': 'live_time',
'realtime': 'elapsed_time'}
mcatypemap = {'spectrum': 'principal',
'icr': 'icr',
'ocr': 'ocr',
'triggers': 'triggers',
'events': 'events',
'deadtime': 'deadtime',
'livetime': 'livetime',
'realtime': 'realtime'}
mcaunitmap = {'icr': 'hertz',
'ocr': 'hertz',
'livetime': 's',
'realtime': 's'}
timernamemap = {'elapsed_time': 'value',
'epoch': 'epoch'}
timertypemap = {'elapsed_time': 'principal',
'epoch': 'epoch'}
limanamemap = {'image': 'data'}
limatypemap = {'image': 'principal'}
counternamemap = {}
countertypemap = {}
def shortnamemap(names, separator=':'):
"""
Map full Redis names to short (but still unique) names
:param lst(str) names:
:param str separator:
:returns dict:
"""
parts = [name.split(separator) for name in names]
nparts = max(map(len, parts))
parts = [([''] * (nparts - len(lst))) + lst for lst in parts]
ret = {}
for i in reversed(range(-nparts, 0)):
joinednames = [separator.join(s for s in lst[i:] if s)
for lst in parts]
newnames = joinednames + list(ret.values())
selection = [(idx, (separator.join(s for s in lst if s), name))
for idx, (name, lst) in enumerate(zip(joinednames, parts))
if newnames.count(name) == 1]
if selection:
idx, tuples = list(zip(*selection))
ret.update(tuples)
parts = [lst for j, lst in enumerate(parts) if j not in idx]
return ret
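# Example (hypothetical Redis channel names) of the unique short names produced:
#
#   shortnamemap(['xmap1:det0:icr', 'xmap1:det1:icr', 'timer1:elapsed_time'])
#   # -> {'timer1:elapsed_time': 'elapsed_time',
#   #     'xmap1:det0:icr': 'det0:icr',
#   #     'xmap1:det1:icr': 'det1:icr'}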
def fill_device(fullname, device):
"""
Add missing keys with default values
:param str fullname:
:param dict device:
"""
device['device_type'] = device.get('device_type', '') # type for the writer (not saved)
# e.g. positioner, mca
device['device_name'] = device.get('device_name', fullname) # HDF5 group name
# measurement or positioners when missing
device['device_info'] = device.get('device_info', {}) # HDF5 group datasets
device['data_type'] = device.get('data_type', 'principal') # principal value of this HDF5 group
device['data_name'] = device.get('data_name', 'data') # HDF5 dataset name
device['data_info'] = device.get('data_info', {}) # HDF5 dataset attributes
device['unique_name'] = device.get('unique_name', fullname) # Unique name for HDF5 links
device['master_index'] = -1 # -1: not an axis; >=0: axis order used for plotting
def update_device(devices, fullname, units=None):
"""
Add missing device and/or keys
:param dict devices:
:param str fullname:
:param dict units:
"""
devices[fullname] = device = devices.get(fullname, {})
fill_device(fullname, device)
if units:
unit = units.get(fullname, None)
if unit:
device['data_info']['units'] = unit
return device
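# Example (hypothetical counter name) showing the defaults added by update_device:
#
#   devices = {}
#   update_device(devices, 'simu1:deadtime_det0', units={'simu1:deadtime_det0': 's'})
#   devices['simu1:deadtime_det0']['data_name']   # -> 'data'
#   devices['simu1:deadtime_det0']['data_info']   # -> {'units': 's'}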
def parse_devices(devices, multivalue_positioners=False):
"""
Determine names and types based on device name and type
:param dict devices:
:param bool multivalue_positioners:
"""
namemap = shortnamemap(list(devices.keys()))
for fullname, device in devices.items():
device['device_name'] = namemap[fullname]
if device['device_type'] == 'mca':
# 'xmap1:xxxxxx_det1'
# device_name = 'xmap1:det1'
# data_type = mcatypemap('xxxxxx')
# data_name = mcanamemap('xxxxxx')
parts = fullname.split(':')
lastparts = parts[-1].split('_')
mcachannel = '_'.join(lastparts[1:])
if not mcachannel:
mcachannel = 'sum'
parts = parts[:-1] + [mcachannel]
datatype = lastparts[0] # xxxxxx
device['device_name'] = ':'.join(parts)
device['data_type'] = mcatypemap.get(datatype, datatype)
device['data_name'] = mcanamemap.get(datatype, datatype)
device['data_info']['units'] = mcaunitmap.get(datatype, None)
elif device['device_type'] == 'lima':
# 'frelon1:image'
# 'frelon1:roi_counters:roi1_min'
# 'frelon1:xxxx:fwhm_x'
parts = fullname.split(':')
datatype = parts[1] # image, roi_counters or xxxx
if parts[1] == 'roi_counters':
datatypedefault = ':'.join(parts[2:])
else:
datatypedefault = ':'.join(parts[1:])
device['device_name'] = parts[0]
device['data_type'] = limatypemap.get(datatype, datatypedefault)
device['data_name'] = limanamemap.get(datatype, datatypedefault)
elif device['device_type'] == 'samplingcounter':
if device['data_type'] == 'signal':
device['data_name'] = 'data'
device['data_type'] = 'principal'
else:
# 'simdiodeSAMPLES_xxxxx'
# device_name = 'simdiodeSAMPLES'
# data_type = countertypemap('xxxxxx')
# data_name = counternamemap('xxxxxx')
parts = device['device_name'].split('_')
datatype = parts[-1] # xxxxxx
parts = ['_'.join(parts[:-1])]
device['device_name'] = '_'.join(parts)
device['data_type'] = countertypemap.get(datatype, datatype)
device['data_name'] = counternamemap.get(datatype, datatype)
elif device['device_type'] == 'positionergroup':
# 'timer1:xxxxxx' -> 'xxxxxx'