Commit cfb7aba5 authored by Benoit Formet's avatar Benoit Formet Committed by Matias Guijarro
Browse files

[test] enabling the ESRF data policy will set the proposal when no proposal is set.

This makes sense but wasn't the case before so tests needed to adapt.
parent 2cbc66e6
......@@ -11,7 +11,6 @@ import warnings
import collections
import functools
import inspect
import copy
import contextlib
from treelib import Tree
......@@ -21,6 +20,7 @@ from weakref import WeakKeyDictionary
from bliss import setup_globals, global_map, is_bliss_shell
from bliss.config import static
from bliss.config.settings import SimpleSetting
from bliss.config.channels import EventChannel
from bliss.config.conductor.client import get_text_file, get_python_modules, get_file
from bliss.common.proxy import Proxy
from bliss.common.logtools import log_warning
......@@ -169,6 +169,7 @@ class Session:
self.__scans = collections.deque(maxlen=20)
self.__user_script_homedir = SimpleSetting("%s:user_script_homedir" % self.name)
self._script_source_cache = WeakKeyDictionary()
self.__data_policy_events = EventChannel(f"{self.name}:esrf_data_policy")
self.init(config_tree)
......@@ -408,6 +409,12 @@ class Session:
def env_dict(self):
return self.__env_dict
def _emit_event(self, event, **kwargs):
if event in scan_saving.ESRFDataPolicyEvent:
self.__data_policy_events.post(dict(event_type=event, value=kwargs))
else:
raise NotImplementedError
def _set_scan_saving(self, cls=None):
scan_saving.set_scan_saving_class(cls)
self.scan_saving = scan_saving.ScanSaving(self.name)
......@@ -421,9 +428,17 @@ class Session:
def enable_esrf_data_policy(self):
self._set_scan_saving(cls=scan_saving.ESRFScanSaving)
self._emit_event(
scan_saving.ESRFDataPolicyEvent.Enable,
data_path=self.scan_saving.get_path(),
)
def disable_esrf_data_policy(self):
self._set_scan_saving()
self._emit_event(
scan_saving.ESRFDataPolicyEvent.Disable,
data_path=self.scan_saving.get_path(),
)
def _cache_script_source(self, obj):
""" Store source code of obj in cache for prdef """
......
......@@ -19,6 +19,7 @@ import traceback
from functools import wraps
import logging
import datetime
import enum
from bliss import current_session
from bliss.config.settings import ParametersWardrobe
......@@ -43,6 +44,12 @@ def set_scan_saving_class(klass):
_SCAN_SAVING_CLASS = klass
class ESRFDataPolicyEvent(enum.Enum):
Enable = "enabled"
Disable = "disabled"
Change = "changed"
logger = logging.getLogger(__name__)
......@@ -1159,27 +1166,35 @@ class ESRFScanSaving(BasicScanSaving):
def newproposal(self, proposal_name):
# beware: self.proposal getter and setter do different actions
self.proposal = proposal_name
lprint(f"Proposal set to '{self.proposal}'\nData path: {self.get_path()}")
lprint(f"Proposal set to '{self.proposal}'\nData path: {self.root_path}")
self._on_data_policy_changed(f"Proposal set to '{self.proposal}'")
def newsample(self, sample_name):
# beware: self.sample getter and setter do different actions
self.sample = sample_name
lprint(f"Sample set to '{self.sample}'\nData path: {self.root_path}")
self._on_data_policy_changed(f"Sample set to '{self.sample}'")
def newdataset(self, dataset_name):
# beware: self.dataset getter and setter do different actions
self.dataset = dataset_name
lprint(f"Dataset set to '{self.dataset}'\nData path: {self.root_path}")
self._on_data_policy_changed(f"Dataset set to '{self.dataset}'")
def endproposal(self):
"""Close the active dataset (if any) and go to the default inhouse proposal
"""
self.enddataset()
self._enddataset()
self._reset_proposal()
self._on_data_policy_changed(f"Proposal set to '{self.proposal}'")
def enddataset(self):
"""Close the active dataset (if any) and go the the next dataset
"""
self._enddataset()
self._on_data_policy_changed(f"Dataset set to '{self.dataset}'")
def _enddataset(self):
self.dataset = None
def _store_dataset(self):
......@@ -1194,3 +1209,8 @@ class ESRFScanSaving(BasicScanSaving):
dataset = self.dataset
self.icat_proxy.store_dataset(proposal, sample, dataset, path)
self._dataset = ""
def _on_data_policy_changed(self, event):
current_session._emit_event(
ESRFDataPolicyEvent.Change, message=event, data_path=self.root_path
)
......@@ -7,15 +7,16 @@
import pytest
import os
from bliss.common.session import set_current_session
from bliss.scanning.scan_saving import ESRFScanSaving, ESRFDataPolicyEvent
def _esrf_data_policy(session):
tmpdir = session.scan_saving.base_path
session.enable_esrf_data_policy()
def modify_esrf_policy_mount_points(scan_saving, base_path):
# Make sure all data saving mount points
# have tmpdir as root
scan_saving_config = session.scan_saving.scan_saving_config
# have base_path as root in the session's
# scan saving config (in memory)
assert isinstance(scan_saving, ESRFScanSaving)
scan_saving_config = scan_saving.scan_saving_config
roots = ["inhouse_data_root", "visitor_data_root", "tmp_data_root"]
for root in roots:
for prefix in ["", "icat_"]:
......@@ -24,18 +25,39 @@ def _esrf_data_policy(session):
if mount_points is None:
continue
elif isinstance(mount_points, str):
scan_saving_config[key] = mount_points.replace("/tmp/scans", tmpdir)
scan_saving_config[key] = mount_points.replace("/tmp/scans", base_path)
else:
for mp in mount_points:
mount_points[mp] = mount_points[mp].replace("/tmp/scans", tmpdir)
mount_points[mp] = mount_points[mp].replace("/tmp/scans", base_path)
def _esrf_data_policy(session):
# SCAN_SAVING uses the `current_session`
set_current_session(session, force=True)
assert session.name == session.scan_saving.session
# TODO: cannot use enable_esrf_data_policy directly because
# we need to modify the in-memory config before setting the proposal.
# If enable_esrf_data_policy changes however, we are in trouble.
tmpdir = session.scan_saving.base_path
session._set_scan_saving(cls=ESRFScanSaving)
modify_esrf_policy_mount_points(session.scan_saving, tmpdir)
# session.scan_saving.get_path() set the proposal to the default
# proposal and notify ICAT. When using the `icat_subscriber` fixture,
# this will be the first event.
session._emit_event(
ESRFDataPolicyEvent.Enable, data_path=session.scan_saving.get_path()
)
yield scan_saving_config
yield session.scan_saving.scan_saving_config
session.disable_esrf_data_policy()
@pytest.fixture
def esrf_data_policy(session):
def esrf_data_policy(session, metaexp_with_backend, metamgr_with_backend):
yield from _esrf_data_policy(session)
......@@ -49,5 +71,5 @@ def session2(beacon, scan_tmpdir):
@pytest.fixture
def esrf_data_policy2(session2):
def esrf_data_policy2(session2, metaexp_with_backend, metamgr_with_backend):
yield from _esrf_data_policy(session2)
......@@ -8,10 +8,13 @@
import pytest
import time
import os
import gevent
from bliss.common.standard import loopscan, mv
from bliss.common.utils import rounder
from bliss.common.tango import DevFailed
from bliss.common.session import set_current_session
from bliss.scanning.scan_saving import ESRFDataPolicyEvent
from bliss.config import channels
from bliss.shell.standard import (
newproposal,
newsample,
......@@ -71,9 +74,9 @@ def assert_logbook_received(
):
if not category:
category = "info"
print("\nWaiting of ICAT logbook message ...")
logbook_received = icat_logbook_subscriber.get(timeout=timeout)
print(f"Validating ICAT logbook message: {logbook_received}")
assert logbook_received["category"] == category
......@@ -95,7 +98,11 @@ def assert_logbook_received(
assert message in adict["text"]
def test_stomp_server(icat_publisher, icat_subscriber):
def assert_icat_received_current_proposal(scan_saving, icat_subscriber):
assert_icat_received(icat_subscriber, icat_info(scan_saving))
def test_stomp(icat_publisher, icat_subscriber):
icat_publisher.sendall(b"MYMESSAGE1\nMYMESSAGE2\n")
assert icat_subscriber.get(timeout=5) == "MYMESSAGE1"
assert icat_subscriber.get(timeout=5) == "MYMESSAGE2"
......@@ -106,27 +113,22 @@ def test_jolokia_server(jolokia_server):
pass
def test_icat_logbook_server(icat_logbook_server):
def test_icat_logbook_server(icat_logbook_subscriber):
# TODO: send test request
pass
def test_icat_backend(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
icat_logbook_subscriber,
def test_icat_backends(
session, icat_subscriber, icat_logbook_subscriber, esrf_data_policy
):
scan_saving = session.scan_saving
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving.writer = "hdf5"
diode = session.config.get("diode")
newproposal("totoproposal")
expected = icat_info(scan_saving)
assert_icat_received(icat_subscriber, expected)
assert_logbook_received(icat_logbook_subscriber, "Proposal set to")
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
newdataset()
loopscan(1, .1, diode)
......@@ -136,18 +138,15 @@ def test_icat_backend(
assert_icat_received(icat_subscriber, expected)
def test_inhouse_scan_saving(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
):
def test_inhouse_scan_saving(session, icat_subscriber, esrf_data_policy):
scan_saving = session.scan_saving
scan_saving_config = esrf_data_policy
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
for bset in [False, True]:
if bset:
scan_saving.proposal = "blc123"
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
assert scan_saving.beamline == scan_saving_config["beamline"]
if bset:
assert scan_saving.proposal == "blc123"
......@@ -162,36 +161,25 @@ def test_inhouse_scan_saving(
"inhouse_data_root"
].format(beamline=scan_saving.beamline)
assert_default_sample_dataset(scan_saving)
expected = icat_info(scan_saving)
assert_icat_received(icat_subscriber, expected)
def test_visitor_scan_saving(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
):
def test_visitor_scan_saving(session, icat_subscriber, esrf_data_policy):
scan_saving = session.scan_saving
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving.mount_point = "fs1"
scan_saving_config = esrf_data_policy
scan_saving.proposal = "mx415"
assert scan_saving.base_path == scan_saving_config["visitor_data_root"]["fs1"]
assert scan_saving.icat_base_path == scan_saving_config["visitor_data_root"]["fs1"]
assert_default_sample_dataset(scan_saving)
expected = icat_info(scan_saving)
assert_icat_received(icat_subscriber, expected)
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
def test_tmp_scan_saving(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
):
def test_tmp_scan_saving(session, icat_subscriber, esrf_data_policy):
scan_saving = session.scan_saving
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving.mount_point = "fs1"
scan_saving_config = esrf_data_policy
scan_saving.proposal = "test123"
......@@ -204,8 +192,7 @@ def test_tmp_scan_saving(
)
assert scan_saving.icat_base_path == expected
assert_default_sample_dataset(scan_saving)
expected = icat_info(scan_saving)
assert_icat_received(icat_subscriber, expected)
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
def assert_default_sample_dataset(scan_saving):
......@@ -238,15 +225,9 @@ def create_dataset(scan_saving):
os.makedirs(path)
def test_auto_dataset_increment(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
):
def test_auto_dataset_increment(session, icat_subscriber, esrf_data_policy):
scan_saving = session.scan_saving
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
expected_dataset = icat_info(scan_saving, dataset=True)
assert scan_saving.dataset == "0001"
......@@ -280,28 +261,31 @@ def test_auto_dataset_increment(
def test_data_policy_scan_check_servers(
session,
icat_subscriber,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
nexus_writer_service,
icat_subscriber,
):
scan_saving = session.scan_saving
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
mdexp_dev_fqdn, mdexp_dev = metaexp_with_backend
mdmgr_dev_fqdn, mdmgr_dev = metamgr_with_backend
diode = session.env_dict["diode"]
default_proposal = f"{scan_saving.beamline}{time.strftime('%y%m')}"
expected = {
"proposal": "",
"proposal": default_proposal,
"sample": None,
"dataset": None,
"path": None,
"state": "OFF",
"state": "STANDBY",
}
assert_servers(mdexp_dev, mdmgr_dev, **expected)
scan_saving.proposal = "proposal1"
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
expected["proposal"] = "proposal1"
expected["state"] = "STANDBY"
assert_servers(mdexp_dev, mdmgr_dev, **expected)
......@@ -343,7 +327,7 @@ def test_data_policy_scan_check_servers(
assert_servers(mdexp_dev, mdmgr_dev, **expected)
scan_saving.proposal = "proposal2"
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
expected["proposal"] = "proposal2"
expected["path"] = None
expected["sample"] = None
......@@ -382,15 +366,10 @@ def assert_servers(
def test_data_policy_user_functions(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
icat_logbook_subscriber,
session, icat_subscriber, icat_logbook_subscriber, esrf_data_policy
):
scan_saving = session.scan_saving
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
expected_dataset = icat_info(scan_saving, dataset=True)
default_proposal = f"{scan_saving.beamline}{time.strftime('%y%m')}"
......@@ -402,7 +381,7 @@ def test_data_policy_user_functions(
newproposal("toto")
assert_logbook_received(icat_logbook_subscriber, "toto")
assert_icat_received(icat_subscriber, expected_dataset)
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
expected_dataset = icat_info(scan_saving, dataset=True)
assert scan_saving.proposal == "toto"
assert scan_saving.sample == "sample"
......@@ -430,7 +409,7 @@ def test_data_policy_user_functions(
newproposal()
assert_logbook_received(icat_logbook_subscriber, default_proposal)
assert_icat_received(icat_subscriber, expected_dataset)
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
expected_dataset = icat_info(scan_saving, dataset=True)
assert scan_saving.proposal == default_proposal
assert scan_saving.sample == "sample"
......@@ -456,7 +435,7 @@ def test_data_policy_user_functions(
newproposal("toto")
assert_logbook_received(icat_logbook_subscriber, "toto")
assert_icat_received(icat_subscriber, expected_dataset)
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
expected_dataset = icat_info(scan_saving, dataset=True)
assert scan_saving.proposal == "toto"
assert scan_saving.sample == "sample"
......@@ -465,15 +444,13 @@ def test_data_policy_user_functions(
endproposal()
assert_icat_received(icat_subscriber, expected_dataset)
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
assert scan_saving.proposal == default_proposal
assert scan_saving.sample == "sample"
assert scan_saving.dataset == "0005"
def test_data_policy_name_validation(
session, esrf_data_policy, metaexp_with_backend, metamgr_with_backend
):
def test_data_policy_name_validation(session, esrf_data_policy):
scan_saving = session.scan_saving
for name in ("with,", "with:", "with;"):
......@@ -497,9 +474,7 @@ def test_data_policy_name_validation(
assert scan_saving.dataset == "dataset_Name"
def test_session_scan_saving_clone(
session, esrf_data_policy, metaexp_with_backend, metamgr_with_backend
):
def test_session_scan_saving_clone(session, esrf_data_policy):
scan_saving = session.scan_saving
# just to create a tango dev proxy in scan saving
......@@ -519,9 +494,7 @@ def test_session_scan_saving_clone(
assert scan_saving2.proposal == "toto"
def test_mount_points(
session, esrf_data_policy, metaexp_with_backend, metamgr_with_backend
):
def test_mount_points(session, esrf_data_policy):
scan_saving = session.scan_saving
scan_saving_config = esrf_data_policy
......@@ -617,19 +590,15 @@ def test_mount_points(
def test_session_ending(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
icat_logbook_subscriber,
session, icat_subscriber, icat_logbook_subscriber, esrf_data_policy
):
scan_saving = session.scan_saving
default_proposal = f"{scan_saving.beamline}{time.strftime('%y%m')}"
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving.newproposal("hg123")
assert_logbook_received(icat_logbook_subscriber, "hg123")
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving.newsample("sample1")
assert_logbook_received(icat_logbook_subscriber, "sample1")
create_dataset(scan_saving)
......@@ -648,19 +617,61 @@ def test_session_ending(
scan_saving.endproposal()
assert_icat_received(icat_subscriber, expected_dataset)
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
assert scan_saving.proposal == default_proposal
assert scan_saving.sample == "sample"
assert scan_saving.dataset == "0001"
def test_date_in_basepath(
session,
esrf_data_policy,
metaexp_with_backend,
metamgr_with_backend,
icat_logbook_subscriber,
):
def test_data_policy_event(session, esrf_data_policy):
e = channels.EventChannel(f"{session.name}:esrf_data_policy")
full_event_list = list()
called_cbk = {"nb": 0}
called = gevent.event.Event()
def f(events_list):
# global last_event
called_cbk["nb"] += 1
full_event_list.extend(events_list)
called.set()
e.register_callback(f)
def wait_for_event_callback():
# give the hand to gevent loop and wait callback to be called
with gevent.Timeout(1):
called.wait()
called.clear()
scan_saving = session.scan_saving
wait_for_event_callback()
assert called_cbk["nb"] == 1
assert full_event_list[-1]["event_type"] == ESRFDataPolicyEvent.Enable
scan_saving.newproposal("hg123")
wait_for_event_callback()
assert called_cbk["nb"] == 2
assert full_event_list[-1]["event_type"] == ESRFDataPolicyEvent.Change
assert full_event_list[-1]["value"]["message"] == "Proposal set to 'hg123'"
scan_saving.newsample("sample1")
scan_saving.newdataset("42")
create_dataset(scan_saving)
scan_saving.enddataset()
create_dataset(scan_saving)
scan_saving.endproposal()
wait_for_event_callback()
assert len(full_event_list) == 5
wait_for_event_callback()
session.disable_esrf_data_policy()
wait_for_event_callback()
assert full_event_list[-1]["event_type"] == ESRFDataPolicyEvent.Disable
def test_date_in_basepath(session, icat_logbook_subscriber, esrf_data_policy):
# Put date in base path template:
scan_saving = session.scan_saving
new_base_path = os.path.join(scan_saving.base_path, "{date}")
......@@ -699,15 +710,12 @@ def test_date_in_basepath(
def test_parallel_sessions(
beacon,
session,
session2,
esrf_data_policy,
esrf_data_policy2,
metaexp_with_backend,
metamgr_with_backend,
icat_subscriber,
icat_logbook_subscriber,
esrf_data_policy,
esrf_data_policy2,
):
# SCAN_SAVING uses the `current_session`
......@@ -722,23 +730,23 @@ def test_parallel_sessions(
scan_saving = get_scan_saving1()
assert scan_saving.session == "test_session"
assert scan_saving.dataset == "0001"
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving = get_scan_saving2()
assert scan_saving.session == "test_session2"
assert scan_saving.dataset == "0002"
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving = get_scan_saving1()
scan_saving.newproposal("blc123")
assert_logbook_received(icat_logbook_subscriber, "blc123")
assert_icat_received(icat_subscriber, icat_info(scan_saving))
assert_icat_received_current_proposal(scan_saving, icat_subscriber)
assert scan_saving.dataset == "0001"
scan_saving = get_scan_saving2()