Commit c3523622 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch '2792-support-for-icat-definition-field' into 'master'

Resolve "Support for icat "definition" field"

Closes #2792

See merge request !3758
parents f61f69be 9b39fecb
Pipeline #48343 failed with stages
in 101 minutes and 59 seconds
......@@ -676,7 +676,7 @@ class DataNode(metaclass=DataNodeMetaClass):
1. exists: the principal Redis key is created in Redis
2. initialized: all Redis keys are created and initialized
3. supported: initialized + version can be handled by the current implementation
Use the utility methods `get_node`, `get_nodes`, ... to instantiate
a `DataNode` depending on its state.
"""
......
......@@ -36,7 +36,6 @@ from bliss.scanning.scan_meta import (
get_user_scan_meta,
get_controllers_scan_meta,
)
from bliss.common.motor_group import is_motor_group
from bliss.common.utils import Null, update_node_info, round
from bliss.controllers.motor import Controller, get_real_axes
from bliss.config.conductor.client import get_redis_proxy
......@@ -665,6 +664,10 @@ class Scan:
scan_saving = current_session.scan_saving
self.__scan_saving = scan_saving.clone()
@property
def scan_saving(self):
return self.__scan_saving
def _init_scan_display(self):
self.__scan_display = current_session.scan_display.clone()
......@@ -943,15 +946,9 @@ class Scan:
@property
def get_channels_dict(self):
"""A dictionary of all channels used in this scan
"""
returns a dict containing all channels used in this scan
identified by their fullname
"""
flatten = lambda l: [item for sublist in l for item in sublist]
return {
c.name: c for c in flatten([n.channels for n in self.acq_chain.nodes_list])
}
return {c.name: c for n in self.acq_chain.nodes_list for c in n.channels}
def add_preset(self, preset):
"""
......@@ -1220,18 +1217,14 @@ class Scan:
def _pipeline_execute(self, pipeline, trigger_func):
while True:
try:
pipeline.execute()
for event in self._pending_watch_callback.get(pipeline, list()):
trigger_func(*event)
except:
raise
else:
if not len(self._current_pipeline_stream):
break
new_pipeline = self.root_connection.pipeline()
pipeline = self._current_pipeline_stream
self._current_pipeline_stream = new_pipeline
pipeline.execute()
for ev in self._pending_watch_callback.get(pipeline, list()):
trigger_func(*ev)
if not len(self._current_pipeline_stream):
break
new_pipeline = self.root_connection.pipeline()
pipeline = self._current_pipeline_stream
self._current_pipeline_stream = new_pipeline
def _swap_pipeline(self):
with self._stream_pipeline_lock:
......@@ -1582,7 +1575,7 @@ class Scan:
with capture_exceptions(raise_index=0) as capture:
with capture():
# check that icat metadata has been colleted for the dataset
# check that icat metadata has been collected for the dataset
self.__scan_saving.on_scan_run(not self._shadow_scan_number)
# create scan node in redis
......@@ -1673,7 +1666,7 @@ class Scan:
except KeyboardInterrupt as e:
killed = killed_by_user = True
raise ScanAbort from e
except BaseException as e:
except BaseException:
killed = True
raise
......
......@@ -749,7 +749,6 @@ class ESRFScanSaving(BasicScanSaving):
"date_format": "%Y%m%d",
"scan_number_format": "%04d",
"dataset_number_format": "%04d",
"technique": "",
# saved properties in Redis:
"_writer_module": "nexus",
"_proposal": "",
......@@ -796,12 +795,16 @@ class ESRFScanSaving(BasicScanSaving):
self._remove_deprecated()
def _remove_deprecated(self):
"""Remove deprecated items from existing Redis databases"""
stored = self.to_dict()
if "_sample" in stored:
# Deprecated in Bliss 1.7
# Deprecated in Bliss > 1.7.0
value = stored["_sample"]
self.remove("._sample")
self._collection = value
if "technique" in stored:
# Deprecated in Bliss > 1.8.0
self.remove("technique")
def __dir__(self):
keys = super().__dir__()
......
......@@ -90,7 +90,8 @@ class Writer(FileWriter):
self._retry(
self.scan_writer_started,
timeout_msg="Cannot check Nexus writer scan state",
fail_msg=f"Nexus writing for {self._scan_name} not started before {{time}}",
fail_msg=f"Nexus writer did not receive scan '{self._scan_name}' before {{time}}",
raise_on_timeout=True,
)
self._retry(
self.check_writer_permissions,
......
......@@ -108,7 +108,6 @@ DEMO [5]: SCAN_SAVING
.date_format = '%Y%m%d'
.scan_number_format = '%04d'
.dataset_number_format = '%04d'
.technique = ''
.session = 'demo'
.date = '20200208'
.scan_name = '{scan_name}'
......
......@@ -12,14 +12,14 @@ class Session:
self.env_dict["SCAN_SAVING"] = self.scan_saving
```
Creating a custom data policy means deriving a class from `bliss.scanning.scan_saving.BaseScanSaving`:
Creating a custom data policy means deriving a class from `bliss.scanning.scan_saving.BasicScanSaving`:
```python
class CustomScanSaving(BaseScanSaving):
class CustomScanSaving(BasicScanSaving):
DEFAULT_VALUES = {
# default and not removable values
"technique": "",
"scan_number_format": "%04d",
...
# saved properties in Redis:
"_proposal": "",
......
......@@ -13,7 +13,6 @@
Register metadata generators for a configurable writer
"""
import enum
from bliss.scanning import scan_meta
from .subscribers import scan_writer_publish
......
......@@ -13,7 +13,7 @@
original and Bliss will not patch it.
"""
import bliss
import bliss # unused but needed
import subprocess
from nexus_writer_service.patching.gevent import unpatch_module
......
......@@ -16,7 +16,6 @@ import logging
from gevent.time import time
from gevent import sleep
from collections import OrderedDict
from contextlib import contextmanager
from fabio.edfimage import EdfImage
from silx.io import dictdump
from .base_proxy import BaseProxy
......
......@@ -13,8 +13,6 @@
Basic Nexus writer listening to Redis events of a scan
"""
import gevent
import os
import numpy
import traceback
import logging
......
......@@ -31,6 +31,7 @@ from bliss.controllers.lima.roi import (
)
from bliss.common.counter import SamplingCounter
from ..utils import config_utils
from ..utils import session_utils
from ..utils import scan_utils
......@@ -58,15 +59,10 @@ def fill_instrument_name(scan):
:param bliss.scanning.scan.Scan scan:
"""
logger.debug("fill instrument name")
instrument = config_utils.institute()
beamline = config_utils.beamline()
beamline = config_utils.scan_saving_get("beamline", beamline)
if beamline:
if instrument:
instrument += ": " + beamline
else:
instrument = beamline
return {"instrument": instrument}
default = session_utils.scan_saving_get(
"beamline", scan_saving=scan.scan_saving, default=""
)
return {"instrument": config_utils.instrument(default=default)}
def fill_technique_info(scan):
......@@ -74,7 +70,7 @@ def fill_technique_info(scan):
:param bliss.scanning.scan.Scan scan:
"""
logger.debug("fill technique info")
return {"technique": current_technique_definition()}
return {"technique": current_technique_definition(scan.scan_saving)}
def fill_masterfiles(scan):
......@@ -213,7 +209,11 @@ def writer_config():
:returns dict:
"""
return config_utils.static_root_find("nexus_definitions", default={})
cfgnode = config_utils.static_root_find("nexus_definitions")
if cfgnode is None:
return dict()
else:
return cfgnode.to_dict()
def writer_config_get(name, default=None):
......@@ -252,13 +252,15 @@ def default_technique():
return technique_info_get("default", "undefined")
def current_technique():
def current_technique(scan_saving=None):
"""
Active technique from the session's scan saving object
:returns str:
"""
return config_utils.scan_saving_get("technique", default_technique())
return session_utils.dataset_get(
"definition", default_technique(), scan_saving=scan_saving
)
def techniques():
......@@ -288,13 +290,13 @@ def technique_definition(technique):
"plots": plots,
"plotselect": "",
}
technique_info = technique_info_get("techniques", {}).get(technique, {})
if technique_info is None:
technique_info = {}
info = technique_info_get("techniques", {}).get(technique, {})
if info is None:
info = {}
# Get the application definitions selected for this technique
applicationdict = technique_info_get("applications", {})
for name in technique_info.get("applications", []):
for name in info.get("applications", []):
definition = applicationdict.get(name, {})
# for example {'xrf':{'I0': 'iodet',
# 'It': 'idet',
......@@ -306,7 +308,7 @@ def technique_definition(technique):
# Get the plots selected for this technique
plotdict = technique_info_get("plots", {})
plotselect = "" # this first one is the default
for name in technique_info.get("plots", []):
for name in info.get("plots", []):
plotdefinition = plotdict.get(name, {})
# for examples:
# {'personal_name': 'counters', 'items': ['iodet', 'xmap1:deadtime_det2', ...]}
......@@ -326,10 +328,10 @@ def technique_definition(technique):
return ret
def current_technique_definition():
def current_technique_definition(scan_saving=None):
"""
Current technique definition from the technique info
:returns dict(dict): technique:definition (str:dict)
"""
return technique_definition(current_technique())
return technique_definition(current_technique(scan_saving=scan_saving))
......@@ -10,57 +10,41 @@
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Bliss session configuration utilities
Static Bliss configuration utilities
"""
import os
import re
from functools import wraps
from bliss import current_session
from bliss.config import static
from bliss.scanning.scan_saving import ScanSaving, with_eval_dict
def static_config():
"""
Get static session configuration
:returns bliss.config.static.Config:
"""
return static.get_config()
def static_root():
"""
Get static session configuration
:returns bliss.config.static.ConfigNode:
"""
return static_config().root
def static_root_get(name, default=None):
def static_root(root=None):
"""
Get attribute from the static session configuration
:returns str:
:returns ConfigNode:
"""
return static_root().get(name, default)
if root is None:
return static_config().root
else:
return root
def static_root_find(name, default=None, parent=None):
def static_root_find(name, root=None):
"""
:param bliss.config.static.ConfigNode parent:
:returns dict:
:returns ConfigNode:
"""
if parent is None:
parent = static_root()
if parent.children:
for node in parent.children:
root = static_root(root=root)
if root.children:
for node in root.children:
if node.get("name", None) == name:
return node.to_dict()
return node
nodes = []
for node in parent.values():
for node in root.values():
if isinstance(node, static.ConfigNode):
nodes.append(node)
elif isinstance(node, list):
......@@ -69,77 +53,45 @@ def static_root_find(name, default=None, parent=None):
nodes.append(nodei)
for node in nodes:
if node.get("name", None) == name:
return node.to_dict()
ret = static_root_find(name, parent=node)
if ret:
return node
ret = static_root_find(name, root=node)
if ret is not None:
return ret
return {}
def with_scan_saving(func):
"""Pass the current session's SCAN_SAVING instance as a named argument
:param callable func:
:returns callable:
"""
@wraps(func)
def wrapper(*args, **kwargs):
scan_saving = kwargs.get("scan_saving")
if scan_saving is None:
kwargs["scan_saving"] = ScanSaving(current_session.name)
return func(*args, **kwargs)
return wrapper
@with_scan_saving
def scan_saving_get(attr, default=None, scan_saving=None):
"""
Get attribute from the session's scan saving object
:param str attr:
:param default:
:param bliss.scanning.scan.ScanSaving scan_saving:
:returns str:
"""
return getattr(scan_saving, attr, default)
@with_eval_dict
@with_scan_saving
def scan_saving_eval(template, scan_saving=None, eval_dict=None):
"""
Evaluate template with SCAN_SAVING attributes and properties.
:param str template:
:param bliss.scanning.scan.ScanSaving scan_saving:
:param dict eval_dict:
:returns str:
"""
return scan_saving.eval_template(template, eval_dict=eval_dict)
return None
def beamline():
def beamline(root=None, default="id00"):
"""
:returns str:
"""
name = "id00"
name = default
for k in "BEAMLINENAME", "BEAMLINE":
name = os.environ.get(k, name)
name = static_root_get("beamline", name)
scan_saving = static_root_get("scan_saving", {})
name = scan_saving.get("beamline", name)
root = static_root(root=root)
name = root.get("beamline", name)
scan_saving = static_root_find("scan_saving", root=root)
if scan_saving is not None:
name = scan_saving.get("beamline", name)
return name.lower()
def institute():
def institute(root=None, default=""):
"""
:returns str:
"""
root = static_root()
name = ""
root = static_root(root=root)
name = default
name = root.get("institute", name)
name = root.get("laboratory", name)
name = root.get("synchrotron", name)
return name
def instrument(root=None, default=""):
"""
:returns str:
"""
root = static_root(root=root)
name = institute(root=root, default=default)
name = root.get("instrument", name)
return name
......@@ -15,7 +15,6 @@ else:
import os
from io import StringIO
import logging
from contextlib import contextmanager, ExitStack
from time import time
from .logging_utils import log
......
......@@ -11,7 +11,7 @@
import os
import gevent
from . import config_utils
from . import session_utils
from . import data_policy
from .logging_utils import print_out
......@@ -98,7 +98,7 @@ def scan_filename(scan):
return scan_info_get(scan, "filename")
@config_utils.with_scan_saving
@session_utils.with_scan_saving
def session_filename(scan_saving=None):
"""
Name of the file that contains the scan data of the current BLISS session
......@@ -106,7 +106,7 @@ def session_filename(scan_saving=None):
:param bliss.scanning.scan.ScanSaving scan_saving:
:returns str or None:
"""
return config_utils.scan_saving_get(
return session_utils.scan_saving_get(
"filename", default=None, scan_saving=scan_saving
)
......@@ -125,7 +125,7 @@ def scan_master_filenames(scan, config=True):
return info.get("masterfiles", {})
@config_utils.with_scan_saving
@session_utils.with_scan_saving
def session_master_filenames(scan_saving=None, config=True):
"""
Names of the files that contain links to the scan data of the current BLISS session
......@@ -165,7 +165,7 @@ def scan_filenames(scan, config=True):
return filenames
@config_utils.with_scan_saving
@session_utils.with_scan_saving
def session_filenames(scan_saving=None, config=True):
"""
Names of the files that contain links to the scan data (raw or as links) of the current BLISS session
......
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2020 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Activate Bliss session utilities
"""
from functools import wraps
from bliss import current_session
def with_scan_saving(func):
"""Pass the current session's SCAN_SAVING instance as a named argument
:param callable func:
:returns callable:
"""
@wraps(func)
def wrapper(*args, **kwargs):
scan_saving = kwargs.get("scan_saving")
if scan_saving is None:
if current_session:
kwargs["scan_saving"] = current_session.scan_saving
else:
raise RuntimeError("No activate Bliss session")
return func(*args, **kwargs)
return wrapper
@with_scan_saving
def scan_saving_get(attr, default=None, scan_saving=None):
"""Get attribute from the session's scan saving object
:param str attr:
:param default:
:param bliss.scanning.scan.ScanSaving scan_saving:
:returns str:
"""
return getattr(scan_saving, attr, default)
@with_scan_saving
def dataset_get(attr, default=None, scan_saving=None):
"""Get attribute from the session's dataset object
:param str attr:
:param default:
:param bliss.scanning.scan.ScanSaving scan_saving:
:returns str:
"""
try:
return scan_saving.dataset[attr]
except (AttributeError, KeyError):
return default
......@@ -1142,6 +1142,6 @@ def nexus_writer_service(ports):
device_fqdn = "tango://localhost:{}/{}".format(ports.tango_port, device_name)
with start_tango_server(
"NexusWriterService", "testwriters", "--log", "info", device_fqdn=device_fqdn
"NexusWriterService", "testwriters", "--log", "warning", device_fqdn=device_fqdn
) as dev_proxy:
yield device_fqdn, dev_proxy
......@@ -1377,36 +1377,52 @@ def test_parallel_scans(session, esrf_data_policy):
@pytest.mark.parametrize(
"missing_dataset, missing_collection, missing_proposal",
list(itertools.product(*[[True, False]] * 3)),
"missing_dataset, missing_collection, missing_proposal, policymethod",
list(