Commit 646f8e85 authored by Wout De Nolf's avatar Wout De Nolf Committed by Matias Guijarro
Browse files

[nexus writer tests] adapt to ESRF data policy

parent 68d53e44
......@@ -31,24 +31,25 @@ def test_inhouse_scan_saving(session, esrf_data_policy):
scan_saving_config = esrf_data_policy
assert scan_saving.beamline == scan_saving_config["beamline"]
assert scan_saving.proposal == f"{scan_saving.beamline}{time.strftime('%y%m')}"
assert scan_saving.data_root == scan_saving_config["inhouse_data_root"].replace(
"{beamline}", scan_saving.beamline
assert scan_saving.base_path == scan_saving_config["inhouse_data_root"].format(
beamline=scan_saving.beamline
)
assert scan_saving.sample == "default"
assert scan_saving.sample == "sample"
assert scan_saving.dataset == "0001"
scan_saving.template = "toto"
with pytest.raises(AttributeError):
scan_saving.template = "toto"
assert scan_saving.get_path() == os.path.join(
scan_saving.data_root,
scan_saving.base_path,
scan_saving.proposal,
scan_saving.beamline,
scan_saving.sample,
f"{scan_saving.sample}_{scan_saving.dataset}",
"toto",
)
scan_saving.sample = ""
assert scan_saving.sample == "default"
assert scan_saving.sample == "sample"
scan_saving.dataset = "dataset"
scan_saving.template = ""
with pytest.raises(AttributeError):
scan_saving.template = ""
assert scan_saving.get_path().endswith("dataset")
......@@ -56,7 +57,7 @@ def test_visitor_scan_saving(session, esrf_data_policy):
scan_saving = session.scan_saving
scan_saving_config = esrf_data_policy
scan_saving.proposal = "mx415"
assert scan_saving.data_root == scan_saving_config["visitor_data_root"]
assert scan_saving.base_path == scan_saving_config["visitor_data_root"]
def test_auto_dataset_increment(session, esrf_data_policy):
......@@ -95,7 +96,7 @@ def test_newproposal(
newproposal("mx415") # should reset sample and dataset
assert session.scan_saving.proposal == "mx415"
assert session.scan_saving.sample == "default"
assert session.scan_saving.sample == "sample"
assert session.scan_saving.dataset == "0001"
session.scan_saving.get()
......@@ -128,6 +129,9 @@ def test_data_policy_user_functions(session, esrf_data_policy):
newsample = session.env_dict["newsample"]
newdataset = session.env_dict["newdataset"]
assert scan_saving.proposal == f"{scan_saving.beamline}{time.strftime('%y%m')}"
assert scan_saving.sample == "sample"
assert scan_saving.dataset == "0001"
newproposal("toto")
assert scan_saving.proposal == "toto"
assert scan_saving.sample == "sample"
......@@ -142,5 +146,5 @@ def test_data_policy_user_functions(session, esrf_data_policy):
assert scan_saving.dataset == "tutu"
newproposal()
assert scan_saving.proposal == f"{scan_saving.beamline}{time.strftime('%y%m')}"
assert scan_saving.sample == "default"
assert scan_saving.sample == "sample"
assert scan_saving.dataset == "0001"
......@@ -13,175 +13,163 @@ from gevent import subprocess
from contextlib import contextmanager
from bliss.common import measurementgroup
from bliss.common.tango import DeviceProxy, DevFailed
from nexus_writer_service.utils import data_policy
from nexus_writer_service.subscribers.scan_writer_base import (
cli_saveoptions as base_options
)
from nexus_writer_service.subscribers.scan_writer_config import (
cli_saveoptions as config_options
)
from nexus_writer_service.subscribers.session_writer import all_cli_saveoptions
sys.path.append(os.path.join(os.path.dirname(__file__), "helpers"))
import nxw_test_config
import nxw_test_utils
base_options = dict(base_options)
base_options.pop("copy_non_external")
base_options.pop("enable_profiling")
config_options = dict(config_options)
config_options.pop("copy_non_external")
config_options.pop("enable_profiling")
config_writer_args = (
"--log=info",
"--redirectstdout",
"--redirectstderr",
"--copy_non_external",
)
base_writer_args = config_writer_args + ("--noconfig",)
writer_tango_properties = {"copy_nonhdf5_data": True}
writer_tango_attributes = {"resource_profiling": False}
def cliargs_logfiles(cliargs, tmpdir):
tmpdir = str(tmpdir)
return cliargs + (
"--logfileout={}".format(os.path.join(tmpdir, "writer.stdout.log")),
"--logfileerr={}".format(os.path.join(tmpdir, "writer.stderr.log")),
)
def setup_writer_session(session):
session.setup()
@pytest.fixture
def nexus_base_session(beacon, lima_simulator, lima_simulator2):
session = beacon.get("nexus_writer_base")
setup_writer_session(session)
yield session
session.close()
@pytest.fixture
def nexus_config_session(beacon, lima_simulator, lima_simulator2):
session = beacon.get("nexus_writer_config")
setup_writer_session(session)
def nexus_writer_session(
beacon,
metadata_experiment_tango_server,
metadata_manager_tango_server,
lima_simulator,
lima_simulator2,
):
"""Writer sessions with lots of different detectors and scan types
"""
session = beacon.get("nexus_writer_session")
session.setup()
yield session
session.close()
def scan_saving_nopolicy(session, scan_tmpdir):
scan_saving = session.scan_saving
scan_saving.base_path = str(scan_tmpdir)
scan_saving.data_filename = "{a}_{b}"
scan_saving.add("a", "a")
scan_saving.add("b", "b")
measurementgroup.set_active_name(nxw_test_config.technique["withoutpolicy"] + "MG")
def scan_saving_policy(session, scan_tmpdir):
scan_saving = session.scan_saving
data_policy.newtmpexperiment("prop123", root=str(scan_tmpdir))
scan_saving.add("sample", "sample")
scan_saving.add("dataset", "dataset")
scan_saving.add("technique", nxw_test_config.technique["withpolicy"])
measurementgroup.set_active_name(scan_saving.technique + "MG")
@pytest.fixture
def nexus_base_session_policy(nexus_base_session, scan_tmpdir):
scan_saving_policy(nexus_base_session, scan_tmpdir)
yield nexus_base_session, scan_tmpdir
@pytest.fixture
def nexus_base_session_nopolicy(nexus_base_session, scan_tmpdir):
scan_saving_nopolicy(nexus_base_session, scan_tmpdir)
yield nexus_base_session, scan_tmpdir
@pytest.fixture
def nexus_config_session_policy(nexus_config_session, scan_tmpdir):
scan_saving_policy(nexus_config_session, scan_tmpdir)
yield nexus_config_session, scan_tmpdir
def nexus_writer_base(nexus_writer_session, scan_tmpdir):
"""Writer session with a Nexus writer
"""
with nexus_writer(
nexus_writer_session, scan_tmpdir, config=False, alt=False, policy=True
) as info:
yield info
@pytest.fixture
def nexus_config_session_nopolicy(nexus_config_session, scan_tmpdir):
scan_saving_nopolicy(nexus_config_session, scan_tmpdir)
yield nexus_config_session, scan_tmpdir
def nexus_writer_base_nopolicy(nexus_writer_session, scan_tmpdir):
"""Writer session with a Nexus writer
"""
with nexus_writer(
nexus_writer_session, scan_tmpdir, config=False, alt=False, policy=False
) as info:
yield info
@pytest.fixture
def nexus_writer_base(nexus_base_session_policy, wait_for_fixture):
session, tmpdir = nexus_base_session_policy
cliargs = cliargs_logfiles(base_writer_args, tmpdir)
with writer_process(session, wait_for_fixture, cliargs) as writer:
yield {"session": session, "tmpdir": tmpdir, "writer": writer}
def nexus_writer_base_alt(nexus_writer_session, scan_tmpdir):
"""Writer session with a Nexus writer
"""
with nexus_writer(
nexus_writer_session, scan_tmpdir, config=False, alt=True, policy=True
) as info:
yield info
@pytest.fixture
def nexus_writer_base_nopolicy(nexus_base_session_nopolicy, wait_for_fixture):
session, tmpdir = nexus_base_session_nopolicy
cliargs = cliargs_logfiles(base_writer_args, tmpdir)
with writer_process(session, wait_for_fixture, cliargs) as writer:
yield {"session": session, "tmpdir": tmpdir, "writer": writer}
def nexus_writer_config(nexus_writer_session, scan_tmpdir):
"""Writer session with a Nexus writer
"""
with nexus_writer(
nexus_writer_session, scan_tmpdir, config=True, alt=False, policy=True
) as info:
yield info
@pytest.fixture
def nexus_writer_base_alt(nexus_base_session_policy, wait_for_fixture):
session, tmpdir = nexus_base_session_policy
cliargs = cliargs_logfiles(base_writer_args, tmpdir)
cliargs += tuple("--" + k for k in base_options if "--" + k not in cliargs)
with writer_process(session, wait_for_fixture, cliargs) as writer:
yield {"session": session, "tmpdir": tmpdir, "writer": writer}
def nexus_writer_config_nopolicy(nexus_writer_session, scan_tmpdir):
"""Writer session with a Nexus writer
"""
with nexus_writer(
nexus_writer_session, scan_tmpdir, config=True, alt=False, policy=False
) as info:
yield info
@pytest.fixture
def nexus_writer_config(nexus_config_session_policy, wait_for_fixture):
session, tmpdir = nexus_config_session_policy
cliargs = cliargs_logfiles(config_writer_args, tmpdir)
with writer_tango(session, wait_for_fixture, cliargs) as writer:
yield {"session": session, "tmpdir": tmpdir, "writer": writer}
def nexus_writer_config_alt(nexus_writer_session, scan_tmpdir):
"""Writer session with a Nexus writer
"""
with nexus_writer(
nexus_writer_session, scan_tmpdir, config=True, alt=True, policy=True
) as info:
yield info
@pytest.fixture
def nexus_writer_config_nopolicy(nexus_config_session_nopolicy, wait_for_fixture):
session, tmpdir = nexus_config_session_nopolicy
cliargs = cliargs_logfiles(config_writer_args, tmpdir)
with writer_tango(session, wait_for_fixture, cliargs) as writer:
yield {"session": session, "tmpdir": tmpdir, "writer": writer}
@contextmanager
def nexus_writer(session, tmpdir, config=True, alt=False, policy=True):
"""Nexus writer for this session
:param session:
:param tmpdir:
:param bool policy:
:param bool config:
:param bool alt:
:returns dict:
"""
info = {
"session": session,
"tmpdir": tmpdir,
"config": config,
"alt": alt,
"policy": policy,
}
prepare_scan_saving(**info)
with writer_tango(**info) as writer:
info["writer"] = writer
yield info
def prepare_scan_saving(session=None, tmpdir=None, policy=True, **kwargs):
"""Initialize scan saving so the tests save in `tmpdir`
with or without policy.
@pytest.fixture
def nexus_writer_config_alt(nexus_config_session_policy, wait_for_fixture):
session, tmpdir = nexus_config_session_policy
cliargs = cliargs_logfiles(config_writer_args, tmpdir)
cliargs += tuple("--" + k for k in config_options if "--" + k not in cliargs)
with writer_process(session, wait_for_fixture, cliargs) as writer:
yield {"session": session, "tmpdir": tmpdir, "writer": writer}
:param session:
:param tmpdir:
:param bool policy:
:param kwargs: ignored
"""
if policy:
tmpdir = str(tmpdir.join(session.name))
session.enable_esrf_data_policy()
scan_saving = session.scan_saving
scan_saving.writer = "nexus"
scan_saving_config = scan_saving.scan_saving_config
for k in ["inhouse_data_root", "visitor_data_root", "tmp_data_root"]:
scan_saving_config[k] = scan_saving_config[k].replace("/tmp/scans", tmpdir)
scan_saving.proposal = "testproposal"
technique = nxw_test_config.technique["withpolicy"]
scan_saving.technique = technique
measurementgroup.set_active_name(technique + "MG")
else:
tmpdir = str(tmpdir)
session.disable_esrf_data_policy()
scan_saving = session.scan_saving
scan_saving.writer = "nexus"
scan_saving.base_path = tmpdir
scan_saving.data_filename = "{a}_{b}"
scan_saving.add("a", "a")
scan_saving.add("b", "b")
technique = nxw_test_config.technique["withoutpolicy"]
measurementgroup.set_active_name(technique + "MG")
@contextmanager
def writer_tango(session, wait_for_fixture, cliargs):
def writer_tango(session=None, tmpdir=None, config=True, alt=False, **kwargs):
"""
Run external writer in a Tango server
Run external writer as a Tango server
:param session:
:param tmpdir:
:param callable wait_for_fixture:
:param sequence cliargs:
:param bool config:
:param bool alt:
:param kwargs: ignored
:returns PopenGreenlet:
"""
session.scan_saving.writer = "nexus"
env = {k: str(v) for k, v in os.environ.items()}
env["GEVENT_MONITOR_THREAD_ENABLE"] = "true"
env["GEVENT_MAX_BLOCKING_TIME"] = "1"
server_instance = "test"
cliargs = ("NexusWriterService", server_instance) + cliargs
env = writer_env()
server_instance = "testwriters"
cliargs = ("NexusWriterService", server_instance) + writer_cli_logargs(tmpdir)
# Register another writer with the TANGO database (testing concurrency):
# device_fqdn = nexus_register_writer.ensure_existence(
# session.name, instance=server_instance, family="dummy", use_existing=False
......@@ -192,7 +180,7 @@ def writer_tango(session, wait_for_fixture, cliargs):
with nxw_test_utils.popencontext(
cliargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env
) as greenlet:
with gevent.Timeout(30, RuntimeError("Nexus writer is not running")):
with gevent.Timeout(10, RuntimeError("Nexus writer is not running")):
while True:
try:
dev_proxy = DeviceProxy(device_fqdn)
......@@ -203,34 +191,85 @@ def writer_tango(session, wait_for_fixture, cliargs):
break
gevent.sleep(0.1)
# Changing properties needs Init
dev_proxy.set_timeout_millis(10000)
dev_proxy.put_property(writer_tango_properties)
dev_proxy.Init()
properties, attributes = writer_options(tango=True, config=config, alt=alt)
if properties:
dev_proxy.set_timeout_millis(10000)
dev_proxy.put_property(properties)
dev_proxy.Init()
# Changing attributes does not need Init
for attr, value in writer_tango_attributes.items():
for attr, value in attributes.items():
dev_proxy.write_attribute(attr, value)
yield greenlet
@contextmanager
def writer_process(session, wait_for_fixture, cliargs):
def writer_process(session=None, tmpdir=None, config=True, alt=False, **kwargs):
"""
Run external writer in a python process
Run external writer as a python process
:param session:
:param callable wait_for_fixture:
:param sequence cliargs:
:param tmpdir:
:param bool config:
:param bool alt:
:param kwargs: ignored
:returns PopenGreenlet:
"""
session.scan_saving.writer = "hdf5"
env = {k: str(v) for k, v in os.environ.items()}
env["GEVENT_MONITOR_THREAD_ENABLE"] = "true"
env["GEVENT_MAX_BLOCKING_TIME"] = "1"
cliargs = ("NexusSessionWriter", session.name) + cliargs
env = writer_env()
cliargs = (
("NexusSessionWriter", session.name)
+ writer_cli_logargs(tmpdir)
+ writer_options(tango=False, config=config, alt=alt)
)
with nxw_test_utils.popencontext(
cliargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env
) as greenlet:
with gevent.Timeout(30, RuntimeError("Nexus Writer not running")):
with gevent.Timeout(10, RuntimeError("Nexus Writer not running")):
while not greenlet.stdout_contains("Start listening"):
gevent.sleep(0.1)
yield greenlet
def writer_options(tango=True, config=True, alt=False, resource_profiling=False):
"""
:param bool tango: launch writer as process/tango server
:param bool config: writer uses/ignores extra Redis info
:param bool alt: anable all options (all disabled by default)
:param bool resource_profiling:
"""
fixed = ("copy_non_external", "resource_profiling", "noconfig")
options = all_cli_saveoptions(configurable=config)
if tango:
properties = {"copy_non_external": True}
attributes = {}
properties["noconfig"] = not config
attributes["resource_profiling"] = resource_profiling
properties.update({k: alt for k in options if k not in fixed})
else:
cliargs = ["--copy_non_external"]
if not config:
cliargs.append("--noconfig")
if resource_profiling:
cliargs.append("--resource_profiling")
if alt:
cliargs += ["--" + k for k in options if k not in fixed]
if tango:
return properties, attributes
else:
return tuple(cliargs)
def writer_cli_logargs(tmpdir):
return (
"--log=info",
"--redirectstdout",
"--redirectstderr",
"--logfileout={}".format(tmpdir.join("writer.stdout.log")),
"--logfileerr={}".format(tmpdir.join("writer.stderr.log")),
)
def writer_env():
env = {k: str(v) for k, v in os.environ.items()}
env["GEVENT_MONITOR_THREAD_ENABLE"] = "true"
env["GEVENT_MAX_BLOCKING_TIME"] = "1"
return env
......@@ -5,6 +5,7 @@
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import os
import re
import numpy
from nexus_writer_service.utils import scan_utils
......@@ -24,7 +25,7 @@ def assert_scan_data(scan, **kwargs):
:param bliss.scanning.scan.Scan scan:
:param kwargs: see `validate_scan_data`
"""
# scan_utils.open_data(scan, subscan=subscan, config=config, block=True)
# scan_utils.open_data(scan, subscan=subscan, block=True)
validate_scan_data(scan, **kwargs)
......@@ -33,7 +34,7 @@ def assert_scangroup_data(sequence, **kwargs):
:param bliss.scanning.group.Sequence sequence:
:param kwargs: see `validate_scangroup_data`
"""
# scan_utils.open_data(sequence.scan, subscan=subscan, config=config, block=True)
# scan_utils.open_data(sequence.scan, subscan=subscan, block=True)
validate_scangroup_data(sequence, **kwargs)
......@@ -46,7 +47,7 @@ def validate_scan_data(
master_name="timer",
scan_shape=None,
config=True,
withpolicy=True,
policy=True,
alt=False,
):
"""
......@@ -59,7 +60,7 @@ def validate_scan_data(
:param str master_name: chain master name
:param tuple scan_shape: fast axis first 0D scan by default
:param bool config: configurable writer
:param bool withpolicy: data policy
:param bool policy: data policy
:param bool alt: alternative writer options
"""
# Parse arguments
......@@ -81,7 +82,7 @@ def validate_scan_data(
else:
masters = ("elapsed_time",)
assert len(scan_shape) == len(masters)
if withpolicy:
if policy:
det_technique = nxw_test_config.technique["withpolicy"]
else:
det_technique = nxw_test_config.technique["withoutpolicy"]
......@@ -100,12 +101,12 @@ def validate_scan_data(
# Validate NXentry links
validate_master_links(scan, config=config)
# Validate NXentry content
uri = scan_utils.scan_uri(scan, subscan=subscan, config=config)
uri = scan_utils.scan_uri(scan, subscan=subscan)
with nexus.uriContext(uri) as nxentry:
validate_nxentry(
nxentry,
config=config,
withpolicy=withpolicy,
policy=policy,
technique=scan_technique,
detectors=detectors,
notes=notes,
......@@ -126,7 +127,7 @@ def validate_scan_data(
scan_shape=scan_shape,
config=config,
masters=masters,
withpolicy=withpolicy,
policy=policy,
technique=det_technique,
save_options=save_options,
detectors=detectors,
......@@ -136,7 +137,7 @@ def validate_scan_data(
validate_plots(
nxentry,
config=config,
withpolicy=withpolicy,
policy=policy,
technique=scan_technique,
detectors=detectors,
scan_shape=scan_shape,
......@@ -147,7 +148,7 @@ def validate_scan_data(
nxentry,
technique=scan_technique,
config=config,
withpolicy=withpolicy,
policy=policy,
save_options=save_options,
detectors=detectors,
)
......@@ -162,9 +163,9 @@ def validate_scangroup_data(sequence, config=True, **kwargs):
# Validate NXentry links
validate_master_links(sequence.scan, config=config)
# Validate scan links (currently disabled)
# validate_scangroup_links(sequence, config=config)
# validate_scangroup_links(sequence)
# Validate NXentry content
uri = scan_utils.scan_uri(sequence.scan, config=config)
uri = scan_utils.scan_uri(sequence.scan)
with nexus.uriContext(uri) as nxentry:
# TODO: validate scan group NXentry (custom channels)
pass
......@@ -177,26 +178,29 @@ def validate_master_links(scan, subscan=1, config=True):
:param bliss.scanning.scan.Scan scan:
:param bool config: configurable writer
"""
uri = scan_utils.scan_uri(scan, subscan=subscan, config=config)
uri = scan_utils.scan_uri(scan, subscan=subscan)
uri = nexus.normUri(uri)
for filename in scan_utils.scan_filenames(scan, config=config):
with nexus.File(filename) as nxroot:
for key in nxroot:
if uri == nexus.normUri(nexus.getUri(nxroot[key])):
break
else:
assert False, uri
if config:
for filename in scan_utils.scan_filenames(scan, config=config).values():
with nexus.File(filename) as nxroot:
for key in nxroot:
if uri == nexus.normUri(nexus.getUri(nxroot[key])):
break
else:
assert False, uri
else:
for filename in scan_utils.scan_master_filenames(scan, config=True).values():
assert not os.path.exists(filename), filename
def validate_scangroup_links(sequence, config=True):
def validate_scangroup_links(sequence):
"""
:param bliss.scanning.scan.Scan sequence:
:param bool config: configurable writer