Commit 4777626b authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Three metadata protocols: HasMetadataForDataset, HasMetadataForScan and...

Three metadata protocols: HasMetadataForDataset, HasMetadataForScan and NonScannableHasMetadataForScan
parent a5c37cac
......@@ -12,6 +12,7 @@ from abc import ABC
from collections import namedtuple
from types import SimpleNamespace
from typing import Mapping
from typing import Union
class IterableNamespace(SimpleNamespace):
......@@ -104,28 +105,37 @@ class Scannable(ABC):
raise NotImplementedError
class IcatPublisher(ABC):
class HasMetadataForDataset(ABC):
"""
Any controller that has this interface can be used
for metadata collection based on the `icat-mapping`
tag in the session configuration
Any controller which provides metadata intended to be saved
during a dataset life cycle.
The `dataset_metadata` is called by the Bliss session's icat_mapping
object when the session has such a mapping configured.
"""
def metadata(self) -> dict:
def dataset_metadata(self) -> Union[dict, None]:
"""
Return a dict containing metadata
Returning an empty dictionary means the controller has metadata
but no values. `None` means the controller has no metadata.
"""
raise NotImplementedError
class HasMetadataForScan(ABC):
"""
Any controller which provides metadata during a scan life cycle.
Any controller which provides metadata intended to be saved
during a scan life cycle.
The `scan_metadata` method is called by the acquisition chain
objects `AcquisitionObject` (directly or indirectly).
Controllers not involved in the acquisition chain should use
the `NonScannableHasMetadataForScan` protocol.
"""
def metadata_when_prepared(self) -> dict:
def scan_metadata(self) -> Union[dict, None]:
"""
Return a dict containing metadata when the device was prepared by the
scan.
Returning an empty dictionary means the controller has metadata
but no values. `None` means the controller has no metadata.
"""
raise NotImplementedError
......@@ -30,6 +30,7 @@ from bliss.common.utils import UserNamespace
from bliss.common import constants
from bliss.scanning import scan_saving
from bliss.scanning import scan_display
from bliss.scanning.scan_meta import NonScannableHasMetadataForScan
_SESSION_IMPORTERS = set()
......@@ -750,6 +751,8 @@ class Session:
setup_ret = self._setup(env_dict)
ret = ret and setup_ret
self.register_metadata_generators(env_dict)
return ret
def setup(
......@@ -838,6 +841,7 @@ class Session:
def close(self):
setup_globals.__dict__.clear()
self.unregister_metadata_generators(self.env_dict)
for obj_name, obj in self.env_dict.items():
if obj is self or obj is self.config:
continue
......@@ -971,6 +975,18 @@ class Session:
self.setup(self.env_dict, verbose)
@staticmethod
def register_metadata_generators(env_dict):
for o in env_dict.values():
if isinstance(o, NonScannableHasMetadataForScan):
o.enable_scan_metadata()
@staticmethod
def unregister_metadata_generators(env_dict):
for o in env_dict.values():
if isinstance(o, NonScannableHasMetadataForScan):
o.disable_scan_metadata()
class DefaultSession(Session):
"""Session without config, setup scripts and data policy
......
......@@ -6,7 +6,7 @@
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.icat.definitions import Definitions
from os.path import commonprefix
from bliss.common.protocols import IcatPublisher
from bliss.common.protocols import HasMetadataForDataset
from collections.abc import MutableSequence
......@@ -19,9 +19,9 @@ class ICATmeta:
self.definitions = Definitions()
for key, obj in self.objects.items():
if not isinstance(obj, IcatPublisher):
if not isinstance(obj, HasMetadataForDataset):
raise RuntimeError(
f"{obj.name} ({key}) is not a valid metadata publisher!"
f"{obj.name} ({key}) does not implement the 'HasMetadataForDataset' protocol"
)
def get_metadata(self):
......@@ -37,13 +37,12 @@ class ICATmeta:
instrumentation = self.definitions.instrumentation._asdict()
for key, device in self.objects.items():
assert key in instrumentation, f"{key} is not a known icat field group"
assert hasattr(
device, "metadata"
), f"{device.name} has no metadata function"
prefix = commonprefix(list(instrumentation[key].fields)).strip("_")
# have to deal with cases where there is no tailing `_` in the prefix
# e.g. attenuator positions
obj_meta = device.metadata()
obj_meta = device.dataset_metadata()
if not obj_meta:
continue
for icat_key in instrumentation[key].fields:
obj_key = icat_key.split(prefix)[-1].strip("_")
if obj_key in obj_meta:
......
......@@ -156,7 +156,7 @@ class Lima(CounterController, HasMetadataForScan):
self, parents_list=["lima", "controllers"], children_list=[self._proxy]
)
def metadata_when_prepared(self) -> dict:
def scan_metadata(self) -> dict:
return {"type": "lima"}
@property
......
......@@ -18,17 +18,17 @@ from bliss.common.logtools import user_print
from bliss.common import timedisplay
from bliss.controllers.counter import counter_namespace
from bliss.scanning.scan_meta import get_user_scan_meta
from bliss.scanning.scan_meta import NonScannableHasMetadataForScan
from bliss.scanning.chain import ChainPreset, ChainIterationPreset
from bliss.common import tango
from bliss.common.protocols import IcatPublisher
from bliss.common.protocols import HasMetadataForDataset
from bliss.controllers.tango_attr_as_counter import (
TangoCounterController,
TangoAttrCounter,
)
class MachInfo(BeaconObject, IcatPublisher):
class MachInfo(BeaconObject, NonScannableHasMetadataForScan, HasMetadataForDataset):
""" Access to accelerator information.
- SR_Current
- SR_Lifetime
......@@ -135,59 +135,70 @@ class MachInfo(BeaconObject, IcatPublisher):
self.__check = False
user_print("Removing Wait For Refill on scans")
def dataset_metadata(self):
attributes = ["SR_Mode", "SR_Current"]
attributes = {
attr_name: value
for attr_name, value in zip(attributes, self._read_attributes(attributes))
}
return {
"mode": self.SRMODE(attributes["SR_Mode"]).name,
"current": attributes["SR_Current"],
}
@property
def scan_metadata_name(self):
return "machine"
def scan_metadata(self):
attributes = [
"SR_Mode",
"SR_Filling_Mode",
"SR_Single_Bunch_Current",
"SR_Current",
"Automatic_Mode",
"FE_State",
"SR_Refill_Countdown",
"SR_Operator_Mesg",
]
attributes = {
attr_name: value
for attr_name, value in zip(attributes, self._read_attributes(attributes))
}
# Standard:
meta_dict = {
"@NX_class": "NXsource",
"name": "ESRF",
"type": "Synchrotron",
"mode": self.SRMODE(attributes["SR_Mode"]).name,
"current": attributes["SR_Current"],
"current@units": "mA",
}
# Non-standard:
if attributes["SR_Filling_Mode"] == "1 bunch":
meta_dict["single_bunch_current"] = attributes["SR_Single_Bunch_Current"]
meta_dict["single_bunch_current@units"] = "mA"
meta_dict["filling_mode"] = attributes["SR_Filling_Mode"]
meta_dict["automatic_mode"] = attributes["Automatic_Mode"]
meta_dict["front_end"] = attributes["FE_State"]
meta_dict["refill_countdown"] = attributes["SR_Refill_Countdown"]
meta_dict["refill_countdown@units"] = "s"
meta_dict["message"] = attributes["SR_Operator_Mesg"]
return meta_dict
@BeaconObject.property(default=True)
def metadata(self):
"""
Insert machine info metadata's for any scans
"""
pass
return self.scan_metadata_enabled
@metadata.setter
def metadata(self, flag):
def get_meta(scan):
attributes = [
"SR_Mode",
"SR_Filling_Mode",
"SR_Single_Bunch_Current",
"SR_Current",
"Automatic_Mode",
"FE_State",
"SR_Refill_Countdown",
"SR_Operator_Mesg",
]
attributes = {
attr_name: value
for attr_name, value in zip(
attributes, self._read_attributes(attributes)
)
}
# Standard:
meta_dict = {
"@NX_class": "NXsource",
"name": "ESRF",
"type": "Synchrotron",
"mode": self.SRMODE(attributes["SR_Mode"]).name,
"current": attributes["SR_Current"],
"current@units": "mA",
}
# Non-standard:
if attributes["SR_Filling_Mode"] == "1 bunch":
meta_dict["single_bunch_current"] = attributes[
"SR_Single_Bunch_Current"
]
meta_dict["single_bunch_current@units"] = "mA"
meta_dict["filling_mode"] = attributes["SR_Filling_Mode"]
meta_dict["automatic_mode"] = attributes["Automatic_Mode"]
meta_dict["front_end"] = attributes["FE_State"]
meta_dict["refill_countdown"] = attributes["SR_Refill_Countdown"]
meta_dict["refill_countdown@units"] = "s"
meta_dict["message"] = attributes["SR_Operator_Mesg"]
return {"machine": meta_dict}
if flag:
get_user_scan_meta().instrument.set(self.KEY_NAME, get_meta)
self.enable_scan_metadata()
else:
get_user_scan_meta().instrument.remove(self.KEY_NAME)
self.disable_scan_metadata()
def iter_wait_for_refill(self, checktime, waittime=0., polling_time=1.):
"""
......
......@@ -113,7 +113,7 @@ class BaseMCA(CounterController, HasMetadataForScan):
self.initialize_attributes()
self.initialize_hardware()
def metadata_when_prepared(self) -> dict:
def scan_metadata(self) -> dict:
return {"type": "mca"}
def get_acquisition_object(self, acq_params, ctrl_params, parent_acq_params):
......
......@@ -508,7 +508,7 @@ class Controller:
class CalcController(Controller):
def __init__(self, *args, **kwargs):
Controller.__init__(self, *args, **kwargs)
super().__init__(*args, **kwargs)
self.axis_settings.config_setting["velocity"] = False
self.axis_settings.config_setting["acceleration"] = False
......
......@@ -51,13 +51,13 @@ import time
import math
import gevent
from bliss.scanning.scan_meta import get_user_scan_meta
from bliss.scanning.scan_meta import NonScannableHasMetadataForScan
from bliss.controllers.motor import Controller
from bliss.common.axis import AxisState
from bliss.common.tango import DevState, DeviceProxy
from bliss import global_map
from bliss.common.logtools import *
from bliss.common.logtools import log_error, user_print
from bliss.shell.cli.user_dialog import (
UserMsg,
......@@ -72,7 +72,7 @@ from bliss.shell.cli.pt_widgets import display, BlissDialog
__author__ = "Jens Meyer / Gilles Berruyer - ESRF ISDD SOFTGROUP BLISS - June 2019"
class esrf_hexapode(Controller):
class esrf_hexapode(Controller, NonScannableHasMetadataForScan):
""" Class to implement BLISS motor controller of esrf hexapode controlled
via tango device server
"""
......@@ -95,31 +95,17 @@ class esrf_hexapode(Controller):
log_error(self, _err_msg)
raise RuntimeError(_err_msg)
self._init_meta_data_publishing()
@property
def scan_metadata_name(self):
return self.name
def _init_meta_data_publishing(self):
"""this is about metadata publishing to the h5 file"""
if not self.name:
user_warning(
"to publish metadata the hexapode controller needs a name in config"
)
return
scan_meta_obj = get_user_scan_meta()
scan_meta_obj.instrument.set(self, lambda _: {self.name: self.metadata()})
def metadata(self):
"""
this is about metadata publishing to the h5 file AND ICAT
"""
def scan_metadata(self):
meta_dict = {"@NX_class": "NXhexapode"}
meta_dict = dict()
leg_length = self.device.read_attribute("LegLength").value
for i, value in enumerate(leg_length):
meta_dict.update({f"leglength{i+1}": value})
meta_dict["@NX_class"] = "NXhexapode"
return meta_dict
"""
......
......@@ -7,8 +7,8 @@
from bliss.controllers.motor import CalcController
from bliss.common.logtools import user_warning, log_debug
from bliss.scanning.scan_meta import get_user_scan_meta
from bliss.common.protocols import IcatPublisher
from bliss.scanning.scan_meta import NonScannableHasMetadataForScan
from bliss.common.protocols import HasMetadataForDataset
"""
example for single VERTICAL slits:
......@@ -86,27 +86,15 @@ example for single HORIZONTAL slits:
"""
class Slits(CalcController, IcatPublisher):
class Slits(CalcController, NonScannableHasMetadataForScan, HasMetadataForDataset):
def __init__(self, *args, **kwargs):
CalcController.__init__(self, *args, **kwargs)
super().__init__(*args, **kwargs)
self.slit_type = self.config.get("slit_type", default="both")
self._init_meta_data_publishing()
def __close__(self):
super().close()
def _init_meta_data_publishing(self):
"""this is about metadata publishing to the h5 file"""
if not self.name:
user_warning(
"to publish metadata the slit controller needs a name in config"
)
return
scan_meta_obj = get_user_scan_meta()
scan_meta_obj.instrument.set(self, lambda _: {self.name: self.metadata()})
def metadata(self):
"""
this is about metadata publishing to the h5 file AND ICAT
"""
def dataset_metadata(self):
cur_pos = self._do_calc_from_real()
meta_dict = dict()
......@@ -122,13 +110,19 @@ class Slits(CalcController, IcatPublisher):
meta_dict.update(
{"vertical_gap": cur_pos["vgap"], "vertical_offset": cur_pos["voffset"]}
)
return meta_dict
meta_dict["@NX_class"] = "NXslit"
@property
def scan_metadata_name(self):
return self.name
def scan_metadata(self):
meta_dict = self.dataset_metadata()
meta_dict["@NX_class"] = "NXslit"
return meta_dict
def initialize_axis(self, axis):
CalcController.initialize_axis(self, axis)
super().initialize_axis(axis)
axis.no_offset = True
def calc_from_real(self, positions_dict):
......
......@@ -75,7 +75,7 @@ import functools
from tabulate import tabulate
from gevent import Timeout
from bliss.common.protocols import IcatPublisher
from bliss.common.protocols import HasMetadataForDataset
from bliss.common.motor_group import Group
from bliss.common.axis import AxisState
from bliss.config.channels import Channel
......@@ -83,10 +83,10 @@ from bliss.common import event
from bliss.common.utils import flatten
from bliss.common.logtools import log_warning, log_error
from bliss import global_map, is_bliss_shell
from bliss.scanning.scan_meta import get_user_scan_meta
from bliss.scanning.scan_meta import NonScannableHasMetadataForScan
class MultiplePositions(IcatPublisher):
class MultiplePositions(HasMetadataForDataset, NonScannableHasMetadataForScan):
""" Handle multiple positions.
"""
......@@ -115,23 +115,21 @@ class MultiplePositions(IcatPublisher):
global_map.register(self, tag=name)
self._init_meta_data_publishing()
def _init_meta_data_publishing(self):
"""Publish position in meta data """
scan_meta_obj = get_user_scan_meta()
scan_meta_obj.instrument.set(self, lambda _: {self.name: self.metadata()})
def metadata(self):
if self.position != "unknown":
cur_pos_config = [
x.to_dict()
for x in self._config["positions"]
if x["label"] == self.position
][0]
if "metadata" in cur_pos_config:
return cur_pos_config["metadata"]
# no metadata or in unknown position -> return empty dict
def dataset_metadata(self):
return self._get_position_config().get("dataset_metadata", dict())
def scan_metadata(self):
return self._get_position_config().get("scan_metadata", dict())
@property
def scan_metadata_name(self):
return self.name
def _get_position_config(self):
position = self.position
for x in self._config["positions"]:
if x["label"] == position:
return x
return dict()
def add_label_move_method(self, pos_label):
......
......@@ -79,7 +79,7 @@ from bliss.common.utils import grouped
from bliss.controllers.wago.wago import WagoController, ModulesConfig, get_wago_comm
from bliss.config import channels
from bliss.common.event import dispatcher
from bliss.scanning.scan_meta import get_user_scan_meta
from bliss.scanning.scan_meta import NonScannableHasMetadataForScan
class TfWagoMapping:
......@@ -158,7 +158,7 @@ def _encode(status):
raise ValueError("Invalid position {!r}".format(status))
class Transfocator:
class Transfocator(NonScannableHasMetadataForScan):
"""
The lenses are controlled pneumatically via WAGO output modules.
The position is red from WAGO input modules.
......@@ -216,20 +216,16 @@ class Transfocator:
self.nb_pinhole = len(self.pinhole)
if self.nb_pinhole > 2:
raise ValueError(f"{name}: layout can only have 2 pinholes maximum")
super().__init__()
self._init_meta_data_publishing()
@property
def scan_metadata_name(self):
return self.name
def __del__(self):
# remove meta
scan_meta_obj = get_user_scan_meta()
scan_meta_obj.instrument.remove(self)
def _init_meta_data_publishing(self):
scan_meta_obj = get_user_scan_meta()
scan_meta_obj.instrument.set(
self,
lambda _: {self.name: {**self.status_dict(), "@NX_class": "NXcollection"}},
)
def scan_metadata(self):
metadata = self.status_dict()
metadata["@NX_class"] = "NXcollection"
return metadata
def connect(self):
""" Connect to the WAGO module, if not already done """
......
......@@ -473,7 +473,7 @@ class AcquisitionObject:
"""
device = self.device
if isinstance(device, HasMetadataForScan):
return device.metadata_when_prepared()
return device.scan_metadata()
return None
def fill_meta_at_scan_end(self, scan_meta):
......
......@@ -1421,11 +1421,14 @@ class Scan:
"""Fill metadata from devices using specified method
Method name can be either 'fill_meta_as_scan_start' or 'fill_meta_at_scan_end'
"""
"""
for acq_obj in self.acq_chain.nodes_list:
with KillMask(masked_kill_nb=1):
fill_meta = getattr(acq_obj, method_name)
metadata = fill_meta(self.user_scan_meta)
# There is a difference between None and an empty dict.
# An empty dict shows up as a group in the Nexus file
# while None does not.
if metadata is not None:
node = self.nodes.get(acq_obj)
if node is not None:
......
......@@ -15,8 +15,11 @@ __all__ = ["get_user_scan_meta"]
import copy as copy_module
import enum
import pprint
import weakref
from bliss import global_map
from bliss.common.protocols import HasMetadataForScan
from bliss.common.logtools import user_warning
class META_TIMING(enum.Flag):
......@@ -73,27 +76,50 @@ class ScanMetaCategory:
def timing(self, timing):
self._timing[self.category] = timing
def set(self, name_or_device, values):
def _parse_metadata_name(self, name_or_device):
"""
:param str name_or_device: is the access name must be unique or a device
with a name property
:param callable or dict values: callable needs to return a dictionary
:param name_or_device: string or an object with a name property
:returns str or None:
"""
if isinstance(name_or_device, str):
name = name_or_device
if not name_or_device:
user_warning("A name is required to publish scan metadata")
return None
return name_or_device
else:
name = name_or_device.name
self.metadata[name] = values
try:
name = name_or_device.name
if name:
return name
except AttributeError:
pass
user_warning(
repr(name_or_device) + " needs a name to publish scan metadata"
)
return None
def set(self, name_or_device, values):