Skip to content
Snippets Groups Projects
Commit 484bc499 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

timescan writer test: takes much longer for a timescan to

progress on CI than before. Reduce the number of detectors.
parent b2f5b4d0
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@
# Copyright (c) 2015-2022 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import time
import gevent
import pytest
from bliss.common import scans
......@@ -44,12 +45,13 @@ def test_nxw_timescan_base_nopolicy(nexus_writer_base_nopolicy):
@nxw_test_utils.writer_stdout_on_exception
def _test_nxw_timescan(session=None, tmpdir=None, writer=None, **kwargs):
nodes = {}
nodes = dict()
started = gevent.event.Event()
nminevents = 5
detectors = ["diode3", "simu1", "lima_simulator"]
session.env_dict["simu1"].block_size = 1
session.env_dict["simu2"].block_size = 1
scan_detectors = [session.env_dict[d] for d in detectors]
scan_detectors.append(session.env_dict["lima_simulator"].bpm)
def listenscan(scannode):
print(f"Listen to scan {scannode.db_name}")
......@@ -78,16 +80,17 @@ def _test_nxw_timescan(session=None, tmpdir=None, writer=None, **kwargs):
g.kill()
glisten = gevent.spawn(listensession)
scan = scans.timescan(0.1, run=False)
scan = scans.timescan(0.1, *scan_detectors, run=False)
gscan = nxw_test_utils.run_scan(scan, runasync=True)
try:
with gevent.Timeout(30) as total_timeout:
# Wait until first channel has data
print("Wait for the first NEW_DATA event ...")
t0 = time.time()
started.wait()
# Wait until all channels have nmin data events
while any(v <= nminevents for v in nodes.values()):
while any(v < nminevents for v in nodes.values()):
lst = set(nodes.values())
nmin = min(lst)
nmax = max(lst)
......@@ -104,7 +107,9 @@ def _test_nxw_timescan(session=None, tmpdir=None, writer=None, **kwargs):
continue
else:
raise RuntimeError("Scan stopped without exception")
print(f"All {len(nodes)} channels have at least {nminevents} data events.")
print(
f"All {len(nodes)} channels have at least {nminevents} data events (took {time.time()-t0:.02f} seconds)."
)
finally:
try:
# Stop scan and listener
......@@ -126,5 +131,9 @@ def _test_nxw_timescan(session=None, tmpdir=None, writer=None, **kwargs):
print("Verify data ...")
nxw_test_utils.wait_scan_data_finished([scan], writer=writer)
nxw_test_data.assert_scan_data(
scan, scan_shape=(0,), positioners=[["elapsed_time", "epoch"]], **kwargs
scan,
scan_shape=(0,),
positioners=[["elapsed_time", "epoch"]],
detectors=detectors,
**kwargs,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment