Commit 6432d6cd authored by Matias Guijarro's avatar Matias Guijarro

Merge branch '689_image_links_in_hdf5' into 'master'

image links in hdf5 and second round on scan_meta

Closes #689 and #775

See merge request bliss/bliss!1293
parents 6d57a9c7 ffe27169
......@@ -606,3 +606,19 @@ class autocomplete_property(property):
"""
pass
def deep_update(source, overrides):
"""
Update a nested dictionary or similar mapping.
Modify ``source`` in place.
Copied from https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth/32357112#32357112
"""
for key, value in overrides.items():
if isinstance(value, collections.abc.Mapping) and value:
returned = deep_update(source.get(key, {}), value)
source[key] = returned
else:
source[key] = overrides[key]
return source
......@@ -8,6 +8,9 @@
from .roi import Roi
from .properties import LimaProperty
from bliss.common.measurement import BaseCounter
import numpy
import h5py
import os
class ImageCounter(BaseCounter):
......@@ -57,3 +60,33 @@ class ImageCounter(BaseCounter):
"Lima.image: set roi only accepts roi (class)"
" or (x,y,width,height) values"
)
# handling of reference saving in hdf5
def to_ref_array(self, channel, root_path):
""" used to produce a string version of a lima reference that can be saved in hdf5
"""
# looks like the events are not emitted after saving,
# therefore we will use 'last_image_ready' instead
# of "last_image_saved" for now
# last_image_saved = event_dict["data"]["last_image_saved"]
lima_data_view = channel.data_node.get(0, -1)
tmp = lima_data_view._get_filenames(
channel.data_node.info, *range(0, len(lima_data_view))
)
if tmp != []:
tmp = numpy.array(tmp, ndmin=2)
relpath = [os.path.relpath(i, start=root_path) for i in tmp[:, 0]]
basename = [os.path.basename(i) for i in tmp[:, 0]]
entry = tmp[:, 1]
frame = tmp[:, 2]
file_type = tmp[:, 3]
return numpy.array(
(basename, file_type, frame, entry, relpath),
dtype=h5py.special_dtype(vlen=str),
).T
return None
......@@ -36,6 +36,9 @@ class Roi(object):
def __repr__(self):
return "<%s,%s> <%s x %s>" % (self.x, self.y, self.width, self.height)
def to_dict(self):
return {"x": self.x, "y": self.y, "width": self.width, "height": self.height}
def __eq__(self, other):
return self.p0 == other.p0 and self.p1 == other.p1 and self.name == other.name
......
......@@ -223,11 +223,11 @@ class LimaImageChannelDataNode(DataNode):
raise RuntimeError("Image %d was not saved" % image_nb)
image_index_in_file = image_nb % nb_image_per_file
file_nb = first_file_number + image_index_in_file
file_nb = first_file_number + image_nb // nb_image_per_file
file_path = path_format % file_nb
if file_format == "HDF5":
returned_params.append(
(file_path, "/entry_%04d" % 1, image_index_in_file, file_format)
(file_path, "/entry_%04d" % 0, image_index_in_file, file_format)
)
else:
returned_params.append(
......
......@@ -434,6 +434,11 @@ class DataNode(object):
def name(self):
return self._data.name
@property
@protect_from_kill
def fullname(self):
return self._data.fullname
@property
@protect_from_kill
def type(self):
......
......@@ -314,3 +314,18 @@ class LimaAcquisitionMaster(AcquisitionMaster):
if self._reading_task is None:
return True
return self._reading_task.get()
def fill_meta_at_scan_end(self, scan_meta):
scan_meta.instrument.set(
self, {self.name: {"lima_parameters": self.parameters}}
)
def fill_meta_at_scan_init(self, scan_meta):
try:
rois = dict(self.channels[0].acq_device.master_controller.roi_counters)
roi_counters = dict()
for roi_name, roi in rois.items():
roi_counters[roi_name] = roi.to_dict()
scan_meta.instrument.set(self, {self.name: {"roi_counters": roi_counters}})
except Exception:
pass
......@@ -295,10 +295,18 @@ class AcquisitionMaster(object):
return self.prepare()
def fill_info(self, scan_meta):
def fill_meta_at_scan_init(self, scan_meta):
"""
In this method, acquisition device should fill the information relative to his device in
the scan_meta object.
the scan_meta object. It is called during the scan initialization
i.e: scan_meta.instrument.set(self,{"timing mode":"fast"})
"""
pass
def fill_meta_at_scan_end(self, scan_meta):
"""
In this method, acquisition device should fill the information relative to his device in
the scan_meta object. It is called at the scan end
i.e: scan_meta.instrument.set(self,{"timing mode":"fast"})
"""
pass
......@@ -514,11 +522,19 @@ class AcquisitionDevice(object):
raise RuntimeError("%s: Last reading task is not finished." % self.name)
return self.prepare()
def fill_info(self, scan_meta):
def fill_meta_at_scan_init(self, scan_meta):
"""
In this method, acquisition device should fill the information relative to his device in
the scan_meta object.
i.e: scan_meta.instrument.set(self,{"mca":{"calibration":calib_name}})
the scan_meta object. It is called during the scan initialization
i.e: scan_meta.instrument.set(self,{"timing mode":"fast"})
"""
pass
def fill_meta_at_scan_end(self, scan_meta):
"""
In this method, acquisition device should fill the information relative to his device in
the scan_meta object. It is called at the scan end
i.e: scan_meta.instrument.set(self,{"timing mode":"fast"})
"""
pass
......
......@@ -54,6 +54,7 @@ class AcquisitionChannel(AliasMixin, object):
self.__reference = reference
self.__description = {"reference": reference}
self.__data_node_type = data_node_type
self.__node = None
if isinstance(description, dict):
self.__description.update(description)
......@@ -109,6 +110,14 @@ class AcquisitionChannel(AliasMixin, object):
def unit(self):
return self.__unit
@property
def data_node(self):
return self.__node
@data_node.setter
def data_node(self, node):
self.__node = node
def emit(self, data):
if not self.reference:
data = self._check_and_reshape(data)
......
......@@ -23,7 +23,7 @@ from bliss.common.event import connect, send, disconnect
from bliss.common.cleanup import error_cleanup, axis as cleanup_axis, capture_exceptions
from bliss.common.greenlet_utils import KillMask
from bliss.common.plot import get_flint, CurvePlot, ImagePlot
from bliss.common.utils import periodic_exec
from bliss.common.utils import periodic_exec, deep_update
from .scan_meta import get_user_scan_meta
from bliss.common.utils import Statistics, Null
from bliss.config.conductor import client
......@@ -554,10 +554,10 @@ class Scan:
self.__nodes = dict()
self._devices = []
user_scan_meta = get_user_scan_meta().copy()
# call all master and device to fill scan info
self.user_scan_meta = get_user_scan_meta().copy()
# call all master and device to fill scan_meta
for dev in chain.nodes_list:
dev.fill_info(user_scan_meta)
dev.fill_meta_at_scan_init(self.user_scan_meta)
self._scan_info["session_name"] = session_name
self._scan_info["user_name"] = user_name
self._scan_info["scan_nb"] = self.__scan_number
......@@ -569,7 +569,7 @@ class Scan:
start_time_str = start_time.strftime("%a %b %d %H:%M:%S %Y")
self._scan_info["start_time_str"] = start_time_str
self._scan_info["start_timestamp"] = start_timestamp
self._scan_info.update(user_scan_meta.to_dict(self))
self._scan_info.update(self.user_scan_meta.to_dict(self))
self._data_watch_callback = data_watch_callback
self._data_events = dict()
self._acq_chain = chain
......@@ -647,6 +647,19 @@ class Scan:
def statistics(self):
return Statistics(self._acq_chain._stats_dict)
@property
def get_channels_dict(self):
"""
returns a dict containing all channels used in this scan
identified by their fullname
"""
flatten = lambda l: [item for sublist in l for item in sublist]
return {
c.fullname: c
for c in flatten([n.channels for n in self.acq_chain.nodes_list])
}
def add_preset(self, preset):
"""
Add a preset for this scan
......@@ -816,6 +829,7 @@ class Scan:
unit=channel.unit,
fullname=channel.fullname,
)
channel.data_node = self.nodes[channel]
connect(channel, "new_data", self._channel_event)
def prepare(self, scan_info, devices_tree):
......@@ -939,8 +953,24 @@ class Scan:
self._data_watch_callback.on_scan_end(self.scan_info)
finally:
# check if there is any master or device that would like
# to provide meta data at the end of the scan
for dev in self.acq_chain.nodes_list:
dev.fill_meta_at_scan_end(self.user_scan_meta)
tmp_dict = self.user_scan_meta.to_dict(self)
# make sure that 'positioners' entry is not updated
tmp_dict["instrument"].pop("positioners")
tmp_dict["instrument"].pop("positioners_dial")
deep_update(self._scan_info, tmp_dict)
# update scan_info in redis
self.node._info.update(self.scan_info)
if self.writer:
# write scan_info to file
self.writer.finalize_scan_entry(self)
self.writer.close()
# Add scan to the globals
SCANS.append(self)
# Disconnect events
......
......@@ -95,6 +95,11 @@ class FileWriter(object):
"""
raise NotImplementedError
def finalize_scan_entry(self, scan):
"""Called at the end of a scan
"""
pass
def new_scan(self, scan_name, scan_info):
raise NotImplementedError
......
......@@ -11,6 +11,7 @@ import h5py
import numpy
import time
import datetime
import copy
from silx.io.dictdump import dicttoh5
from bliss.scanning.writer.file import FileWriter
from bliss.scanning.scan_meta import categories_names
......@@ -50,31 +51,6 @@ class Writer(FileWriter):
scan_entry["start_time"] = utc_time
measurement = scan_entry.create_group("measurement")
measurement.attrs["NX_class"] = "NXcollection"
instrument = scan_entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument"
positioners = instrument.create_group("positioners")
positioners.attrs["NX_class"] = "NXcollection"
positioners_dial = instrument.create_group("positioners_dial")
positioners_dial.attrs["NX_class"] = "NXcollection"
hdf5_scan_info = {
cat_name: scan_info.get(cat_name, {}) for cat_name in categories_names()
}
positioners_dict = hdf5_scan_info.get("instrument", {}).pop("positioners", {})
for pname, ppos in positioners_dict.items():
if isinstance(ppos, float):
positioners.create_dataset(pname, dtype="float64", data=ppos)
positioners_dial_dict = hdf5_scan_info.get("instrument", {}).pop(
"positioners_dial", {}
)
for pname, ppos in positioners_dial_dict.items():
if isinstance(ppos, float):
positioners_dial.create_dataset(pname, dtype="float64", data=ppos)
# pop rest of instrument
instrument_info = hdf5_scan_info.pop("instrument")
dicttoh5(instrument_info, self.file, h5path=f"{scan_name}/instrument")
dicttoh5(hdf5_scan_info, self.file, h5path=f"{scan_name}/scan_info")
return measurement
......@@ -123,6 +99,70 @@ class Writer(FileWriter):
self.last_point_index[channel] += data_len
def finalize_scan_entry(self, scan):
scan_name = scan.node.name
scan_info = scan._scan_info
### fill image references ###
for fname, channel in scan.get_channels_dict.items():
if channel.reference:
try:
data = channel.acq_device.to_ref_array(channel, self.root_path)
shape = numpy.shape(data)
dtype = data.dtype
dataset = self.file.create_dataset(
f"{scan_name}/measurement/{channel.alias_or_fullname}",
shape=shape,
dtype=dtype,
compression="gzip",
)
dataset.attrs.modify("fullname", channel.fullname)
dataset.attrs.modify("alias", channel.alias or "None")
dataset.attrs.modify("has_alias", channel.has_alias)
dataset[:] = data
except Exception as e:
pass
#### use scan_meta to fill fields ####
instrument = self.file.create_group(f"{scan_name}/instrument")
instrument.attrs["NX_class"] = "NXinstrument"
positioners = instrument.create_group("positioners")
positioners.attrs["NX_class"] = "NXcollection"
positioners_dial = instrument.create_group("positioners_dial")
positioners_dial.attrs["NX_class"] = "NXcollection"
# copy should be removed once positioners are no longer treated as special case
hdf5_scan_meta = copy.deepcopy(
{cat_name: scan_info.get(cat_name, {}) for cat_name in categories_names()}
)
try:
positioners_dict = hdf5_scan_meta.get("instrument", {}).pop(
"positioners", {}
)
for pname, ppos in positioners_dict.items():
if isinstance(ppos, float):
positioners.create_dataset(pname, dtype="float64", data=ppos)
positioners_dial_dict = hdf5_scan_meta.get("instrument", {}).pop(
"positioners_dial", {}
)
for pname, ppos in positioners_dial_dict.items():
if isinstance(ppos, float):
positioners_dial.create_dataset(pname, dtype="float64", data=ppos)
except Exception:
# dealing with cases where there are no positioners in the session
hdf5_scan_meta.get("instrument", {}).pop("positioners", {})
hdf5_scan_meta.get("instrument", {}).pop("positioners_dial", {})
# pop rest of instrument
instrument_meta = hdf5_scan_meta.pop("instrument")
dicttoh5(instrument_meta, self.file, h5path=f"{scan_name}/instrument")
dicttoh5(hdf5_scan_meta, self.file, h5path=f"{scan_name}/scan_meta")
def close(self):
super(Writer, self).close()
if self.file is not None:
......
import pytest
from bliss.common.tango import DeviceProxy
from bliss.controllers.lima.roi import Roi
@pytest.fixture
def alias_session(beacon, lima_simulator):
session = beacon.get("test_alias")
env_dict = dict()
session.setup(env_dict)
ls = env_dict["lima_simulator"]
rois = ls.roi_counters
dev_name = lima_simulator[0].lower()
roi_dev = DeviceProxy(dev_name.replace("limaccds", "roicounter"))
r1 = Roi(0, 0, 100, 200)
rois["r1"] = r1
r2 = Roi(100, 100, 100, 200)
rois["r2"] = r2
r3 = Roi(200, 200, 200, 200)
rois["r3"] = r3
env_dict["ALIASES"].create_alias("myroi3", "lima_simulator.roi_counters.r3.sum")
yield env_dict, session
session.close()
......@@ -277,3 +277,29 @@ def pytest_addoption(parser):
parser.addoption("--pepu", help="pepu host name")
parser.addoption("--ct2", help="ct2 address")
parser.addoption("--axis-name", help="axis name")
@pytest.fixture
def alias_session(beacon, lima_simulator):
from bliss.common.tango import DeviceProxy
from bliss.controllers.lima.roi import Roi
session = beacon.get("test_alias")
env_dict = dict()
session.setup(env_dict)
ls = env_dict["lima_simulator"]
rois = ls.roi_counters
dev_name = lima_simulator[0].lower()
roi_dev = DeviceProxy(dev_name.replace("limaccds", "roicounter"))
r1 = Roi(0, 0, 100, 200)
rois["r1"] = r1
r2 = Roi(100, 100, 100, 200)
rois["r2"] = r2
r3 = Roi(200, 200, 200, 200)
rois["r3"] = r3
env_dict["ALIASES"].create_alias("myroi3", "lima_simulator.roi_counters.r3.sum")
yield env_dict, session
session.close()
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
import numpy
import glob
from bliss.common.scans import loopscan, DEFAULT_CHAIN
def lima_data_view_test_helper(scan):
scan.run()
for node in scan.node.iterator.walk(wait=False, filter="lima"):
image_node = node
lima_data_view = image_node.get(0)
lima_data_view._update()
ref_data = scan.scan_info["instrument"]["lima_simulator"]["lima_parameters"]
lima_files = numpy.array(
lima_data_view._get_filenames(ref_data, *range(0, scan.scan_info["npoints"]))
)
filesystem_files = sorted(glob.glob(ref_data["saving_directory"] + "/*"))
return lima_files, filesystem_files
def lima_data_view_test_assets(lima_files, filesystem_files):
for f in filesystem_files:
assert f in lima_files[:, 0]
for f in set(lima_files[:, 0]):
assert f in filesystem_files
def test_LimaDataView_edf_1_frame_per_edf(beacon, lima_simulator):
simulator = beacon.get("lima_simulator")
scan = loopscan(5, 0.1, simulator, save=True, run=False)
def test_LimaDataView_edf_2_frames_per_edf(beacon, lima_simulator):
simulator = beacon.get("lima_simulator")
scan = loopscan(5, 0.1, simulator, save=True, run=False)
sim_params = scan.acq_chain.nodes_list[1].parameters
sim_params["saving_frame_per_file"] = 2
lima_files, filesystem_files = lima_data_view_test_helper(scan)
lima_data_view_test_assets(lima_files, filesystem_files)
def test_LimaDataView_edf_1_frame_per_hdf5(beacon, lima_simulator):
simulator = beacon.get("lima_simulator")
scan = loopscan(5, 0.1, simulator, save=True, run=False)
sim_params = scan.acq_chain.nodes_list[1].parameters
sim_params["saving_format"] = "HDF5"
sim_params["saving_suffix"] = ".h5"
lima_data_view_test_assets(*lima_data_view_test_helper(scan))
def test_LimaDataView_edf_2_frames_per_hdf5(beacon, lima_simulator):
simulator = beacon.get("lima_simulator")
scan = loopscan(5, 0.1, simulator, save=True, run=False)
sim_params = scan.acq_chain.nodes_list[1].parameters
sim_params["saving_format"] = "HDF5"
sim_params["saving_frame_per_file"] = 2
sim_params["saving_suffix"] = ".h5"
lima_files, filesystem_files = lima_data_view_test_helper(scan)
lima_data_view_test_assets(lima_files, filesystem_files)
......@@ -137,3 +137,84 @@ def test_subscan_in_hdf5(beacon, lima_simulator, dummy_acq_master, dummy_acq_dev
assert f[subscan_name]["measurement"]["timer2:elapsed_time"]
assert f[scan.node.name]["measurement"]["dummy1:nb"]
assert f[subscan_name]["measurement"]["dummy2:nb"]
def test_image_reference_in_hdf5(alias_session, scan_tmpdir):
env_dict, session = alias_session
# put scan file in a tmp directory
env_dict["SCAN_SAVING"].base_path = str(scan_tmpdir)
s = scans.ascan(env_dict["robyy"], 0, 1, 3, .1, env_dict["lima_simulator"])
f = h5py.File(s.writer.filename)
refs = numpy.array(f["1_ascan/measurement/lima_simulator:image"])
assert numpy.array_equal(
refs,
numpy.array(
[
[
"lima_simulator_0000.edf",
"EDF",
"0",
"",
"scan0001/lima_simulator_0000.edf",
],
[
"lima_simulator_0001.edf",
"EDF",
"0",
"",
"scan0001/lima_simulator_0001.edf",
],
[
"lima_simulator_0002.edf",
"EDF",
"0",
"",
"scan0001/lima_simulator_0002.edf",
],
],
dtype=object,
),
)
def test_lima_instrument_entry(alias_session, scan_tmpdir):
env_dict, session = alias_session
# put scan file in a tmp directory
env_dict["SCAN_SAVING"].base_path = str(scan_tmpdir)
s = scans.ascan(env_dict["robyy"], 0, 1, 3, .1, env_dict["lima_simulator"])
f = h5py.File(s.writer.filename)
assert (
"saving_frame_per_file"
in f["1_ascan/instrument/lima_simulator/lima_parameters"]
)
assert "acq_mode" in f["1_ascan/instrument/lima_simulator/lima_parameters"]
assert "height" in f["1_ascan/instrument/lima_simulator/roi_counters/r1"]
def test_positioners_in_scan_info(alias_session, scan_tmpdir):
env_dict, session = alias_session
# put scan file in a tmp directory
env_dict["SCAN_SAVING"].base_path = str(scan_tmpdir)
s = scans.ascan(
env_dict["robyy"], 0, 1, 3, .1, env_dict["lima_simulator"], run=False
)
assert "positioners" in s.scan_info["instrument"]
s.run()
assert "positioners" in s.scan_info["instrument"]
......@@ -98,7 +98,7 @@ def test_scan_meta_master_and_device(scan_meta, clean_gevent):
def __init__(self):
super().__init__(self, None, "my_master")
def fill_info(self, scan_meta):
def fill_meta_at_scan_init(self, scan_meta):
scan_meta.instrument.set(self, master_dict)
device_name = "my_slave"
......@@ -117,7 +117,7 @@ def test_scan_meta_master_and_device(scan_meta, clean_gevent):
def __init__(self):
super().__init__(self, None, device_name)
def fill_info(self, scan_meta):
def fill_meta_at_scan_init(self, scan_meta):
scan_meta.instrument.set(self, device_dict)
master = DummyMaster()
......
......@@ -7,8 +7,8 @@
from bliss import setup_globals
from bliss.common.standard import wa, wm, sta, stm
from bliss.shell.cli import repl
from bliss.common.utils import deep_update
repl.ERROR_REPORT.expert_mode = True
......@@ -168,3 +168,30 @@ def test_stm_exception(beacon, capsys):
errmsg = "RuntimeError: Error on motor 'bad': BAD POSITION\n"
assert captured.err[-len(errmsg) :] == errmsg
def test_deep_update():
source = {"hello1": 1}
overrides = {"hello2": 2}
deep_update(source, overrides)
assert source == {"hello1": 1, "hello2": 2}
source = {"hello": "to_override"}
overrides = {"hello": "over"}
deep_update(source, overrides)
assert source == {"hello": "over"}
source = {"hello": {"value": "to_override", "no_change": 1}}
overrides = {"hello": {"value": "over"}}
deep_update(source, overrides)
assert source == {"hello": {"value": "over", "no_change": 1}}
source = {"hello": {"value": "to_override", "no_change": 1}}
overrides = {"hello": {"value": {}}}
deep_update(source, overrides)
assert source == {"hello": {"value": {}, "no_change": 1}}
source = {"hello": {"value": {}, "no_change": 1}}
overrides = {"hello": {"value": 2}}
deep_update(source, overrides)
assert source == {"hello": {"value": 2, "no_change": 1}}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment