Commit 2398b1c5 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Fix metadata timing issues: introduce TIMING.PREPARED

Note: controllers of the acq. chain publish not in
scan_info but the controller node. This node only exists
after the scan is prepared (node creation is part of preparation)
parent a33bdd2c
......@@ -125,6 +125,9 @@ class _Base:
def fill_meta_at_scan_start(self):
return self.device.fill_meta_at_scan_start()
def fill_meta_at_scan_prepared(self):
return self.device.fill_meta_at_scan_prepared()
def fill_meta_at_scan_end(self):
return self.device.fill_meta_at_scan_end()
......
......@@ -398,8 +398,8 @@ class FakeAcquisitionSlave(AcquisitionSlave):
# self.channels.update_from_array(data)
# self.channels.update({self.chname: self.positions})
def fill_meta_at_scan_start(self):
tmp_dict = super().fill_meta_at_scan_start()
def fill_meta_at_scan_prepared(self):
tmp_dict = super().fill_meta_at_scan_prepared()
for cnt in self._counters:
if isinstance(cnt, HasMetadataForScan):
mdata = cnt.scan_metadata()
......
......@@ -18,6 +18,7 @@ from bliss.data.events import (
PreparedScanEvent,
)
from bliss.config import settings
from bliss.common.utils import update_node_info
class ScanNode(DataNodeContainer):
......@@ -51,10 +52,10 @@ class ScanNode(DataNodeContainer):
# TODO: what does the comment above mean?
with settings.pipeline(self._prepared_stream, self._info):
event = PreparedScanEvent()
self._info.update(scan_info)
update_node_info(self, scan_info)
self._prepared_stream.add_event(event)
def end(self, exception=None):
def end(self, scan_info, exception=None):
"""Publish END event in Redis
"""
if not self.new_node:
......@@ -63,12 +64,10 @@ class ScanNode(DataNodeContainer):
# TODO: what does the comment above mean?
with settings.pipeline(self._end_stream, self._info):
event = EndScanEvent()
add_info = {
"end_time": event.time,
"end_time_str": event.strftime,
"end_timestamp": event.timestamp,
}
self._info.update(add_info)
scan_info["end_time"] = event.time
scan_info["end_time_str"] = event.strftime
scan_info["end_timestamp"] = event.timestamp
update_node_info(self, scan_info)
self._end_stream.add_event(event)
def decode_raw_events(self, events):
......
......@@ -68,8 +68,8 @@ class BaseCounterAcquisitionSlave(AcquisitionSlave):
def _emit_new_data(self, data):
self.channels.update_from_iterable(data)
def fill_meta_at_scan_start(self):
tmp_dict = super().fill_meta_at_scan_start()
def fill_meta_at_scan_prepared(self):
tmp_dict = super().fill_meta_at_scan_prepared()
for cnt in self._counters:
if isinstance(cnt, HasMetadataForScan):
mdata = cnt.scan_metadata()
......
......@@ -462,7 +462,17 @@ class AcquisitionObject:
def fill_meta_at_scan_start(self):
"""
In this method, acquisition device should collect any meta data
related to this device. It is called during the scan initialization.
related to this device. It is called after scan start.
The return value of this function is used to fill the meta data of the
node attached to this AcqObj
"""
return None
def fill_meta_at_scan_prepared(self):
"""
In this method, acquisition device should collect any meta data
related to this device. It is called after all devices are prepared.
The return value of this function is used to fill the meta data of the
node attached to this AcqObj
......
......@@ -32,6 +32,7 @@ from bliss.common import plot as plot_mdl
from bliss.common.utils import periodic_exec, deep_update
from bliss.scanning.scan_meta import (
META_TIMING,
ScanMeta,
get_user_scan_meta,
get_controllers_scan_meta,
)
......@@ -648,9 +649,9 @@ class Scan:
self._init_acq_chain(chain)
self._init_scan_saving(scan_saving)
self._init_scan_display()
self._init_scan_info(scan_info=scan_info, save=save)
self._init_writer(save=save, save_images=save_images)
self._init_flint()
self._metadata_at_scan_instantiation(scan_info=scan_info, save=save)
def _init_scan_saving(self, scan_saving):
if scan_saving is None:
......@@ -692,22 +693,6 @@ class Scan:
for node in self._acq_chain._tree.is_branch(node):
self._uniquify_chan_name(node, names)
def _init_scan_info(self, scan_info=None, save=True):
"""Initialize `scan_info`"""
if scan_info is not None:
self._scan_info.update(scan_info)
scan_saving = self.__scan_saving
self._scan_info.setdefault("title", self.__name)
self._scan_info["session_name"] = scan_saving.session
self._scan_info["user_name"] = scan_saving.user_name
self._scan_info["data_writer"] = scan_saving.writer
self._scan_info["data_policy"] = scan_saving.data_policy
self._scan_info["shadow_scan_number"] = self._shadow_scan_number
self._scan_info["save"] = save
self._scan_info["publisher"] = "Bliss"
self._scan_info["publisher_version"] = publisher_version
self._scan_info.set_acquisition_chain_info(self._acq_chain)
def _init_writer(self, save=True, save_images=None):
"""Initialize the data writer if needed"""
scan_config = self.__scan_saving.get()
......@@ -755,11 +740,27 @@ class Scan:
return True
def _create_data_node(self, node_name):
"""Create the data node in Redis
@property
def _node_name(self):
node_name = str(self.__scan_number) + "_" + self.name
if self._shadow_scan_number:
return "_" + node_name
else:
return node_name
def _create_scan_node(self):
"""Create the scan node in Redis
"""
# The root nodes will not have caching
self.root_node = self.__scan_saving.get_parent_node()
# The scan node and its children will have caching
if self._REDIS_CACHING:
self._scan_connection = get_redis_proxy(db=1, caching=True, shared=False)
else:
self._scan_connection = self.root_connection
self.__node = create_node(
node_name,
self._node_name,
node_type=self._NODE_TYPE,
parent=self.root_node,
info=self._scan_info,
......@@ -794,15 +795,30 @@ class Scan:
def _prepare_node(self):
if self.node is not None:
return
# The root nodes will not have caching
self.root_node = self.__scan_saving.get_parent_node()
# The scan node and its children will have caching
if self._REDIS_CACHING:
self._scan_connection = get_redis_proxy(db=1, caching=True, shared=False)
else:
self._scan_connection = self.root_connection
# Note: _init_scan_number need to be done before the rest!
self._init_scan_number()
self._metadata_at_scan_start()
self._create_scan_node()
self._init_pipeline_mgr()
def _end_node(self):
self._cleanup_pipeline_mgr()
with capture_exceptions(raise_index=0) as capture:
_exception, _, _ = sys.exc_info()
### order is important in the next lines...
with capture():
self._metadata_at_scan_end()
with capture():
self.node.end(self._scan_info, exception=_exception)
with capture():
self.set_ttl()
self._update_scan_info_in_redis()
def _init_scan_number(self):
self.writer.template.update(
{
"scan_name": self.name,
......@@ -810,27 +826,10 @@ class Scan:
"scan_number": "{scan_number}",
}
)
self.__scan_number = self._next_scan_number()
self.writer.template["scan_number"] = self.scan_number
self._scan_info["scan_nb"] = self.__scan_number
# this has to be done when the writer is ready
self._scan_info["filename"] = self.writer.filename
start_timestamp = time.time()
start_time = datetime.datetime.fromtimestamp(start_timestamp)
self._scan_info["start_time"] = start_time
start_time_str = start_time.strftime("%a %b %d %H:%M:%S %Y")
self._scan_info["start_time_str"] = start_time_str
self._scan_info["start_timestamp"] = start_timestamp
node_name = str(self.__scan_number) + "_" + self.name
if self._shadow_scan_number:
node_name = "_" + node_name
self._create_data_node(node_name)
def _init_pipeline_mgr(self):
if self._USE_PIPELINE_MGR:
# Channel data will be emitted and the associated `trigger_data_watch_callback`
# calls executed, when one of the following things happens:
......@@ -855,25 +854,12 @@ class Scan:
self._current_pipeline_stream = self.root_connection.pipeline()
self._pending_watch_callback = weakref.WeakKeyDictionary()
def _end_node(self):
def _cleanup_pipeline_mgr(self):
if self._USE_PIPELINE_MGR:
self._rotating_pipeline_mgr = None
else:
self._current_pipeline_stream = None
with capture_exceptions(raise_index=0) as capture:
_exception, _, _ = sys.exc_info()
with capture():
# Store end event before setting the ttl
self.node.end(exception=_exception)
with capture():
self.set_ttl()
self._scan_info["end_time"] = self.node.info["end_time"]
self._scan_info["end_time_str"] = self.node.info["end_time_str"]
self._scan_info["end_timestamp"] = self.node.info["end_timestamp"]
def __repr__(self):
number = self.__scan_number
if self._shadow_scan_number:
......@@ -1285,9 +1271,9 @@ class Scan:
self._prepare_devices(devices_tree)
self.writer.prepare(self)
self._prepare_scan_meta()
# The scan info was updated with device metadata
# Publishes the metadata of a "prepared" scan
# in Redis
self._metadata_at_scan_prepared()
self.node.prepared(self._scan_info)
self._axes_in_scan = self._get_data_axes(include_calc_reals=True)
......@@ -1371,48 +1357,60 @@ class Scan:
else:
event.connect(dev, "new_data", self._channel_event)
@property
def user_scan_meta(self):
if self.__user_scan_meta is None:
self.__user_scan_meta = get_user_scan_meta().copy()
return self.__user_scan_meta
def _metadata_at_scan_instantiation(self, scan_info=None, save=True):
"""Metadata of an "instantiated" scan. Saved in Redis when creating the scan node.
"""
if scan_info is not None:
self._scan_info.update(scan_info)
scan_saving = self.__scan_saving
self._scan_info.setdefault("title", self.__name)
self._scan_info["session_name"] = scan_saving.session
self._scan_info["user_name"] = scan_saving.user_name
self._scan_info["data_writer"] = scan_saving.writer
self._scan_info["data_policy"] = scan_saving.data_policy
self._scan_info["shadow_scan_number"] = self._shadow_scan_number
self._scan_info["save"] = save
self._scan_info["publisher"] = "Bliss"
self._scan_info["publisher_version"] = publisher_version
self._scan_info.set_acquisition_chain_info(self._acq_chain)
@property
def _controllers_scan_meta(self):
if self.__controllers_scan_meta is None:
filtered_controller_names = []
for acq_obj in self.acq_chain.nodes_list:
# we do not want to collect controller metadata for controllers
# which are involved in the scan, since they will report their
# metadata via 'fill_meta' methods. So, we make a list of
# controller names to filter out.
# Nb: the acquisition object name == underlying device name normally
# /!\ this may be different from 'scan_metadata_name' in controllers metadata,
# so if someone tries hard it is possible to break the logic here
filtered_controller_names.append(acq_obj.name)
self.__controllers_scan_meta = get_controllers_scan_meta(
filtered_names=filtered_controller_names
)
return self.__controllers_scan_meta
def _metadata_at_scan_start(self):
"""Metadata of a "started" scan. Saved in Redis when creating the scan node.
So this is the first scan_info any subscriber sees.
"""
self._scan_info["scan_nb"] = self.__scan_number
def _update_scan_info_with_scan_meta(self, meta_timing):
with KillMask(masked_kill_nb=1):
deep_update(
self._scan_info,
self._controllers_scan_meta.to_dict(self, timing=meta_timing),
)
deep_update(
self._scan_info, self.user_scan_meta.to_dict(self, timing=meta_timing)
)
original = set(self._scan_info.get("scan_meta_categories", []))
extra1 = set(self._controllers_scan_meta.used_categories_names())
extra2 = set(self.user_scan_meta.used_categories_names())
self._scan_info["scan_meta_categories"] = list(original | extra1 | extra2)
# this has to be done when the writer is ready
self._scan_info["filename"] = self.writer.filename
# update scan info in redis
update_node_info(self.node, dict(self._scan_info))
start_timestamp = time.time()
start_time = datetime.datetime.fromtimestamp(start_timestamp)
self._scan_info["start_time"] = start_time
start_time_str = start_time.strftime("%a %b %d %H:%M:%S %Y")
self._scan_info["start_time_str"] = start_time_str
self._scan_info["start_timestamp"] = start_timestamp
self._metadata_of_plot()
def _prepare_scan_meta(self):
self._metadata_of_acq_controllers(META_TIMING.START)
self._metadata_of_nonacq_controllers(META_TIMING.START)
self._metadata_of_user(META_TIMING.START)
def _metadata_at_scan_prepared(self):
"""Metadata of a "prepared" scan. Saved in Redis by `ScanNode.prepared`
"""
self._metadata_of_acq_controllers(META_TIMING.PREPARED)
self._metadata_of_nonacq_controllers(META_TIMING.PREPARED)
self._metadata_of_user(META_TIMING.PREPARED)
def _metadata_at_scan_end(self):
"""Metadata of a "finished" scan. Saved in Redis by `ScanNode.end`
"""
self._metadata_of_acq_controllers(META_TIMING.END)
self._metadata_of_nonacq_controllers(META_TIMING.END)
self._metadata_of_user(META_TIMING.END)
def _metadata_of_plot(self):
# Plot metadata
display_extra = {}
displayed_channels = self.__scan_display.displayed_channels
......@@ -1430,11 +1428,82 @@ class Scan:
if len(display_extra) > 0:
self._scan_info["_display_extra"] = display_extra
# Collect exclusive scan metadata
self._fill_meta("fill_meta_at_scan_start")
def _metadata_of_user(self, meta_timing):
"""Update scan_info with user scan metadata.
"""
self._evaluate_scan_meta(self._user_scan_meta, meta_timing)
def _metadata_of_nonacq_controllers(self, meta_timing):
"""Update scan_info with controller scan metadata.
"""
self._evaluate_scan_meta(self._controllers_scan_meta, meta_timing)
# Update with controllers metadata and user metadata, and push to redis
self._update_scan_info_with_scan_meta(META_TIMING.START)
def _metadata_of_acq_controllers(self, meta_timing):
"""Update the controller Redis nodes with metadata.
"""
if meta_timing == META_TIMING.START:
method_name = "fill_meta_at_scan_start"
elif meta_timing == META_TIMING.PREPARED:
method_name = "fill_meta_at_scan_prepared"
elif meta_timing == META_TIMING.END:
method_name = "fill_meta_at_scan_end"
else:
return
for acq_obj in self.acq_chain.nodes_list:
with KillMask(masked_kill_nb=1):
fill_meta = getattr(acq_obj, method_name)
metadata = fill_meta()
# There is a difference between None and an empty dict.
# An empty dict shows up as a group in the Nexus file
# while None does not.
if metadata is None:
continue
node = self.nodes.get(acq_obj)
if node is not None:
update_node_info(node, metadata)
if meta_timing == META_TIMING.START:
self._scan_info._set_device_meta(acq_obj, metadata)
def _evaluate_scan_meta(self, scan_meta, meta_timing):
"""Evaluate the metadata generators of a ScanMeta instance
and update scan_info.
"""
assert isinstance(scan_meta, ScanMeta)
with KillMask(masked_kill_nb=1):
metadata = scan_meta.to_dict(self, timing=meta_timing)
if not metadata:
return
deep_update(self._scan_info, metadata)
original = set(self._scan_info.get("scan_meta_categories", []))
extra = set(scan_meta.used_categories_names())
self._scan_info["scan_meta_categories"] = list(original | extra)
@property
def _user_scan_meta(self):
if self.__user_scan_meta is None:
self.__user_scan_meta = get_user_scan_meta().copy()
return self.__user_scan_meta
@property
def _controllers_scan_meta(self):
if self.__controllers_scan_meta is None:
# The metadata of controllers involved in the scan is collected
# directly by `_metadata_of_acq_controllers`
scan_controller_names = []
for acq_obj in self.acq_chain.nodes_list:
scan_controller_names.append(acq_obj.name)
# Register metadata generators for controllers not involved
# in the scan (filter out the scan_controller_names).
self.__controllers_scan_meta = get_controllers_scan_meta(
filtered_controller_names=scan_controller_names
)
return self.__controllers_scan_meta
def _update_scan_info_in_redis(self):
"""Publish changes to the local copy of scan_info
"""
update_node_info(self.node, self._scan_info)
def disconnect_all(self):
for dev in self._devices:
......@@ -1457,26 +1526,6 @@ class Scan:
self._scan_info["state"] = state
self.__state_change.set()
def _fill_meta(self, method_name):
"""Fill metadata from devices using specified method
Method name can be either 'fill_meta_as_scan_start' or 'fill_meta_at_scan_end'
"""
for acq_obj in self.acq_chain.nodes_list:
with KillMask(masked_kill_nb=1):
fill_meta = getattr(acq_obj, method_name)
metadata = fill_meta()
# There is a difference between None and an empty dict.
# An empty dict shows up as a group in the Nexus file
# while None does not.
if metadata is not None:
node = self.nodes.get(acq_obj)
if node is not None:
update_node_info(node, metadata)
self._controllers_scan_meta.instrument.set(acq_obj.name, metadata)
if method_name == "fill_meta_at_scan_start":
self._scan_info._set_device_meta(acq_obj, metadata)
def run(self):
"""Run the scan
......@@ -1608,14 +1657,6 @@ class Scan:
killed = True
raise
with capture():
# check if there is any master or device that would like
# to provide meta data at the end of the scan.
self._fill_meta("fill_meta_at_scan_end")
with capture():
self._update_scan_info_with_scan_meta(META_TIMING.END)
# wait the end of publishing
# (should be already finished)
with capture():
......
......@@ -15,7 +15,6 @@ __all__ = ["get_user_scan_meta"]
import copy as copy_module
import enum
import pprint
import weakref
from bliss import global_map
from bliss.common.protocols import HasMetadataForScan, HasMetadataForScanExclusive
......@@ -25,21 +24,12 @@ from bliss.common.logtools import user_warning
class META_TIMING(enum.Flag):
START = enum.auto()
END = enum.auto()
PREPARED = enum.auto()
USER_SCAN_META = None
def get_user_scan_meta():
global USER_SCAN_META
if USER_SCAN_META is None:
USER_SCAN_META = ScanMeta()
USER_SCAN_META.instrument.set("@NX_class", {"@NX_class": "NXinstrument"})
USER_SCAN_META.instrument.timing = META_TIMING.END
USER_SCAN_META.technique.set("@NX_class", {"@NX_class": "NXcollection"})
return USER_SCAN_META
class ScanMetaCategory:
"""Provides an API part of the metadata belonging to one category
"""
......@@ -132,39 +122,6 @@ class ScanMetaCategory:
return f"{self.__class__.__name__}{self.name}: \n " + s
def get_controllers_scan_meta(filtered_names=None):
scan_meta = ScanMeta()
scan_meta.instrument.set("@NX_class", {"@NX_class": "NXinstrument"})
scan_meta.positioners.set("positioners", fill_positioners)
scan_meta.positioners.timing = META_TIMING.START | META_TIMING.END
for obj in global_map.instance_iter("controllers"):
if isinstance(obj, HasMetadataForScan):
if isinstance(obj, HasMetadataForScanExclusive):
# metadata for this controller has to be gathered by acq. chain
continue
if not obj.scan_metadata_enabled:
continue
if filtered_names and obj.scan_metadata_name in filtered_names:
# this object is filtered out
continue
def metadata_generator(scan, obj=obj):
"""
Metadata generator registred with the instrument category
of user scan metadata.
"""
metadata_name = obj.scan_metadata_name
if not metadata_name:
user_warning(f"{repr(obj)} needs a name to publish scan metadata")
return {}
else:
return {metadata_name: obj.scan_metadata()}
scan_meta.instrument.set(obj, metadata_generator)
return scan_meta
class ScanMeta:
"""Register metadata for all scans. The `Scan` object will call `ScanMeta.to_dict`
to generate the metadata.
......@@ -307,3 +264,52 @@ def fill_positioners(scan):
rd["positioners_units"] = units
return rd
def get_user_scan_meta():
"""A single instance is used for the lifetime of the process.
"""
global USER_SCAN_META
if USER_SCAN_META is None:
USER_SCAN_META = ScanMeta()
USER_SCAN_META.instrument.set("@NX_class", {"@NX_class": "NXinstrument"})
USER_SCAN_META.instrument.timing = META_TIMING.END
USER_SCAN_META.technique.set("@NX_class", {"@NX_class": "NXcollection"})
return USER_SCAN_META
def get_controllers_scan_meta(filtered_controller_names=None):
"""A new instance is created for every scan.
"""
scan_meta = ScanMeta()
scan_meta.instrument.set("@NX_class", {"@NX_class": "NXinstrument"})
scan_meta.positioners.set("positioners", fill_positioners)
scan_meta.positioners.timing = META_TIMING.START | META_TIMING.END
for obj in global_map.instance_iter("controllers"):
if isinstance(obj, HasMetadataForScan):
if isinstance(obj, HasMetadataForScanExclusive):
# metadata for this controller has to be gathered by acq. chain
continue
if not obj.scan_metadata_enabled:
continue
if filtered_controller_names and obj.name in filtered_controller_names:
# Controllers from which we need metadata, whether it is part of
# a scan or not, derive from HasMetadataForScan. When part of the
# scan the name will be in filtered_controller_names.
continue
def metadata_generator(scan, obj=obj):
"""
Metadata generator registred with the instrument category
of user scan metadata.