Commit e3a0ac68 authored by Valentin Valls's avatar Valentin Valls
Browse files

Rework the whole tests as walker/observer

parent 276aaa7a
......@@ -7,8 +7,8 @@
import gevent
import contextlib
import gevent.event
import numpy
import pytest
from bliss import setup_globals
from bliss.controllers.lima.lima_base import Lima
from bliss.scanning.toolbox import ChainBuilder
......@@ -19,11 +19,10 @@ from bliss.scanning.acquisition.mca import McaAcquisitionSlave
from bliss.scanning.acquisition.motor import LinearStepTriggerMaster
from bliss.scanning.scan import Scan
from bliss.scanning.scan_info import ScanInfo
from bliss.data.scan import watch_session_scans, ScansObserver, ScansWatcher
from bliss.data.scan import ScansObserver, ScansWatcher
from bliss.scanning.chain import AcquisitionChain
from bliss.common import scans
from bliss.scanning.group import Sequence
from bliss.config.streaming import DataStreamReaderStopHandler
@pytest.fixture
......@@ -81,53 +80,25 @@ def watching(session, observer, exclude_groups=False):
session_watcher.kill()
def test_simple_continuous_scan_with_session_watcher(session, scan_saving, mocker):
def test_simple_continuous_scan_with_session_watcher(session, test_observer):
m1 = getattr(setup_globals, "m1")
counter = getattr(setup_globals, "diode")
scan_saving.template = "toto"
master = SoftwarePositionTriggerMaster(m1, 0, 1, 10, time=1)
acq_dev = SamplingCounterAcquisitionSlave(counter, count_time=0.01, npoints=10)
chain = AcquisitionChain()
chain.add(master, acq_dev)
callbacks = mocker.Mock()
end_scan_event = gevent.event.Event()
def end(*args):
end_scan_event.set()
callbacks.scan_end_callback(*args)
watcher_ready_event = gevent.event.Event()
session_watcher = gevent.spawn(
watch_session_scans,
scan_saving.session,
callbacks.scan_new_callback,
callbacks.scan_new_child_callback,
callbacks.scan_data_callback,
end,
ready_event=watcher_ready_event,
exclude_existing_scans=False,
)
try:
assert watcher_ready_event.wait(3)
with watching(session, test_observer):
scan = Scan(chain, save=False)
scan.run()
assert end_scan_event.wait(3)
finally:
session_watcher.kill()
callbacks.scan_new_callback.assert_called_once()
callbacks.scan_new_child_callback.assert_called()
callbacks.scan_data_callback.assert_called()
callbacks.scan_end_callback.assert_called_once()
scan_info = callbacks.scan_new_callback.call_args[0][0]
assert scan_info["session_name"] == scan_saving.session
assert scan_info["user_name"] == scan_saving.user_name
test_observer.on_scan_created.assert_called_once()
test_observer.on_scan_started.assert_called_once()
test_observer.on_scalar_data_received.assert_called()
test_observer.on_scan_finished.assert_called_once()
scan_info = test_observer.on_scan_created__scan_info()
acquisition_chain = scan_info["acquisition_chain"][master.name]
assert len(acquisition_chain["devices"]) == 2
assert acquisition_chain["scalars"] == [
......@@ -147,77 +118,38 @@ def test_simple_continuous_scan_with_session_watcher(session, scan_saving, mocke
},
"%s:m1" % master.name: {"display_name": "m1", "dim": 0},
}
scan_data_m1 = []
for call_args in test_observer.on_scalar_data_received.call_args_list:
kwargs = call_args[1]
if kwargs["channel_name"] == "axis:m1":
scan_data_m1.append(kwargs["data_bunch"])
for call_args in callbacks.scan_data_callback.call_args_list:
dtype, master_name, data = call_args[0][0], call_args[0][1], call_args[0][2]
assert dtype == "0d"
assert master_name == master.name
scan_data_m1 = data["data"]["axis:m1"]
scan_data_m1 = numpy.concatenate(scan_data_m1)
assert numpy.allclose(scan_data_m1, master._positions, atol=1e-1)
def test_mca_with_watcher(session, mocker):
def test_mca_with_watcher(session, test_observer):
m0 = session.config.get("roby")
# Get mca
simu = session.config.get("simu1")
mca_device = McaAcquisitionSlave(*simu.counters, npoints=3, preset_time=0.1)
# Create chain
chain = AcquisitionChain()
chain.add(LinearStepTriggerMaster(3, m0, 0, 1), mca_device)
# Run scan
scan = Scan(chain, "mca_test", save=False)
callbacks = mocker.Mock()
end_scan_event = gevent.event.Event()
def end(*args):
end_scan_event.set()
callbacks.scan_end_callback(*args)
watcher_ready_event = gevent.event.Event()
session_watcher = gevent.spawn(
watch_session_scans,
session.name,
callbacks.scan_new_callback,
callbacks.scan_new_child_callback,
callbacks.scan_data_callback,
end,
ready_event=watcher_ready_event,
exclude_existing_scans=False,
)
try:
assert watcher_ready_event.wait(3)
with watching(session, test_observer):
scan.run()
assert end_scan_event.wait(3)
finally:
session_watcher.kill()
callbacks.scan_new_callback.assert_called_once()
callbacks.scan_new_child_callback.assert_called()
callbacks.scan_data_callback.assert_called()
callbacks.scan_end_callback.assert_called_once()
test_observer.on_scan_created.assert_called_once()
test_observer.on_scan_started.assert_called_once()
test_observer.on_ndim_data_received.assert_called()
test_observer.on_scan_finished.assert_called_once()
def test_limatake_with_watcher(session, lima_simulator, mocker):
def test_limatake_with_watcher(session, lima_simulator, test_observer):
lima_simulator = session.config.get("lima_simulator")
ff = lima_simulator.saving.file_format
lima_simulator.saving.file_format = "HDF5"
scan_info = {
"npoints": 1,
"count_time": 0.01,
"type": "loopscan",
"save": False,
"title": "limatake",
"sleep_time": None,
"start": [],
"stop": [],
"saving_statistics_history": 1,
}
lima_simulator.saving.file_format = ff
lima_params = {
"acq_nb_frames": 1,
......@@ -230,46 +162,18 @@ def test_limatake_with_watcher(session, lima_simulator, mocker):
chain = AcquisitionChain(parallel_prepare=True)
builder = ChainBuilder([lima_simulator])
for node in builder.get_nodes_by_controller_type(Lima):
node.set_parameters(acq_params=lima_params)
chain.add(node)
scan = Scan(chain, name="limatake", save=False)
scan = Scan(chain, scan_info=scan_info, name="limatake", save=False)
lima_simulator.saving.file_format = ff
callbacks = mocker.Mock()
end_scan_event = gevent.event.Event()
def end(*args):
end_scan_event.set()
callbacks.scan_end_callback(*args)
watcher_ready_event = gevent.event.Event()
session_watcher = gevent.spawn(
watch_session_scans,
session.name,
callbacks.scan_new_callback,
callbacks.scan_new_child_callback,
callbacks.scan_data_callback,
end,
ready_event=watcher_ready_event,
exclude_existing_scans=False,
)
try:
assert watcher_ready_event.wait(3)
with watching(session, test_observer):
scan.run()
assert end_scan_event.wait(3)
finally:
session_watcher.kill()
callbacks.scan_new_callback.assert_called_once()
callbacks.scan_new_child_callback.assert_called()
callbacks.scan_data_callback.assert_called()
callbacks.scan_end_callback.assert_called_once()
test_observer.on_scan_created.assert_called_once()
test_observer.on_scan_started.assert_called_once()
test_observer.on_lima_ref_received.assert_called()
test_observer.on_scan_finished.assert_called_once()
def test_data_watch_callback(session, diode_acq_device_factory, mocker):
......@@ -303,7 +207,7 @@ def test_data_watch_callback(session, diode_acq_device_factory, mocker):
assert all([cb.SCAN_NEW, cb.SCAN_DATA, cb.SCAN_END])
def test_parallel_scans(default_session, mocker):
def test_parallel_scans(default_session, test_observer):
diode = default_session.config.get("diode")
sim_ct_gauss = default_session.config.get("sim_ct_gauss")
robz = default_session.config.get("robz")
......@@ -311,205 +215,85 @@ def test_parallel_scans(default_session, mocker):
s1 = scans.loopscan(20, .1, diode, run=False)
s2 = scans.ascan(robz, 0, 10, 25, .09, sim_ct_gauss, run=False)
callbacks = mocker.Mock()
end_scan_event = gevent.event.Event()
ready_event = gevent.event.Event()
def end(*args):
callbacks.scan_end_callback(*args)
if len(callbacks.scan_end_callback.call_args_list) == 2:
end_scan_event.set()
session_watcher = gevent.spawn(
watch_session_scans,
default_session.name,
callbacks.scan_new_callback,
callbacks.scan_new_child_callback,
callbacks.scan_data_callback,
end,
ready_event=ready_event,
exclude_existing_scans=False,
)
assert ready_event.wait(3.)
g1 = gevent.spawn(s1.run)
g2 = gevent.spawn(s2.run)
gs = [g1, g2]
try:
with watching(default_session, test_observer):
g1 = gevent.spawn(s1.run)
g2 = gevent.spawn(s2.run)
gs = [g1, g2]
gevent.joinall(gs, raise_error=True)
assert end_scan_event.wait(3.)
finally:
session_watcher.kill()
assert len(callbacks.scan_new_callback.call_args_list) == 2
callbacks.scan_new_child_callback.assert_called()
callbacks.scan_data_callback.assert_called()
assert len(callbacks.scan_end_callback.call_args_list) == 2
assert len(test_observer.on_scan_created.call_args_list) == 2
assert len(test_observer.on_scan_started.call_args_list) == 2
assert len(test_observer.on_scan_finished.call_args_list) == 2
loopscan_data = [
args[0][2]["data"]
for args in callbacks.scan_data_callback.call_args_list
if args[0][2]["scan_info"]["type"] == "loopscan"
]
ascan_data = [
args[0][2]["data"]
for args in callbacks.scan_data_callback.call_args_list
if args[0][2]["scan_info"]["type"] == "ascan"
]
scan_info_list = [c[0][1] for c in test_observer.on_scan_created.call_args_list]
expected_keys = [
"timer:epoch",
"timer:elapsed_time",
"axis:robz",
"simulation_counter_controller:sim_ct_gauss",
]
assert ascan_data[-1].keys() == set(expected_keys)
loopscan_scan_info = [si for si in scan_info_list if si["type"] == "loopscan"][0]
loopscan_id = loopscan_scan_info["node_name"]
ascan_scan_info = [si for si in scan_info_list if si["type"] == "ascan"][0]
ascan_id = ascan_scan_info["node_name"]
expected_keys = [
"timer:epoch",
"timer:elapsed_time",
"simulation_diode_sampling_controller:diode",
]
assert loopscan_data[-1].keys() == set(expected_keys)
loopscan_data = test_observer.on_scalar_data_received__get_data(
channel_name="simulation_diode_sampling_controller:diode",
scan_db_name=loopscan_id,
)
for name, array in ascan_data[-1].items():
assert len(array) == 26, name
for name, array in loopscan_data[-1].items():
assert len(array) == 20, name
ascan_data = test_observer.on_scalar_data_received__get_data(
channel_name="simulation_counter_controller:sim_ct_gauss", scan_db_name=ascan_id
)
assert len(ascan_data) == 26
assert len(loopscan_data) == 20
def test_sequence_scans(default_session, mocker):
diode = default_session.config.get("diode")
callbacks = mocker.Mock()
end_scan_event = gevent.event.Event()
ready_event = gevent.event.Event()
def end(*args):
end_scan_event.set()
callbacks.scan_end_callback(*args)
session_watcher = gevent.spawn(
watch_session_scans,
default_session.name,
callbacks.scan_new_callback,
callbacks.scan_new_child_callback,
callbacks.scan_data_callback,
end,
ready_event=ready_event,
exclude_existing_scans=False,
)
def test_scan_sequence(default_session, mocker, test_observer):
diode = default_session.config.get("diode")
assert ready_event.wait(timeout=3.)
try:
with watching(default_session, test_observer):
seq = Sequence()
with seq.sequence_context() as scan_seq:
s1 = scans.loopscan(5, .1, diode, run=False)
scan_seq.add(s1)
s1.run()
finally:
session_watcher.kill()
# check that end of group is not received
# assert len(end_scan_args) ==1
gevent.sleep(.5)
session_watcher.get()
assert len(callbacks.scan_new_callback.call_args_list) == 1
def test_stop_handler(session, scan_saving, diode_acq_device_factory):
chain = AcquisitionChain()
acquisition_device_1, _ = diode_acq_device_factory.get(count_time=0.1, npoints=1)
master = SoftwareTimerMaster(0.1, npoints=1)
chain.add(master, acquisition_device_1)
watcher_ready_event = gevent.event.Event()
end_scan_event = gevent.event.Event()
def end(*args):
end_scan_event.set()
stop_handler = DataStreamReaderStopHandler()
session_watcher = gevent.spawn(
watch_session_scans,
scan_saving.session,
lambda *args: None,
lambda *args: None,
lambda *args: None,
end,
ready_event=watcher_ready_event,
stop_handler=stop_handler,
exclude_existing_scans=False,
)
try:
assert watcher_ready_event.wait(3)
scan = Scan(chain, save=False)
scan.run()
assert end_scan_event.wait(3)
finally:
try:
with gevent.Timeout(seconds=3):
stop_handler.stop()
session_watcher.join()
except gevent.Timeout:
session_watcher.kill()
assert False, "A kill is not expected here"
assert len(test_observer.on_scan_created.call_args_list) == 2
assert len(test_observer.on_scan_started.call_args_list) == 2
assert len(test_observer.on_scan_finished.call_args_list) == 2
def test_scan_groups(default_session, mocker):
def test_scan_sequence_excluding_groups(default_session, test_observer):
diode = default_session.config.get("diode")
callbacks = mocker.Mock()
end_scan_event = gevent.event.Event()
ready_event = gevent.event.Event()
def end(*args):
end_scan_event.set()
callbacks.scan_end_callback(*args)
session_watcher = gevent.spawn(
watch_session_scans,
default_session.name,
callbacks.scan_new_callback,
callbacks.scan_new_child_callback,
callbacks.scan_data_callback,
end,
ready_event=ready_event,
exclude_existing_scans=False,
watch_scan_group=True,
)
assert ready_event.wait(timeout=3.)
try:
with watching(default_session, test_observer, exclude_groups=True):
seq = Sequence()
with seq.sequence_context() as scan_seq:
s1 = scans.loopscan(5, .1, diode, run=False)
scan_seq.add(s1)
s1.run()
finally:
session_watcher.kill()
# check that end of group is not received
gevent.sleep(.5)
session_watcher.get()
test_observer.on_scan_created.assert_called_once()
test_observer.on_scan_started.assert_called_once()
test_observer.on_scan_finished.assert_called_once()
assert len(callbacks.scan_new_callback.call_args_list) == 2
def test_watcher_stop(session, diode_acq_device_factory, test_observer):
chain = AcquisitionChain()
acquisition_device_1, _ = diode_acq_device_factory.get(count_time=0.1, npoints=100)
master = SoftwareTimerMaster(0.1, npoints=1)
chain.add(master, acquisition_device_1)
observer = mocker.Mock(spec=ScansObserver)
watcher = ScansWatcher(session.name)
watcher.set_exclude_existing_scans(True)
watcher.set_watch_scan_group(True)
watcher.set_observer(observer)
try:
with watching(session, test_observer) as watcher:
scan = Scan(chain, save=False)
g = gevent.spawn(scan.run)
gevent.sleep(3)
watcher.stop()
test_observer.on_scan_created.assert_called_once()
test_observer.on_scan_started.assert_called_once()
test_observer.on_scan_finished.assert_not_called()
finally:
g.kill()
session_watcher = gevent.spawn(watcher.run)
watcher.wait_ready(timeout=3)
def test_scan_observer(
session, scan_saving, diode_acq_device_factory, test_observer, mocker
......@@ -540,6 +324,7 @@ def test_scan_observer(
assert scan_info["user_name"] == scan_saving.user_name
assert "positioners_start" in scan_info["positioners"]
assert "start_timestamp" in scan_info
assert device_key in scan_info["devices"]
scan_info = test_observer.on_scan_started__scan_info()
assert "kind" in scan_info["devices"][device_key]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment