Commit 363240a2 authored by Wout De Nolf's avatar Wout De Nolf

[writer] small refactoring

parent 546b0cdc
"""
Define generators for scan data publishing in Redis
"""
from bliss.scanning import scan_meta
from bliss import global_map
import inspect
import enum
import logging
import os
from bliss import global_map
from bliss.scanning import scan_meta
from bliss.common.measurement import SamplingMode
from ..utils import config
from ..writer import devices
logger = logging.getLogger(__name__)
......@@ -104,7 +105,7 @@ def fill_device_info(scan):
:param bliss.scanning.scan.Scan scan:
"""
logger.debug('fill device info')
return {'devices': devices.redis_info(scan)}
return {'devices': device_redis_info(scan)}
def fill_filenames(scan):
......@@ -114,3 +115,97 @@ def fill_filenames(scan):
logger.debug('fill filename info')
filenames = [name for name in config.filenames() if name]
return {'filenames': filenames}
def device_redis_info(scan):
"""
Publish information on devices (defines types and groups counters).
:param bliss.scanning.scan.Scan scan:
:returns dict:
"""
devices = {}
# This is not all of them
for ctr in global_map.get_counters_iter():
fullname = ctr.fullname.replace('.', ':') # Redis name
# Derived from: bliss.common.measurement.BaseCounter
# bliss.common.measurement.Counter
# bliss.common.measurement.SamplingCounter
# bliss.common.temperature.TempControllerCounter
# bliss.controllers.simulation_diode.SimulationDiodeSamplingCounter
# bliss.scanning.acquisition.mca.BaseMcaCounter
# bliss.scanning.acquisition.mca.SpectrumMcaCounter
# bliss.scanning.acquisition.mca.StatisticsMcaCounter
ctr_classes = [c.__name__ for c in inspect.getmro(ctr.__class__)]
#print(ctr.fullname, type(ctr), type(ctr.controller), ctr_classes)
#controller_classes = [c.__name__ for c in inspect.getmro(ctr.controller.__class__)]
if 'SpectrumMcaCounter' in ctr_classes:
device_info = {'type': 'mca',
'description': ctr.controller.detector_brand +\
'/' + ctr.controller.detector_type}
device = {'device_info': device_info,
'device_type': 'mca'}
devices[fullname] = device
elif 'StatisticsMcaCounter' in ctr_classes:
device_info = {'type': 'mca',
'description': ctr.controller.detector_brand +\
'/' + ctr.controller.detector_type}
device = {'device_info': device_info,
'device_type': 'mca'}
devices[fullname] = device
elif 'RoiMcaCounter' in ctr_classes:
device_info = {'type': 'mca',
'description': ctr.mca.detector_brand +\
'/' + ctr.mca.detector_type}
roi = ctr.mca.rois.resolve(ctr.roi_name)
data_info = {'roi_start': roi[0],
'roi_end': roi[1]}
device = {'device_info': device_info,
'data_info': data_info,
'device_type': 'mca'}
devices[fullname] = device
elif 'LimaBpmCounter' in ctr_classes:
device_info = {'type': 'lima'}
device = {'device_info': device_info,
'device_type': 'lima'}
devices[fullname] = device
elif 'LimaImageCounter' in ctr_classes:
device_info = {'type': 'lima'}
device = {'device_info': device_info,
'device_type': 'lima'}
devices[fullname] = device
elif 'RoiStatCounter' in ctr_classes:
device_info = {'type': 'lima'}
roi = ctr.parent_roi_counters.get(ctr.roi_name)
data_info = {'roi_'+k: v for k, v in roi.to_dict().items()}
device = {'device_info': device_info,
'device_type': 'lima',
'data_info': data_info}
devices[fullname] = device
elif 'TempControllerCounter' in ctr_classes:
device_info = {'type': 'temperature',
'description': 'temperature'}
device = {'device_info': device_info,
'device_type': 'temperature'}
devices[fullname] = device
elif 'SamplingCounter' in ctr_classes:
device_info = {'type': 'samplingcounter',
'mode': str(ctr.mode).split('.')[-1]}
device = {'device_info': device_info,
'device_type': 'samplingcounter',
'data_type': 'signal'}
devices[fullname] = device
if ctr.mode == SamplingMode.SAMPLES:
device = {'device_info': device_info,
'device_type': 'samplingcounter'}
devices[fullname+'_samples'] = device
elif ctr.mode == SamplingMode.STATS:
for stat in 'N', 'std', 'var', 'min', 'max', 'p2v':
device = {'device_info': device_info,
'device_type': 'samplingcounter'}
devices[fullname+'_'+stat] = device
else:
logger.info('Counter {} {} published as generic detector'
.format(fullname, ctr_classes))
devices[fullname] = {}
return devices
......@@ -141,13 +141,14 @@ class test_scan_writer(BlissTestCase):
-5, msg=self._assertmsg(entry))
self._backup(entry)
@unittest.skip('do not run')
def test_infinite(self):
def _test_infinite(self):
tests = (self.test_ct, self.test_loopscan,
self.test_ascan, self.test_amesh)
self.test_timescan, self.test_ascan,
self.test_amesh)
while True:
for test in tests:
test()
break
@unittest.skip('data size of subscans are not always correct')
def test_subscan(self):
......@@ -339,9 +340,9 @@ class test_scan_writer(BlissTestCase):
return
from shutil import copyfile
from datetime import datetime
cmd = entry['title'][()].split(' ')[0]
dst = '/data/id21/inhouse/wout/bliss/nexus/example_{}_{}.h5'\
.format(cmd, datetime.now().strftime('%d%b%Y'))
#cmd = entry['title'][()].split(' ')[0]
dst = '/data/id21/inhouse/wout/bliss/nexus/example_{}.h5'\
.format(datetime.now().strftime('%d%b%Y'))
copyfile(entry.file.filename, dst)
......@@ -378,7 +379,6 @@ def test_suite():
testSuite.addTest(utclass("test_timescan"))
testSuite.addTest(utclass("test_ascan"))
testSuite.addTest(utclass("test_amesh"))
testSuite.addTest(utclass("test_infinite"))
testSuite.addTest(utclass("test_subscan"))
testSuite.addTest(utclass("test_async_ct"))
testSuite.addTest(utclass("test_async_timescan"))
......
......@@ -306,6 +306,7 @@ class DatasetProxy():
else:
self.logger.debug('merged external HDF5 datasets (copy because VDS not supported)')
elif self._external_raw:
# REMARK: cannot be reshaped!!!
value.pop('chunks')
value.pop('maxshape')
value.update(self._external_raw)
......
"""
Device information before and after Redis publication
"""
import inspect
import logging
from bliss import global_map
from bliss.common.measurement import SamplingMode
logger = logging.getLogger(__name__)
mcanamemap = {'spectrum': 'data',
'icr': 'input_rate',
......@@ -206,9 +199,9 @@ def parse_devices(devices, multivalue_positioners=False):
def device_info(devices, scan_info, multivalue_positioners=False):
"""
Merge device information from `redis_info` and from scan_info
Merge device information from `device_redis_info` and from scan_info
:param dict devices: as provided by `redis_info`
:param dict devices: as provided by `device_redis_info`
:param dict scan_info:
:param bool multivalue_positioners:
:returns dict: subscanname:dict(fullname:dict)
......@@ -251,97 +244,3 @@ def device_info(devices, scan_info, multivalue_positioners=False):
with open('/data/id21/inhouse/wout/bliss/nexus/scan_info.json', 'w') as f:
json.dump(scan_info, f, indent=2)
return ret
def redis_info(scan):
"""
For publication
:param bliss.scanning.scan.Scan scan:
:returns dict:
"""
devices = {}
# This is not all of them
for ctr in global_map.get_counters_iter():
fullname = ctr.fullname.replace('.', ':') # Redis name
# Derived from: bliss.common.measurement.BaseCounter
# bliss.common.measurement.Counter
# bliss.common.measurement.SamplingCounter
# bliss.common.temperature.TempControllerCounter
# bliss.controllers.simulation_diode.SimulationDiodeSamplingCounter
# bliss.scanning.acquisition.mca.BaseMcaCounter
# bliss.scanning.acquisition.mca.SpectrumMcaCounter
# bliss.scanning.acquisition.mca.StatisticsMcaCounter
ctr_classes = [c.__name__ for c in inspect.getmro(ctr.__class__)]
#print(ctr.fullname, type(ctr), type(ctr.controller), ctr_classes)
#controller_classes = [c.__name__ for c in inspect.getmro(ctr.controller.__class__)]
if 'SpectrumMcaCounter' in ctr_classes:
device_info = {'type': 'mca',
'description': ctr.controller.detector_brand +\
'/' + ctr.controller.detector_type}
device = {'device_info': device_info,
'device_type': 'mca'}
devices[fullname] = device
elif 'StatisticsMcaCounter' in ctr_classes:
device_info = {'type': 'mca',
'description': ctr.controller.detector_brand +\
'/' + ctr.controller.detector_type}
device = {'device_info': device_info,
'device_type': 'mca'}
devices[fullname] = device
elif 'RoiMcaCounter' in ctr_classes:
device_info = {'type': 'mca',
'description': ctr.mca.detector_brand +\
'/' + ctr.mca.detector_type}
roi = ctr.mca.rois.resolve(ctr.roi_name)
data_info = {'roi_start': roi[0],
'roi_end': roi[1]}
device = {'device_info': device_info,
'data_info': data_info,
'device_type': 'mca'}
devices[fullname] = device
elif 'LimaBpmCounter' in ctr_classes:
device_info = {'type': 'lima'}
device = {'device_info': device_info,
'device_type': 'lima'}
devices[fullname] = device
elif 'LimaImageCounter' in ctr_classes:
device_info = {'type': 'lima'}
device = {'device_info': device_info,
'device_type': 'lima'}
devices[fullname] = device
elif 'RoiStatCounter' in ctr_classes:
device_info = {'type': 'lima'}
roi = ctr.parent_roi_counters.get(ctr.roi_name)
data_info = {'roi_'+k: v for k, v in roi.to_dict().items()}
device = {'device_info': device_info,
'device_type': 'lima',
'data_info': data_info}
devices[fullname] = device
elif 'TempControllerCounter' in ctr_classes:
device_info = {'type': 'temperature',
'description': 'temperature'}
device = {'device_info': device_info,
'device_type': 'temperature'}
devices[fullname] = device
elif 'SamplingCounter' in ctr_classes:
device_info = {'type': 'samplingcounter',
'mode': str(ctr.mode).split('.')[-1]}
device = {'device_info': device_info,
'device_type': 'samplingcounter',
'data_type': 'signal'}
devices[fullname] = device
if ctr.mode == SamplingMode.SAMPLES:
device = {'device_info': device_info,
'device_type': 'samplingcounter'}
devices[fullname+'_samples'] = device
elif ctr.mode == SamplingMode.STATS:
for stat in 'N', 'std', 'var', 'min', 'max', 'p2v':
device = {'device_info': device_info,
'device_type': 'samplingcounter'}
devices[fullname+'_'+stat] = device
else:
logger.info('Counter {} {} published as generic detector'
.format(fullname, ctr_classes))
devices[fullname] = {}
return devices
......@@ -7,14 +7,13 @@ import numpy
import traceback
import logging
import datetime
import re
from contextlib import contextmanager
from collections import defaultdict
from . import devices
from . import utils
from ..io import nexus
from ..utils import scanutils
from ..writer.dataset_proxy import DatasetProxy
from .dataset_proxy import DatasetProxy
logger = logging.getLogger(__name__)
......@@ -453,25 +452,18 @@ class NexusScanWriterBase(gevent.Greenlet):
if not plotselected and firstplot is not None:
nexus.markDefault(plot)
def _select_plot_signals(self, subscan, plotname, items=None,
ndim=-1, grid=False):
def _select_plot_signals(self, subscan, plotname, ndim=-1, grid=False):
"""
Select plot signals as specified in the static beamline configuration.
Select plot signals based on detector dimensions.
:param str subscan:
:param str plotname:
:param list items: signal names for plotting
:param int ndim: signal dimensions in case no items
:param int ndim: detector dimensions
:param bool grid: preserve scan shape
:returns dict: (str, tuple): (str, [(name, value, attrs)])
"""
signaldict = {}
if items:
for configname in items:
for fullname, dproxy in self.detector_iter(subscan):
if self._configname_refers_to_fullname(configname, fullname):
self._add_signal(plotname, grid, dproxy, signaldict)
elif ndim >= 0:
if ndim >= 0:
for fullname, dproxy in self.detector_iter(subscan):
if dproxy.detector_ndim == ndim:
self._add_signal(plotname, grid, dproxy, signaldict)
......@@ -516,8 +508,7 @@ class NexusScanWriterBase(gevent.Greenlet):
scan_ndim = len(scan_shape)
detector_ndim = len(detector_shape)
value = {'data': nexus.getUri(dset),
'shape': shape,
'order': self.order(scan_ndim)}
'shape': shape}
interpretation = nexus.nxDatasetInterpretation(scan_ndim, detector_ndim, scan_ndim)
attrs['interpretation'] = interpretation
# Add arguments to signaldict
......@@ -597,8 +588,7 @@ class NexusScanWriterBase(gevent.Greenlet):
else: # Scatter plot
if value.ndim > 1:
value = {'data': nexus.getUri(value),
'shape': scan_shape,
'order': order}
'shape': scan_shape}
lst.append((name, value, attrs))
axes = lst
else:
......@@ -715,14 +705,16 @@ class NexusScanWriterBase(gevent.Greenlet):
except gevent.GreenletExit:
self.logger.info('Writer stop requested')
except Exception:
self.logger.error('Stop writer due to exception:\n{}'.format(traceback.format_exc()))
self.logger.error('Stop writer due to exception:\n{}'
.format(traceback.format_exc()))
finally:
if self.save:
nbytes = 0
try:
nbytes = self._finalize()
except Exception:
self.logger.error(traceback.format_exc())
self.logger.error('Not properly finalized due to exception:\n{}'
.format(traceback.format_exc()))
finally:
dtime = datetime.datetime.now() - self.starttime
self.logger.info('Finished writing {} in {}'
......@@ -965,20 +957,19 @@ class NexusScanWriterBase(gevent.Greenlet):
elif ndim == 1:
return (self.scan_size(subscan),)
else:
# Fast axis first
s = [self.subscan_info_get(subscan, 'npoints{}'.format(i))
for i in range(1, ndim+1)]
if self.order(ndim) == 'C':
# Fast axis last
s = s[::-1]
return tuple(s)
def order(self, ndim):
"""
Data is published as a flat list in the order of
nested scan loops (inner, outer1, outer2, ...).
nested scan loops (fast, slow1, slow2, ...).
We need the order in which the final shape is filled.
F: row, column, depth, ... (preserve order)
C: ..., depth, column, row (reverse order)
"""
return self._order[ndim]
......@@ -1326,17 +1317,3 @@ class NexusScanWriterBase(gevent.Greenlet):
else:
attrs = {}
nexus.nxCreateDataSet(nxpositioners, mot, pos, attrs)
def _configname_refers_to_fullname(self, configname, fullname):
"""
Checks whether a Redis node's full name is "equal to" a name from
scan's 'scan_info' dictionary.
:param str configname: from the 'scan_info' dictionary
:param str fullname: node.fullname
"""
seps = r'[\.:]'
configparts = re.split(seps, configname)
fullparts = re.split(seps, fullname)
return all(pfull.endswith(pconfig) for pfull, pconfig
in zip(fullparts[::-1], configparts[::-1]))
......@@ -3,6 +3,7 @@ Configurable Nexus writer listening to Redis events
"""
import os
import re
import h5py
import datetime
from contextlib import contextmanager
......@@ -43,7 +44,7 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
@property
def _default_filename(self):
return ''
@property
def config_writer(self):
"""
......@@ -97,6 +98,29 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
"""
return self.config_technique.get('plotselect', '')
def _select_plot_signals(self, subscan, plotname, items=None,
ndim=-1, grid=False):
"""
Select plot signals as specified in the static beamline configuration.
:param str subscan:
:param str plotname:
:param list items: signal names for plotting
:param int ndim: detector dimensions in case no items
:param bool grid: preserve scan shape
:returns dict: (str, tuple): (str, [(name, value, attrs)])
"""
if items:
signaldict = {}
for configname in items:
for fullname, dproxy in self.detector_iter(subscan):
if self._matching_fullname(configname, fullname):
self._add_signal(plotname, grid, dproxy, signaldict)
else:
signaldict = super(NexusScanWriterConfigurable, self)\
._select_plot_signals(subscan, plotname, ndim=ndim, grid=grid)
return signaldict
@property
def positioners_end(self):
positioners = self.config_writer.get('positioners', {})
......@@ -185,7 +209,7 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
if ndim == 0:
counternames = [self._get_application(xrfapplidef, 'I0'),
self._get_application(xrfapplidef, 'It')]
if any(self._configname_refers_to_fullname(configname, fullname)
if any(self._matching_fullname(configname, fullname)
for configname in counternames):
return 'sensor'
return ''
......@@ -354,16 +378,18 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
static configuration
:param str applifmt: format for application definition name
:param bool virtual: virtual dataset or copy
:param **kwargs: see `_application_config_to_fullname`
:param **kwargs: see `_iter_fullnames`
"""
if not confignames or appliname in parent:
return
uris = []
for configname in confignames:
for fullname in self._application_config_to_fullname(subscan,
configname,
appliname,
**kwargs):
notfoundmsg = 'Application definition incomplete ({} not found for {})'\
.format(repr(configname), repr(appliname))
for fullname in self._iter_fullnames(subscan,
configname,
notfoundmsg=notfoundmsg,
**kwargs):
dproxy = self.datasetproxy(subscan, fullname)
with dproxy.open() as dset:
if not uris:
......@@ -377,7 +403,8 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
if uris:
nexus.nxCreateDataSet(parent, appliname, value, attrs)
def _save_application_links(self, subscan, parent, confignames, applifmt, **kwargs):
def _save_application_links(self, subscan, parent, confignames, applifmt,
**kwargs):
"""
Add dataset links to an application.
......@@ -386,7 +413,7 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
:param list(str) confignames: names specified in the beamline
static configuration
:param str applifmt: format for application definition name
:param **kwargs: see `_application_config_to_fullname`
:param **kwargs: see `_iter_fullnames`
:returns list(str): Redis fullnames
"""
if not confignames:
......@@ -405,40 +432,62 @@ class NexusScanWriterConfigurable(writer_base.NexusScanWriterBase):
:param str configname: name specified in the beamline
static configuration
:param str appliname: name in the Nexus application definition
:param **kwargs: see `_application_config_to_fullname`
:param **kwargs: see `_iter_fullnames`
"""
if not configname or appliname in parent:
return
for fullname in self._application_config_to_fullname(subscan,
configname,
appliname,
**kwargs):
notfoundmsg = 'Application definition incomplete ({} not found for {})'\
.format(repr(configname), repr(appliname))
for fullname in self._iter_fullnames(subscan,
configname,
notfoundmsg=notfoundmsg,
**kwargs):
dproxy = self.datasetproxy(subscan, fullname)
with dproxy.open() as dset:
nexus.nxCreateDataSet(parent, appliname, dset, None)
def _application_config_to_fullname(self, subscan, configname, appliname,
devicetype=None, datatype=None):
def _matching_fullname(self, configname, fullname):
"""
Checks whether a Redis node's full name is referred to
by name from the writer configuration.
Examples:
"iodet" refers to "simulation_diode_controller:iodet"
"xmap1:det0" refers to "xmap1:realtime_det0"
"xmap1:det0" refers to "simxmap1:spectrum_det0"
:param str configname: from the writer configuration
:param str fullname: node.fullname
"""
seps = r'[\.:]'
configparts = re.split(seps, configname)
fullparts = re.split(seps, fullname)
return all(pfull.endswith(pconfig) for pfull, pconfig
in zip(fullparts[::-1], configparts[::-1]))
def _iter_fullnames(self, subscan, configname, devicetype=None,
datatype=None, notfoundmsg=None):
"""
Generates Redis fullnames that match the configname
Yield all Redis node's full names referred to by a name
from the writer configuration.
:param str subscan:
:param str configname: name specified in the beamline
static configuration
:param str devicetype: device type
:param str datatype: data type
:param str notfoundmsg:
:yields str: Redis node fullname
"""
incomplete = True
for fullname, dproxy in self.detector_iter(subscan):
if self._configname_refers_to_fullname(configname, fullname):
if self._matching_fullname(configname, fullname):
if (devicetype == dproxy.type or not devicetype) and\
(datatype == dproxy.data_type or not datatype):
incomplete = False
yield fullname
if incomplete:
self.logger.warning('Application definition incomplete ({} not found for {})'
.format(repr(configname), repr(appliname)))
if incomplete and notfoundmsg:
self.logger.warning(notfoundmsg)
def _create_master_links(self, subscan):
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment