Commit 6c3e7929 authored by Linus Pithan's avatar Linus Pithan Committed by Matias Guijarro

- write data references to hdf5 after scan

- writing 'instruments' and 'scan_meta' at the end of the scan to hdf5
parent b66a30f7
......@@ -8,6 +8,9 @@
from .roi import Roi
from .properties import LimaProperty
from bliss.common.measurement import BaseCounter
import numpy
import h5py
import os
class ImageCounter(BaseCounter):
......@@ -57,3 +60,33 @@ class ImageCounter(BaseCounter):
"Lima.image: set roi only accepts roi (class)"
" or (x,y,width,height) values"
)
# handling of reference saving in hdf5
def to_ref_array(self, channel, root_path):
""" used to produce a string version of a lima reference that can be saved in hdf5
"""
# looks like the events are not emitted after saving,
# therefore we will use 'last_image_ready' instead
# of "last_image_saved" for now
# last_image_saved = event_dict["data"]["last_image_saved"]
lima_data_view = channel.data_node.get(0, -1)
tmp = lima_data_view._get_filenames(
channel.data_node.info, *range(0, len(lima_data_view))
)
if tmp != []:
tmp = numpy.array(tmp, ndmin=2)
relpath = [os.path.relpath(i, start=root_path) for i in tmp[:, 0]]
basename = [os.path.basename(i) for i in tmp[:, 0]]
entry = tmp[:, 1]
frame = tmp[:, 2]
file_type = tmp[:, 3]
return numpy.array(
(basename, file_type, frame, entry, relpath),
dtype=h5py.special_dtype(vlen=str),
).T
return None
......@@ -54,6 +54,7 @@ class AcquisitionChannel(AliasMixin, object):
self.__reference = reference
self.__description = {"reference": reference}
self.__data_node_type = data_node_type
self.__node = None
if isinstance(description, dict):
self.__description.update(description)
......@@ -109,6 +110,14 @@ class AcquisitionChannel(AliasMixin, object):
def unit(self):
return self.__unit
@property
def data_node(self):
return self.__node
@data_node.setter
def data_node(self, node):
self.__node = node
def emit(self, data):
if not self.reference:
data = self._check_and_reshape(data)
......
......@@ -647,6 +647,19 @@ class Scan:
def statistics(self):
return Statistics(self._acq_chain._stats_dict)
@property
def get_channels_dict(self):
"""
returns a dict containing all channels used in this scan
identified by their fullname
"""
flatten = lambda l: [item for sublist in l for item in sublist]
return {
c.fullname: c
for c in flatten([n.channels for n in self.acq_chain.nodes_list])
}
def add_preset(self, preset):
"""
Add a preset for this scan
......@@ -955,7 +968,7 @@ class Scan:
if self.writer:
# write scan_info to file
self.writer.finalize_scan_entry(self.node.name, self._scan_info)
self.writer.finalize_scan_entry(self)
self.writer.close()
# Add scan to the globals
......
......@@ -95,6 +95,11 @@ class FileWriter(object):
"""
raise NotImplementedError
def finalize_scan_entry(self, scan):
"""Called at the end of a scan
"""
pass
def new_scan(self, scan_name, scan_info):
raise NotImplementedError
......
......@@ -50,31 +50,6 @@ class Writer(FileWriter):
scan_entry["start_time"] = utc_time
measurement = scan_entry.create_group("measurement")
measurement.attrs["NX_class"] = "NXcollection"
instrument = scan_entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
positioners = instrument.create_group("positioners")
positioners.attrs["NX_class"] = "NXcollection"
positioners_dial = instrument.create_group("positioners_dial")
positioners_dial.attrs["NX_class"] = "NXcollection"
hdf5_scan_info = {
cat_name: scan_info.get(cat_name, {}) for cat_name in categories_names()
}
positioners_dict = hdf5_scan_info.get("instrument", {}).pop("positioners", {})
for pname, ppos in positioners_dict.items():
if isinstance(ppos, float):
positioners.create_dataset(pname, dtype="float64", data=ppos)
positioners_dial_dict = hdf5_scan_info.get("instrument", {}).pop(
"positioners_dial", {}
)
for pname, ppos in positioners_dial_dict.items():
if isinstance(ppos, float):
positioners_dial.create_dataset(pname, dtype="float64", data=ppos)
# pop rest of instrument
instrument_info = hdf5_scan_info.pop("instrument")
dicttoh5(instrument_info, self.file, h5path=f"{scan_name}/instrument")
dicttoh5(hdf5_scan_info, self.file, h5path=f"{scan_name}/scan_info")
return measurement
......@@ -123,6 +98,69 @@ class Writer(FileWriter):
self.last_point_index[channel] += data_len
def finalize_scan_entry(self, scan):
scan_name = scan.node.name
scan_info = scan._scan_info
### fill image references ###
for fname, channel in scan.get_channels_dict.items():
if channel.reference:
try:
data = channel.acq_device.to_ref_array(channel, self.root_path)
shape = numpy.shape(data)
dtype = data.dtype
dataset = self.file.create_dataset(
f"{scan_name}/measurement/{channel.alias_or_fullname}",
shape=shape,
dtype=dtype,
compression="gzip",
)
dataset.attrs.modify("fullname", channel.fullname)
dataset.attrs.modify("alias", channel.alias or "None")
dataset.attrs.modify("has_alias", channel.has_alias)
dataset[:] = data
except Exception as e:
pass
#### use scan_meta to fill fields ####
instrument = self.file.create_group(f"{scan_name}/instrument")
instrument.attrs["NX_class"] = "NXinstrument"
try:
positioners = instrument.create_group("positioners")
positioners.attrs["NX_class"] = "NXcollection"
positioners_dial = instrument.create_group("positioners_dial")
positioners_dial.attrs["NX_class"] = "NXcollection"
hdf5_scan_meta = {
cat_name: scan_info.get(cat_name, {}) for cat_name in categories_names()
}
positioners_dict = hdf5_scan_meta.get("instrument", {}).pop(
"positioners", {}
)
for pname, ppos in positioners_dict.items():
if isinstance(ppos, float):
positioners.create_dataset(pname, dtype="float64", data=ppos)
positioners_dial_dict = hdf5_scan_meta.get("instrument", {}).pop(
"positioners_dial", {}
)
for pname, ppos in positioners_dial_dict.items():
if isinstance(ppos, float):
positioners_dial.create_dataset(pname, dtype="float64", data=ppos)
except Exception:
# dealing with cases where there are no positioners in the session
hdf5_scan_meta.get("instrument", {}).pop("positioners", {})
hdf5_scan_meta.get("instrument", {}).pop("positioners_dial", {})
# pop rest of instrument
instrument_meta = hdf5_scan_meta.pop("instrument")
dicttoh5(instrument_meta, self.file, h5path=f"{scan_name}/instrument")
dicttoh5(hdf5_scan_meta, self.file, h5path=f"{scan_name}/scan_meta")
def close(self):
super(Writer, self).close()
if self.file is not None:
......
import pytest
from bliss.common.tango import DeviceProxy
from bliss.controllers.lima.roi import Roi
@pytest.fixture
def alias_session(beacon, lima_simulator):
session = beacon.get("test_alias")
env_dict = dict()
session.setup(env_dict)
ls = env_dict["lima_simulator"]
rois = ls.roi_counters
dev_name = lima_simulator[0].lower()
roi_dev = DeviceProxy(dev_name.replace("limaccds", "roicounter"))
r1 = Roi(0, 0, 100, 200)
rois["r1"] = r1
r2 = Roi(100, 100, 100, 200)
rois["r2"] = r2
r3 = Roi(200, 200, 200, 200)
rois["r3"] = r3
env_dict["ALIASES"].create_alias("myroi3", "lima_simulator.roi_counters.r3.sum")
yield env_dict, session
session.close()
......@@ -277,3 +277,29 @@ def pytest_addoption(parser):
parser.addoption("--pepu", help="pepu host name")
parser.addoption("--ct2", help="ct2 address")
parser.addoption("--axis-name", help="axis name")
@pytest.fixture
def alias_session(beacon, lima_simulator):
from bliss.common.tango import DeviceProxy
from bliss.controllers.lima.roi import Roi
session = beacon.get("test_alias")
env_dict = dict()
session.setup(env_dict)
ls = env_dict["lima_simulator"]
rois = ls.roi_counters
dev_name = lima_simulator[0].lower()
roi_dev = DeviceProxy(dev_name.replace("limaccds", "roicounter"))
r1 = Roi(0, 0, 100, 200)
rois["r1"] = r1
r2 = Roi(100, 100, 100, 200)
rois["r2"] = r2
r3 = Roi(200, 200, 200, 200)
rois["r3"] = r3
env_dict["ALIASES"].create_alias("myroi3", "lima_simulator.roi_counters.r3.sum")
yield env_dict, session
session.close()
......@@ -137,3 +137,66 @@ def test_subscan_in_hdf5(beacon, lima_simulator, dummy_acq_master, dummy_acq_dev
assert f[subscan_name]["measurement"]["timer2:elapsed_time"]
assert f[scan.node.name]["measurement"]["dummy1:nb"]
assert f[subscan_name]["measurement"]["dummy2:nb"]
def test_image_reference_in_hdf5(alias_session, scan_tmpdir):
env_dict, session = alias_session
# put scan file in a tmp directory
env_dict["SCAN_SAVING"].base_path = str(scan_tmpdir)
s = scans.ascan(env_dict["robyy"], 0, 1, 3, .1, env_dict["lima_simulator"])
f = h5py.File(s.writer.filename)
refs = numpy.array(f["1_ascan/measurement/lima_simulator:image"])
assert numpy.array_equal(
refs,
numpy.array(
[
[
"lima_simulator_0000.edf",
"EDF",
"0",
"",
"scan0001/lima_simulator_0000.edf",
],
[
"lima_simulator_0001.edf",
"EDF",
"0",
"",
"scan0001/lima_simulator_0001.edf",
],
[
"lima_simulator_0002.edf",
"EDF",
"0",
"",
"scan0001/lima_simulator_0002.edf",
],
],
dtype=object,
),
)
def test_lima_instrument_entry(alias_session, scan_tmpdir):
env_dict, session = alias_session
# put scan file in a tmp directory
env_dict["SCAN_SAVING"].base_path = str(scan_tmpdir)
s = scans.ascan(env_dict["robyy"], 0, 1, 3, .1, env_dict["lima_simulator"])
f = h5py.File(s.writer.filename)
assert (
"saving_frame_per_file"
in f["1_ascan/instrument/lima_simulator/lima_parameters"]
)
assert "acq_mode" in f["1_ascan/instrument/lima_simulator/lima_parameters"]
assert "height" in f["1_ascan/instrument/lima_simulator/roi_counters/r1"]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment