Commit 276aaa7a authored by Valentin Valls's avatar Valentin Valls
Browse files

Refactor test as reusable components

parent 8b953608
......@@ -6,6 +6,7 @@
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import gevent
import contextlib
import gevent.event
import numpy
from bliss import setup_globals
......@@ -25,6 +26,61 @@ from bliss.scanning.group import Sequence
from bliss.config.streaming import DataStreamReaderStopHandler
@pytest.fixture
def test_observer(mocker):
"""Helper to check post mortem the events from the scans watcher"""
observer = mocker.Mock(spec=ScansObserver)
def read_scan_info(observer, method_name):
call = getattr(observer, method_name).call_args_list[0]
scan_info = call[0][1]
return scan_info
def read_scalar_data(observer, channel_name, scan_db_name):
data = []
for call_args in observer.on_scalar_data_received.call_args_list:
kwargs = call_args[1]
if channel_name is not None and kwargs["channel_name"] != channel_name:
continue
if scan_db_name is not None and kwargs["scan_db_name"] != scan_db_name:
continue
data.append(kwargs["data_bunch"])
if len(data) == 0:
return []
return numpy.concatenate(data)
observer.on_scan_created__scan_info = lambda: read_scan_info(
observer, "on_scan_created"
)
observer.on_scan_started__scan_info = lambda: read_scan_info(
observer, "on_scan_started"
)
observer.on_scan_finished__scan_info = lambda: read_scan_info(
observer, "on_scan_finished"
)
observer.on_scalar_data_received__get_data = lambda channel_name, scan_db_name: read_scalar_data(
observer, channel_name, scan_db_name
)
yield observer
@contextlib.contextmanager
def watching(session, observer, exclude_groups=False):
watcher = ScansWatcher(session.name)
watcher.set_exclude_existing_scans(True)
if not exclude_groups:
watcher.set_watch_scan_group(True)
watcher.set_observer(observer)
session_watcher = gevent.spawn(watcher.run)
watcher.wait_ready(timeout=3)
try:
yield watcher
finally:
gevent.sleep(0.5)
session_watcher.kill()
def test_simple_continuous_scan_with_session_watcher(session, scan_saving, mocker):
m1 = getattr(setup_globals, "m1")
......@@ -446,7 +502,6 @@ def test_scan_groups(default_session, mocker):
assert len(callbacks.scan_new_callback.call_args_list) == 2
def test_scan_observer(session, diode_acq_device_factory, mocker):
observer = mocker.Mock(spec=ScansObserver)
watcher = ScansWatcher(session.name)
watcher.set_exclude_existing_scans(True)
......@@ -456,6 +511,9 @@ def test_scan_observer(session, diode_acq_device_factory, mocker):
session_watcher = gevent.spawn(watcher.run)
watcher.wait_ready(timeout=3)
def test_scan_observer(
session, scan_saving, diode_acq_device_factory, test_observer, mocker
):
chain = AcquisitionChain()
acquisition_device_1, _ = diode_acq_device_factory.get(count_time=0.1, npoints=1)
master = SoftwareTimerMaster(0.1, npoints=1)
......@@ -465,31 +523,27 @@ def test_scan_observer(session, diode_acq_device_factory, mocker):
meta = {"kind": "foo"}
acquisition_device_1.fill_meta_at_scan_start = mocker.Mock(return_value=meta)
try:
with watching(session, test_observer):
scan.run()
finally:
gevent.sleep(0.5)
session_watcher.kill()
# TODO check the order of the received events
observer.on_scan_created.assert_called_once()
observer.on_scan_started.assert_called_once()
observer.on_scan_finished.assert_called_once()
test_observer.on_scan_created.assert_called_once()
test_observer.on_scan_started.assert_called_once()
test_observer.on_scan_finished.assert_called_once()
# Check that the scan_info looks like what it is expected
sf = ScanInfo()
device_key = sf._get_key_from_acq_obj(acquisition_device_1)
call = observer.on_scan_created.call_args_list[0]
scan_info = call[0][1]
scan_info = test_observer.on_scan_created__scan_info()
assert scan_info["session_name"] == scan_saving.session
assert scan_info["user_name"] == scan_saving.user_name
assert "positioners_start" in scan_info["positioners"]
assert "start_timestamp" in scan_info
call = observer.on_scan_started.call_args_list[0]
scan_info = call[0][1]
scan_info = test_observer.on_scan_started__scan_info()
assert "kind" in scan_info["devices"][device_key]
call = observer.on_scan_finished.call_args_list[0]
scan_info = call[0][1]
scan_info = test_observer.on_scan_finished__scan_info()
assert "end_timestamp" in scan_info
assert "positioners_end" in scan_info["positioners"]
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment