Commit 4fea200b authored by Wout De Nolf's avatar Wout De Nolf
Browse files

data policy helpers for the entire test suite

parent 972b29b7
from bliss.common.session import set_current_session
from bliss.scanning.scan_saving import ESRFScanSaving, ESRFDataPolicyEvent
def set_esrf_config(scan_saving, base_path):
# Make sure all data saving mount points
# have base_path as root in the session's
# scan saving config (in memory)
assert isinstance(scan_saving, ESRFScanSaving)
scan_saving_config = scan_saving.scan_saving_config
roots = ["inhouse_data_root", "visitor_data_root", "tmp_data_root"]
for root in roots:
for prefix in ["", "icat_"]:
key = prefix + root
mount_points = scan_saving_config.get(key, None)
if mount_points is None:
continue
elif isinstance(mount_points, str):
scan_saving_config[key] = mount_points.replace("/tmp/scans", base_path)
else:
for mp in mount_points:
mount_points[mp] = mount_points[mp].replace("/tmp/scans", base_path)
def set_esrf_data_policy(session):
# SCAN_SAVING uses the `current_session`
set_current_session(session, force=True)
assert session.name == session.scan_saving.session
# TODO: cannot use enable_esrf_data_policy directly because
# we need to modify the in-memory config before setting the proposal.
# If enable_esrf_data_policy changes however, we are in trouble.
tmpdir = session.scan_saving.base_path
session._set_scan_saving(cls=ESRFScanSaving)
set_esrf_config(session.scan_saving, tmpdir)
# session.scan_saving.get_path() set the proposal to the default
# proposal and notify ICAT. When using the `icat_subscriber` fixture,
# this will be the first event.
session._emit_event(
ESRFDataPolicyEvent.Enable, data_path=session.scan_saving.get_path()
)
def set_basic_data_policy(session):
session.disable_esrf_data_policy()
def set_data_policy(session, policy):
if policy == "basic":
set_basic_data_policy(session)
elif policy == "esrf":
set_esrf_data_policy(session)
else:
ValueError(policy, "Unsupported data policy")
......@@ -6,63 +6,17 @@
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
from bliss.common.session import set_current_session
from bliss.scanning.scan_saving import ESRFScanSaving, ESRFDataPolicyEvent
def modify_esrf_policy_mount_points(scan_saving, base_path):
# Make sure all data saving mount points
# have base_path as root in the session's
# scan saving config (in memory)
assert isinstance(scan_saving, ESRFScanSaving)
scan_saving_config = scan_saving.scan_saving_config
roots = ["inhouse_data_root", "visitor_data_root", "tmp_data_root"]
for root in roots:
for prefix in ["", "icat_"]:
key = prefix + root
mount_points = scan_saving_config.get(key, None)
if mount_points is None:
continue
elif isinstance(mount_points, str):
scan_saving_config[key] = mount_points.replace("/tmp/scans", base_path)
else:
for mp in mount_points:
mount_points[mp] = mount_points[mp].replace("/tmp/scans", base_path)
def _esrf_data_policy(session):
# SCAN_SAVING uses the `current_session`
set_current_session(session, force=True)
assert session.name == session.scan_saving.session
# TODO: cannot use enable_esrf_data_policy directly because
# we need to modify the in-memory config before setting the proposal.
# If enable_esrf_data_policy changes however, we are in trouble.
tmpdir = session.scan_saving.base_path
session._set_scan_saving(cls=ESRFScanSaving)
modify_esrf_policy_mount_points(session.scan_saving, tmpdir)
# session.scan_saving.get_path() set the proposal to the default
# proposal and notify ICAT. When using the `icat_subscriber` fixture,
# this will be the first event.
session._emit_event(
ESRFDataPolicyEvent.Enable, data_path=session.scan_saving.get_path()
)
yield session.scan_saving.scan_saving_config
session.disable_esrf_data_policy()
from ..data_policies import set_data_policy
@pytest.fixture
def esrf_data_policy(session, icat_backend):
yield from _esrf_data_policy(session)
set_data_policy(session, "esrf")
@pytest.fixture
def esrf_data_policy_tango(session, icat_tango_backend):
yield from _esrf_data_policy(session)
set_data_policy(session, "esrf")
@pytest.fixture
......@@ -76,9 +30,9 @@ def session2(beacon, scan_tmpdir):
@pytest.fixture
def esrf_data_policy2(session2, icat_backend):
yield from _esrf_data_policy(session2)
set_data_policy(session2, "esrf")
@pytest.fixture
def esrf_data_policy2_tango(session2, icat_tango_backend):
yield from _esrf_data_policy(session2)
set_data_policy(session2, "esrf")
......@@ -78,7 +78,7 @@ def test_icat_backends(
def test_inhouse_scan_saving(session, icat_subscriber, esrf_data_policy):
scan_saving = session.scan_saving
scan_saving_config = esrf_data_policy
scan_saving_config = scan_saving.scan_saving_config
icat_test_utils.assert_icat_received_current_proposal(scan_saving, icat_subscriber)
for bset in [False, True]:
......@@ -109,7 +109,7 @@ def test_visitor_scan_saving(session, icat_subscriber, esrf_data_policy):
icat_test_utils.assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving.mount_point = "fs1"
scan_saving_config = esrf_data_policy
scan_saving_config = scan_saving.scan_saving_config
scan_saving.proposal_name = "mx415"
assert scan_saving.base_path == scan_saving_config["visitor_data_root"]["fs1"]
assert scan_saving.icat_base_path == scan_saving_config["visitor_data_root"]["fs1"]
......@@ -122,7 +122,7 @@ def test_tmp_scan_saving(session, icat_subscriber, esrf_data_policy):
icat_test_utils.assert_icat_received_current_proposal(scan_saving, icat_subscriber)
scan_saving.mount_point = "fs1"
scan_saving_config = esrf_data_policy
scan_saving_config = scan_saving.scan_saving_config
scan_saving.proposal_name = "test123"
expected = scan_saving_config["tmp_data_root"]["fs1"].format(
beamline=scan_saving.beamline
......@@ -913,7 +913,7 @@ def test_session_scan_saving_clone(session, esrf_data_policy):
def test_mount_points(session, esrf_data_policy):
scan_saving = session.scan_saving
scan_saving_config = esrf_data_policy
scan_saving_config = scan_saving.scan_saving_config
# Test setting mount points
assert scan_saving.mount_points == {"", "fs1", "fs2", "fs3"}
......
......@@ -8,15 +8,15 @@
import pytest
import os
import gevent
from gevent import subprocess
from contextlib import contextmanager
from bliss.common import measurementgroup
from bliss.common.tango import DevState, Database
from nexus_writer_service.subscribers.session_writer import all_cli_saveoptions
from bliss.tango.clients.utils import wait_tango_device
from tests.nexus_writer.helpers import nxw_test_config
from tests.nexus_writer.helpers import nxw_test_utils
from .helpers import nxw_test_config
from .helpers import nxw_test_utils
from ..data_policies import set_esrf_config
@pytest.fixture
......@@ -189,22 +189,8 @@ def prepare_scan_saving(session=None, tmpdir=None, policy=True, **kwargs):
tmpdir = str(tmpdir.join(session.name))
session.enable_esrf_data_policy()
scan_saving = session.scan_saving
scan_saving.writer = "nexus"
scan_saving_config = scan_saving.scan_saving_config
roots = ["inhouse_data_root", "visitor_data_root", "tmp_data_root"]
for root in roots:
for prefix in ["", "icat_"]:
key = prefix + root
mount_points = scan_saving_config.get(key, None)
if mount_points is None:
continue
elif isinstance(mount_points, str):
scan_saving_config[key] = mount_points.replace("/tmp/scans", tmpdir)
else:
for mp in mount_points:
mount_points[mp] = mount_points[mp].replace(
"/tmp/scans", tmpdir
)
set_esrf_config(scan_saving, tmpdir)
assert scan_saving.writer == "nexus"
scan_saving.proposal_name = "testproposal"
technique = nxw_test_config.technique["withpolicy"]
scan_saving.proposal.all.definition = technique
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment