Commit 2cac2316 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'add-device-meta-to-scan_info2' into 'master'

Add device metadata to scan_info

Closes #2705

See merge request !3660
parents 82c697f8 50a46050
Pipeline #46730 failed with stages
in 97 minutes and 14 seconds
......@@ -28,6 +28,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update to silx 0.15
- Demo
- Added regulation mock to the demo session
- Scan publication
- Added device/channel metadata to the `scan_info`
- Added a `PREPARED` event with an updated scan_info
- `AcqObj.fill_meta_at_scan_start` is used to fill to `scan_info`
- Added metadata `type` for Lima detector and MCAs
- Added `ScansWatcher` and `ScansObserver` to replace `watch_session_scans`
### Changed
......@@ -60,7 +66,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Wago modules catalogue: 750-342,352,363,515
- Writer
- explicit exception if parent node is missing
- OFF and RUNNING states, now means respecitively "not listening to events and resources released" and "writer alive"
- OFF and RUNNING states, now means respectively "not listening to events and resources released" and "writer alive"
### Changed
......
......@@ -122,8 +122,8 @@ class _Base:
def wait_ready(self):
return self.device.wait_ready()
def fill_meta_at_scan_init(self, scan_meta):
return self.device.fill_meta_at_scan_init(scan_meta)
def fill_meta_at_scan_start(self, scan_meta):
return self.device.fill_meta_at_scan_start(scan_meta)
def fill_meta_at_scan_end(self, scan_meta):
return self.device.fill_meta_at_scan_end(scan_meta)
......
......@@ -116,3 +116,16 @@ class IcatPublisher(ABC):
Return a dict containing metadata
"""
raise NotImplementedError
class HasMetadataForScan(ABC):
"""
Any controller which provides metadata during a scan life cycle.
"""
def metadata_when_prepared(self) -> dict:
"""
Return a dict containing metadata when the device was prepared by the
scan.
"""
raise NotImplementedError
......@@ -15,6 +15,7 @@ from bliss.common.tango import DeviceProxy, DevFailed, Database, DevState
from bliss.config import settings
from bliss.config.beacon_object import BeaconObject
from bliss.common.logtools import log_debug
from bliss.common.protocols import HasMetadataForScan
from bliss.controllers.counter import CounterController, counter_namespace
from bliss import current_session
......@@ -80,7 +81,7 @@ class ChangeTangoTimeout(object):
self.__device.set_timeout_millis(self.__back_timeout)
class Lima(CounterController):
class Lima(CounterController, HasMetadataForScan):
"""
Lima controller.
Basic configuration:
......@@ -155,6 +156,9 @@ class Lima(CounterController):
self, parents_list=["lima", "controllers"], children_list=[self._proxy]
)
def metadata_when_prepared(self) -> dict:
return {"type": "lima"}
@property
def disable_bpm(self):
return self._disable_bpm
......
......@@ -21,6 +21,7 @@ import gevent
from bliss.controllers.mca.roi import RoiConfig
from bliss.common.logtools import log_debug
from bliss.common.protocols import HasMetadataForScan
from bliss.common.utils import autocomplete_property
from bliss.config.beacon_object import BeaconObject
from bliss.controllers.counter import CounterController
......@@ -91,7 +92,7 @@ class MCABeaconObject(BeaconObject):
# Base class
class BaseMCA(CounterController):
class BaseMCA(CounterController, HasMetadataForScan):
"""Generic MCA controller."""
# Life cycle
......@@ -107,6 +108,9 @@ class BaseMCA(CounterController):
self.initialize_attributes()
self.initialize_hardware()
def metadata_when_prepared(self) -> dict:
return {"type": "mca"}
def get_acquisition_object(self, acq_params, ctrl_params, parent_acq_params):
from bliss.scanning.acquisition.mca import (
......
......@@ -42,7 +42,7 @@ class ScanNode(DataNodeContainer):
def dataset(self):
return self.parent
def prepared(self):
def prepared(self, scan_info):
"""Publish PREPARED event in Redis
"""
if not self.new_node:
......@@ -51,6 +51,7 @@ class ScanNode(DataNodeContainer):
# TODO: what does the comment above mean?
with settings.pipeline(self._prepared_stream, self._info):
event = PreparedScanEvent()
self._info.update(scan_info)
self._prepared_stream.add_event(event)
def end(self, exception=None):
......
......@@ -121,7 +121,8 @@ class ScansObserver:
Arguments:
scan_db_name: Identifier of the scan
scan_info: Dictionary containing scan metadata
scan_info: Dictionary containing scan metadata updated with metadata
prepared metadata from controllers
"""
pass
......@@ -200,8 +201,10 @@ class ScansObserver:
Arguments:
scan_db_name: Identifier of the parent scan
scan_info: Dictionary containing scan metadata. It can be different
from the one at start.
scan_info: Dictionary containing scan metadata updated with
prepared and finished metadata from controllers
Other fields like positioners and datetime are also
updated.
"""
pass
......
This diff is collapsed.
......@@ -216,7 +216,7 @@ class ScanManager(bliss_scan.ScansObserver):
managed)."""
return scan_db_name in self.__cache
def on_scan_created(self, scan_db_name: str, scan_info: Dict):
def on_scan_started(self, scan_db_name: str, scan_info: Dict):
_logger.info("Scan started: %s", scan_info.get("title", scan_db_name))
_logger.debug("on_scan_created %s", scan_db_name)
if scan_db_name in self.__cache:
......
......@@ -15,6 +15,7 @@ import collections
import gevent
from treelib import Tree
from bliss.common.protocols import HasMetadataForScan
from bliss.common.event import dispatcher
from bliss.common.alias import CounterAlias
from bliss.common.cleanup import capture_exceptions
......@@ -463,13 +464,16 @@ class AcquisitionObject:
In this method, acquisition device should collect and meta data
related to this device and prepare it for publishing. it is called
during the scan initialization.
This can be used in two ways:
1) attaching meta data to the scan_meta object and publishing it in scan_info
i.e: scan_meta.instrument.set(self,{"timing mode":"fast"})
2) the return value of this function is used to fill the meta data of the
node attached to this AcqObj
"""
device = self.device
if isinstance(device, HasMetadataForScan):
return device.metadata_when_prepared()
return None
def fill_meta_at_scan_end(self, scan_meta):
......@@ -477,7 +481,7 @@ class AcquisitionObject:
In this method, acquisition device should collect and meta data
related to this device and prepare it for publishing. it is called
at the end of the scan.
This can be used in two ways:
1) attaching meta data to the scan_meta object and publishing it in scan_info
......
......@@ -1278,7 +1278,10 @@ class Scan:
self._prepare_devices(devices_tree)
self.writer.prepare(self)
self.node.prepared()
self._fill_meta("fill_meta_at_scan_start")
# The scan info was updated with device metadata
self.node.prepared(self._scan_info)
self._axes_in_scan = self._get_data_axes(include_calc_reals=True)
with execute_pre_scan_hooks(self._axes_in_scan):
......@@ -1413,16 +1416,16 @@ class Scan:
Method name can be either 'fill_meta_as_scan_start' or 'fill_meta_at_scan_end'
"""
for dev in self.acq_chain.nodes_list:
node = self.nodes.get(dev)
if node is None:
# prepare has not finished ?
continue
for acq_obj in self.acq_chain.nodes_list:
with KillMask(masked_kill_nb=1):
meth = getattr(dev, method_name)
tmp = meth(self.user_scan_meta)
if tmp:
update_node_info(node, tmp)
fill_meta = getattr(acq_obj, method_name)
metadata = fill_meta(self.user_scan_meta)
if metadata is not None:
node = self.nodes.get(acq_obj)
if node is not None:
update_node_info(node, metadata)
if method_name == "fill_meta_at_scan_start":
self._scan_info._set_device_meta(acq_obj, metadata)
def run(self):
"""Run the scan
......@@ -1531,8 +1534,6 @@ class Scan:
# starting the scan
self._set_state(ScanState.STARTING)
self._fill_meta("fill_meta_at_scan_start")
self._execute_preset("start")
# this execute iterations
......
......@@ -80,6 +80,40 @@ For now:
So reaching name for devices or channels can be done the following way:
`name = key.rsplit(":", 1)[-1]`.
Device metadata
===============
Metadata can be exposed per devices. It can be feed by the controller or by the
acquisition object during the preparation of the controllers.
BLISS provides a type metadata only 2 kind of objects: `mca` and `lima`
=================== =========== =========== ========================
Name Type Flag Description
=================== =========== =========== ========================
`channels` list[str] optional If set, list of channels exposed
by this device
`triggered_devices` list[str] optional If set, list of sub devices
triggered by this device
`type` str optional One of `lima` or `mca`
(other values could be used
but are not yet normalized)
=================== =========== =========== ========================
This can be used to infer sub devices and channels.
- A device typed as `mca`
- contains 1D channels which are spectrums
- A device typed as `lima`
- can contain an 2D channel which is the image of the detector
- can contain a device named `bpm`
- can contain a device named `roi_counters`
- this device contains a bunch of 0D channels associated to a BMP
- can contain a device named `roi_profiles`
- this device contains a bunch of 1D channels associated to ROIs
- can contain a device named `roi_collection`
- this device contains a bunch of 1D channels, each value is associated to a single ROI
Channel metadata
================
......@@ -88,12 +122,16 @@ the scan_info.
The following metadata are automatically generated.
- `display_name` (optional, str): Expected displayed name
- `unit` (optional, str): SI unit used by the data
- `dim` (mandatory, int): Dimensionality of the data
(0: scalar, 1: 1D data, 2: 2D data)
============== ====== =========== ========================
Name Type Flag Description
============== ====== =========== ========================
`display_name` str optional Expected displayed name
`unit` str optional SI unit used by the data
`dim` int mandatory Dimensionality of the data
(0: scalar, 1: 1D data, 2: 2D data)
============== ====== =========== ========================
There is other metadata which can be feed by the scan designers.
Other metadata can be feed by the scan designers.
Take a look at :meth:`ScanInfo.set_channel_meta`.
......@@ -263,6 +301,24 @@ class ScanInfo(dict):
info = self._scan_info.setdefault("sequence-info", {})
info["scan-count"] = int(scan_count)
def _get_key_from_acq_obj(self, acq_obj):
"""Used device key from an acq_obj"""
return f"{str(id(acq_obj))}:{acq_obj.name}"
def _set_device_meta(self, acq_obj, metadata: typing.Dict):
"""Protected function called by the scan to feed the device info after
the device preparation.
Argument:
acq_obj: A device though its acquisition object
metadata: A dictionary of basic python types.
Few reserved keys could be ignored.
"""
devices = self.setdefault("devices", {})
name = self._get_key_from_acq_obj(acq_obj)
device = devices.setdefault(name, {})
device.update(metadata)
def _get_channels_dict(self, acq_object, channels_dict):
scalars = channels_dict.setdefault("scalars", [])
spectra = channels_dict.setdefault("spectra", [])
......
......@@ -1131,6 +1131,8 @@ def expected_detector_content(name, config=True, save_images=True):
datasets = {"type", "data", "acq_parameters", "ctrl_parameters"}
else:
datasets = {"type", "acq_parameters", "ctrl_parameters"}
elif name.startswith("simu1_") or name.startswith("simu2_"):
datasets = {"data", "type"}
else:
datasets = {"data"}
else:
......@@ -1145,20 +1147,34 @@ def expected_detector_content(name, config=True, save_images=True):
datasets = {"data"}
else:
if save_images:
datasets = {"data", "acq_parameters", "ctrl_parameters"}
datasets = {"data", "acq_parameters", "ctrl_parameters", "type"}
else:
datasets = {"acq_parameters", "ctrl_parameters"}
datasets = {"acq_parameters", "ctrl_parameters", "type"}
elif name == "image":
if save_images:
datasets = {"data", "acq_parameters", "ctrl_parameters"}
datasets = {"data", "acq_parameters", "ctrl_parameters", "type"}
else:
datasets = {"acq_parameters", "ctrl_parameters"}
datasets = {"acq_parameters", "ctrl_parameters", "type"}
elif re.match("roi[1-3]_(sum|avg|std|min|max)", name):
datasets = {"data", "roi1", "roi2", "roi3"}
elif name == "roi4":
datasets = {"data", "roi4"}
elif name == "roi_collection_counter":
datasets = {"data", "roi_collection_counter"}
elif re.match("roi[1-3]", name):
# Lima
datasets = {"data", "type"}
elif name.startswith("simu1_") or name.startswith("simu2_"):
# MCAs
datasets = {"data", "type"}
elif (
name.endswith("_det0")
or name.endswith("_det1")
or name.endswith("_det2")
or name.endswith("_det3")
):
# MCAs
datasets = {"data", "type"}
else:
datasets = {"data"}
return datasets
......
......@@ -43,20 +43,26 @@ SCAN_INFO = {
SCAN_INFO_LIMA_ROIS = {
"acquisition_chain": {"timer": {"devices": ["master", "slave"]}},
"acquisition_chain": {
"timer": {"devices": ["master", "beamviewer", "beamviewer:roi_counters"]}
},
"devices": {
"master": {
"channels": ["timer:elapsed_time", "timer:epoch"],
"triggered_devices": ["slave"],
"triggered_devices": ["beamviewer"],
},
"slave": {
"beamviewer": {
"type": "lima",
"triggered_devices": ["beamviewer:roi_counters"],
"channels": ["beamviewer:image"],
},
"beamviewer:roi_counters": {
"channels": [
"beamviewer:roi_counters:roi1_sum",
"beamviewer:roi_counters:roi1_avg",
"beamviewer:roi_counters:roi4_sum",
"beamviewer:roi_counters:roi4_avg",
"beamviewer:roi_counters:roi5_avg",
"beamviewer:image",
]
},
},
......@@ -301,6 +307,7 @@ def test_amesh_scan_with_image_and_mca():
"triggered_devices": ["slave"],
},
"slave": {
"type": "mca",
"channels": [
"timer:elapsed_time",
"timer:epoch",
......@@ -315,7 +322,7 @@ def test_amesh_scan_with_image_and_mca():
"mca1:deadtime_det0",
"mca1:spectrum_det0",
"tomocam:image",
]
],
},
},
"channels": {
......
......@@ -35,8 +35,8 @@ SCAN_INFO_3 = {
class MockedScanManager(scan_manager.ScanManager):
def emit_scan_created(self, scan_info):
self.on_scan_created(scan_info["node_name"], scan_info)
def emit_scan_started(self, scan_info):
self.on_scan_started(scan_info["node_name"], scan_info)
def emit_scan_finished(self, scan_info):
self.on_scan_finished(scan_info["node_name"], scan_info)
......@@ -67,12 +67,12 @@ def test_interleaved_scans():
scans = manager.get_alive_scans()
assert len(scans) == 0
manager.emit_scan_created(scan_info_1)
manager.emit_scan_started(scan_info_1)
scans = manager.get_alive_scans()
assert len(scans) == 1
assert scans[0].scanInfo() == scan_info_1
manager.emit_scan_created(scan_info_2)
manager.emit_scan_started(scan_info_2)
manager.emit_scalar_updated(scan_info_1, "axis:roby", numpy.arange(2))
manager.emit_scalar_updated(scan_info_2, "axis:robz", numpy.arange(3))
manager.wait_data_processed()
......@@ -95,7 +95,7 @@ def test_sequencial_scans():
manager = MockedScanManager(flintModel=None)
manager.emit_scan_created(scan_info_1)
manager.emit_scan_started(scan_info_1)
manager.emit_scalar_updated(scan_info_1, "axis:roby", numpy.arange(2))
manager.wait_data_processed()
scans = manager.get_alive_scans()
......@@ -104,7 +104,7 @@ def test_sequencial_scans():
assert manager.get_alive_scans() == []
assert scans[0].scanInfo() == scan_info_1
manager.emit_scan_created(scan_info_2)
manager.emit_scan_started(scan_info_2)
manager.emit_scalar_updated(scan_info_2, "axis:robz", numpy.arange(3))
manager.wait_data_processed()
scans = manager.get_alive_scans()
......@@ -119,7 +119,7 @@ def test_bad_sequence__end_before_new():
manager = MockedScanManager(flintModel=None)
manager.emit_scan_finished(scan_info_1)
manager.emit_scan_created(scan_info_1)
manager.emit_scan_started(scan_info_1)
# FIXME What to do anyway then? The manager is locked
......@@ -170,7 +170,7 @@ def test_image__default():
manager = MockedScanManager(flintModel=None)
manager.emit_scan_created(scan_info_3)
manager.emit_scan_started(scan_info_3)
scan = manager.get_alive_scans()[0]
image = numpy.arange(1).reshape(1, 1)
......@@ -191,7 +191,7 @@ def test_image__disable_video():
manager = MockedScanManager(flintModel=None)
manager.emit_scan_created(scan_info_3)
manager.emit_scan_started(scan_info_3)
scan = manager.get_alive_scans()[0]
image = numpy.arange(1).reshape(1, 1)
......@@ -216,7 +216,7 @@ def test_image__decoding_error():
manager = MockedScanManager(flintModel=None)
manager.emit_scan_created(scan_info_3)
manager.emit_scan_started(scan_info_3)
scan = manager.get_alive_scans()[0]
image = numpy.arange(1).reshape(1, 1)
......@@ -241,7 +241,7 @@ def test_prefered_user_refresh():
manager = MockedScanManager(flintModel=None)
manager.emit_scan_created(scan_info_3)
manager.emit_scan_started(scan_info_3)
scan = manager.get_alive_scans()[0]
channel = scan.getChannelByName("lima:image")
channel.setPreferedRefreshRate("foo", 500)
......@@ -275,7 +275,7 @@ def test_scalar_data_lost():
manager = MockedScanManager(flintModel=None)
# Disabled async consumption
manager.emit_scan_created(scan_info_1)
manager.emit_scan_started(scan_info_1)
scans = manager.get_alive_scans()
assert len(scans) == 1
assert scans[0].scanInfo() == scan_info_1
......
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment