Commit 50a191a3 authored by Jose Tiago Coutinho Macara's avatar Jose Tiago Coutinho Macara Committed by Alejandro Homs Puron
Browse files

ct2: fix p201 tests to run from command line

parent 7e0382e2
......@@ -9,12 +9,14 @@ import pytest
def pytest_collection_modifyitems(config, items):
try:
if config.getoption("--pepu"):
return
except ValueError:
return
# Remove pepu tests if no pepu option is provided
for item in list(items):
if "pepu" in item.keywords:
items.remove(item)
devices = ["pepu", "ct2"]
for name in devices:
try:
if config.getoption("--%s" % name):
continue
except ValueError:
continue
# Remove device tests if no option is provided
for item in list(items):
if name in item.keywords:
items.remove(item)
......@@ -5,12 +5,18 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Required hardware: P201 card installed in lid00c
Required software: Running bliss-ct2-server on tcp::/lid00c:8909
from __future__ import division
"""P201 hardware tests
* For now only tests that don't require external trigger
* Need a P201 card installed on the PC where tests are run
Run with:
$ pytest -c /dev/null tests/controllers_hw/test_ct2_acq.py -v \
--cov bliss.controllers.ct2 --cov-report html --cov-report term
To change PC replace in `../test-configuration/ct2.yml` the address
of the P201 card
"""
import time
......@@ -23,12 +29,16 @@ import zerorpc
from gevent import sleep, spawn
from gevent.event import Event
from bliss.common import subprocess
from bliss.common.event import dispatcher
from bliss.controllers.ct2.client import create_and_configure_device
from bliss.controllers.ct2.device import AcqMode, AcqStatus, StatusSignal
CT2_PORT = 9909
TEN_KHz = 10000
ERROR_MARGIN = 10E-2 # 100ms
ERROR_MARGIN = 10E-3 # 10ms
MIN_HARD_EXPO_TIME = 10e-6 # 10 us
class EventReceiver(object):
......@@ -56,163 +66,180 @@ class EventReceiver(object):
dispatcher.disconnect(self, sender=self.device)
@pytest.fixture(params=[1, 10, 100], ids=["1 point", "10 points", "100 points"])
def device(request, beacon):
"""
Requirements to run this fixture:
hardware: P201 card installed in lid00c
software: Running bliss-ct2-server on tcp::/lid00c:8909
"""
beacon.reload()
device = beacon.get("p201")
device.timer_freq = 12.5E6
device.acq_nb_points = request.param
yield device
device.close()
def data_tests(device, expected_data):
@pytest.fixture(
params=[1, 30, 853, 9293, 563859],
ids=["1 point", "30 points", "853 points", "9_293 points", "563_859 points"],
)
def ct2(request):
address = request.config.getoption("--ct2")
port = str(CT2_PORT)
tcp_address = "tcp://localhost:" + port
args = ["bliss-ct2-server", "--address=" + address, "--port=" + port]
proc = subprocess.Popen(args)
time.sleep(0.5) # wait for bliss-ct2-server to be ready
assert proc.returncode is None
cfg = {
"name": "p201_01",
"class": "CT2",
"type": "P201",
"address": tcp_address,
"timer": dict(counter_name="sec"),
"channels": [
dict(counter_name="mon", address=3),
dict(counter_name="det", address=5),
dict(counter_name="apd", address=6),
],
}
try:
p201 = create_and_configure_device(cfg)
p201.acq_nb_points = request.param
yield p201
finally:
p201.disconnect()
proc.terminate()
def data_tests(ct2, expected_data):
expected_nb_points = len(expected_data)
# get all data (does not consume it)
data = device.get_data()
data = ct2.get_data()
assert data == pytest.approx(expected_data)
from_index = 3
if expected_nb_points > from_index:
# get data from point nb 3
data_from_3 = device.get_data(3)
data_from_3 = ct2.get_data(3)
assert (data[3:] == data_from_3).all()
# read data: consumes it. Next calls will get no data
data_consume = device.read_data()
data_consume = ct2.read_data()
assert (data == data_consume).all()
data_empty = device.get_data()
data_empty = ct2.get_data()
assert data_empty.size == 0
def soft_trigger_points(device, n, period):
def soft_trigger_points(ct2, n, period):
i, start = 0, time.time()
while i < n:
sleep((start + (i + 1) * period) - time.time())
device.trigger_point()
ct2.trigger_point()
i += 1
def test_internal_trigger_single_wrong_config(device):
def test_internal_trigger_single_wrong_config(ct2):
"""
Required hardware: P201 card installed in lid00c
Required software: Running bliss-ct2-server on tcp::/lid00c:8909
Required hardware: P201 card installed in local PC
"""
device.acq_mode = AcqMode.IntTrigSingle
device.acq_expo_time = 1
device.acq_point_period = None
ct2.acq_mode = AcqMode.IntTrigSingle
ct2.acq_expo_time = 1
ct2.acq_point_period = None
# Should not be able to prepare internal trigger single without
# a point period
with pytest.raises(zerorpc.RemoteError):
device.prepare_acq()
ct2.prepare_acq()
def test_internal_trigger_single(device):
def test_internal_trigger_single(ct2):
"""
Required hardware: P201 card installed in lid00c
Required software: Running bliss-ct2-server on tcp::/lid00c:8909
"""
expo_time = 0.02
point_period = expo_time + 0.01
freq = device.timer_freq
nb_points = device.acq_nb_points
device.acq_mode = AcqMode.IntTrigSingle
device.acq_expo_time = expo_time
device.acq_point_period = point_period
device.acq_channels = (3,)
with EventReceiver(device) as receiver:
device.prepare_acq()
device.start_acq()
nb_points = ct2.acq_nb_points
expo_time = max(MIN_HARD_EXPO_TIME, .02 / nb_points)
point_period = expo_time * 1.1
freq = ct2.timer_freq
ct2.acq_mode = AcqMode.IntTrigSingle
ct2.acq_expo_time = expo_time
ct2.acq_point_period = point_period
ct2.acq_channels = (3,)
with EventReceiver(ct2) as receiver:
ct2.prepare_acq()
ct2.start_acq()
receiver.finish.wait()
elapsed = receiver.end - receiver.start
expected_elapsed = nb_points * point_period
assert elapsed == pytest.approx(expected_elapsed, abs=ERROR_MARGIN)
error = ERROR_MARGIN + 1e-6 * nb_points # 1us per point
assert elapsed == pytest.approx(expected_elapsed, abs=error)
timer_ticks = int(freq * expo_time)
ch_3_value = int(TEN_KHz * expo_time)
expected_data = numpy.array(
[(ch_3_value, timer_ticks, i) for i in range(nb_points)]
)
data_tests(device, expected_data)
data_tests(ct2, expected_data)
def test_internal_trigger_single_stop_acq(device):
def test_internal_trigger_single_stop_acq(ct2):
"""
Required hardware: P201 card installed in lid00c
Required software: Running bliss-ct2-server on tcp::/lid00c:8909
"""
expo_time = 0.02
point_period = expo_time + 0.01
freq = device.timer_freq
nb_points = device.acq_nb_points
device.acq_mode = AcqMode.IntTrigSingle
device.acq_expo_time = expo_time
device.acq_point_period = point_period
device.acq_channels = (3,)
nb_points = ct2.acq_nb_points
expo_time = max(100e-6, .02 / nb_points)
point_period = expo_time * 1.1
freq = ct2.timer_freq
ct2.acq_mode = AcqMode.IntTrigSingle
ct2.acq_expo_time = expo_time
ct2.acq_point_period = point_period
ct2.acq_channels = (3,)
expected_elapsed = nb_points * point_period
with EventReceiver(device) as receiver:
device.prepare_acq()
device.start_acq()
# stop around 10%
sleep(0.1 * expected_elapsed)
device.stop_acq()
with EventReceiver(ct2) as receiver:
ct2.prepare_acq()
ct2.start_acq()
# stop around 10% or 1s
sleep(min(0.1 * expected_elapsed, 1))
ct2.stop_acq()
start_stop = time.time()
receiver.finish.wait()
end_stop = time.time()
assert (end_stop - start_stop) < 10E-3 # allow 10ms for stop
def test_internal_trigger_multi_wrong_config(device):
def test_internal_trigger_multi_wrong_config(ct2):
"""
Required hardware: P201 card installed in lid00c
Required software: Running bliss-ct2-server on tcp::/lid00c:8909
"""
device.acq_mode = AcqMode.IntTrigMulti
device.acq_expo_time = 1
device.acq_point_period = 1.1
device.acq_nb_points = 1
ct2.acq_mode = AcqMode.IntTrigMulti
ct2.acq_expo_time = 1
ct2.acq_point_period = 1.1
ct2.acq_nb_points = 1
# Should not be able to prepare internal trigger multi with a point period
with pytest.raises(zerorpc.RemoteError):
device.prepare_acq()
ct2.prepare_acq()
def test_internal_trigger_multi(device):
def test_internal_trigger_multi(ct2):
"""
Required hardware: Only P201 card is required
Required software: Running bliss-ct2-server on tcp::/lid00c:8909
"""
expo_time = 0.02
nb_points = ct2.acq_nb_points
if nb_points > 1000:
pytest.skip("would take too long")
expo_time = max(MIN_HARD_EXPO_TIME, .02 / nb_points)
soft_point_period = expo_time + 0.01
freq = device.timer_freq
nb_points = device.acq_nb_points
device.acq_mode = AcqMode.IntTrigMulti
device.acq_expo_time = expo_time
device.acq_point_period = None
device.acq_channels = (3,)
freq = ct2.timer_freq
ct2.acq_mode = AcqMode.IntTrigMulti
ct2.acq_expo_time = expo_time
ct2.acq_point_period = None
ct2.acq_channels = (3,)
nb_triggers = nb_points - 1
with EventReceiver(device) as receiver:
device.prepare_acq()
device.start_acq()
trigger_task = spawn(
soft_trigger_points, device, nb_triggers, soft_point_period
)
with EventReceiver(ct2) as receiver:
ct2.prepare_acq()
ct2.start_acq()
trigger_task = spawn(soft_trigger_points, ct2, nb_triggers, soft_point_period)
receiver.finish.wait()
assert trigger_task.ready()
......@@ -220,47 +247,49 @@ def test_internal_trigger_multi(device):
elapsed = receiver.end - receiver.start
expected_elapsed = (nb_points - 1) * soft_point_period + 1 * expo_time
assert elapsed == pytest.approx(expected_elapsed, abs=ERROR_MARGIN)
error = ERROR_MARGIN + 1e-6 * nb_points # 1us per point
assert elapsed == pytest.approx(expected_elapsed, abs=error)
timer_ticks = int(freq * expo_time)
ch_3_value = int(TEN_KHz * expo_time)
expected_data = numpy.array(
[(ch_3_value, timer_ticks, i) for i in range(nb_points)]
)
data_tests(device, expected_data)
data_tests(ct2, expected_data)
def test_software_trigger_readout_wrong_config(device):
def test_software_trigger_readout_wrong_config(ct2):
"""
Required hardware: P201 card installed in lid00c
Required software: Running bliss-ct2-server on tcp::/lid00c:8909
"""
device.acq_mode = AcqMode.SoftTrigReadout
device.acq_expo_time = None
device.acq_point_period = 1.1
ct2.acq_mode = AcqMode.SoftTrigReadout
ct2.acq_expo_time = None
ct2.acq_point_period = 1.1
# Should not be able to prepare soft trigger readout with point period
with pytest.raises(zerorpc.RemoteError):
device.prepare_acq()
ct2.prepare_acq()
def test_software_trigger_readout(device):
def test_software_trigger_readout(ct2):
nb_points = ct2.acq_nb_points
if nb_points > 1000:
pytest.skip("would take too long")
expo_time = None
soft_point_period = 0.11
freq = device.timer_freq
nb_points = device.acq_nb_points
device.acq_mode = AcqMode.SoftTrigReadout
device.acq_expo_time = expo_time
device.acq_point_period = None
device.acq_channels = (3,)
soft_point_period = max(10e-3, .02 / nb_points)
freq = ct2.timer_freq
nb_points = ct2.acq_nb_points
ct2.acq_mode = AcqMode.SoftTrigReadout
ct2.acq_expo_time = expo_time
ct2.acq_point_period = None
ct2.acq_channels = (3,)
nb_triggers = nb_points
with EventReceiver(device) as receiver:
device.prepare_acq()
device.start_acq()
trigger_task = spawn(
soft_trigger_points, device, nb_triggers, soft_point_period
)
with EventReceiver(ct2) as receiver:
ct2.prepare_acq()
ct2.start_acq()
trigger_task = spawn(soft_trigger_points, ct2, nb_triggers, soft_point_period)
receiver.finish.wait()
assert trigger_task.ready()
......@@ -275,4 +304,4 @@ def test_software_trigger_readout(device):
expected_data = numpy.array(
[(ch_3_value, timer_ticks, i) for i in range(nb_points)]
)
data_tests(device, expected_data)
data_tests(ct2, expected_data)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment