Commit ee1bb688 authored by Vincent Michel's avatar Vincent Michel

More FalconX compatibility

parent 11eafc03
Pipeline #1292 passed with stages
in 1 minute and 19 seconds
......@@ -174,12 +174,18 @@ def is_running():
# Statistics
def get_module_statistics(module):
# Get raw data
channels = get_module_channels(module)
# FalconX requires a spectrum read for the statistics to be updated
if get_module_type(module).startswith(u'falconx'):
for channel in channels:
if channel >= 0:
get_spectrum(channel)
# Prepare buffer
data_size = 9 * len(channels)
master = next(c for c in channels if c >= 0)
array = numpy.zeros(data_size, dtype='double')
data = ffi.cast('double *', array.ctypes.data)
# Run handel call
code = handel.xiaGetRunData(master, b'module_statistics_2', data)
check_error(code)
# Parse raw data
......@@ -191,6 +197,9 @@ def get_module_statistics(module):
def get_statistics():
"""Return the statistics for all enabled channels as a dictionary."""
result = {}
# We're not using get_master_channels here.
# That's because each FalconX channels is its own master, even though
# the statistics can be accessed with a single call per module.
for module in get_modules():
result.update(get_module_statistics(module))
return result
......@@ -323,7 +332,7 @@ def get_all_buffer_data(buffer_id):
return merge_buffer_data(*data)
def synchronized_poll_data():
def synchronized_poll_data(done=set()):
"""Convenient helper for buffer management in mapping mode.
It assumes that all the modules are configured with the same number
......@@ -347,12 +356,15 @@ def synchronized_poll_data():
overrun_error = RuntimeError('Buffer overrun!')
# Get info from hardware
current_pixel = get_current_pixel()
full_lst = [x for x in data if all_buffer_full(x)]
full = {x for x in data if all_buffer_full(x)}
# Overrun from hardware
if any_buffer_overrun():
raise overrun_error
# Hack: use the done argument
done &= full
full -= done
# Read data from buffers
for x in full_lst:
for x in full:
data[x] = get_all_buffer_data(x)
# Overrun from set_buffer_done
if set_all_buffer_done(x):
......@@ -537,7 +549,11 @@ def get_channels():
def get_master_channels():
"""Return one active channel for each module."""
"""Return one active channel for each buffer."""
# For the FalconX, each channel has its own buffer
if get_module_type().startswith(u'falconx'):
return get_channels()
# Otherwise, one channel per module is enough
return tuple(
next(channel for channel in groups if channel >= 0)
for groups in get_grouped_channels())
......
......@@ -293,21 +293,25 @@ def test_get_module_statistics(interface):
m.side_effect = side_effect
with mock.patch('handel.interface.get_module_channels') as m2:
m2.return_value = [-1, -1, -1, 8]
# First test
assert interface.get_module_statistics('module3') == expected
m2.assert_called_once_with('module3')
arg = m.call_args[0][2]
m.assert_called_once_with(8, b'module_statistics_2', arg)
# Second test
raw[5] = 4.56 # ICR inconsistency
raw[6] = 1.23 # OCR inconsistency
with pytest.warns(UserWarning) as ctx:
interface.get_module_statistics('module3')
assert ctx[0].message.args[0].startswith(
'ICR buffer inconsistency: 4.56 != 3131.7208')
assert ctx[1].message.args[0].startswith(
'OCR buffer inconsistency: 1.23 != 2724.3282')
with mock.patch('handel.interface.get_module_type') as m3:
with mock.patch('handel.interface.get_spectrum') as m4:
m2.return_value = [-1, -1, -1, 8]
m3.return_value = u'falconxn'
# First test
assert interface.get_module_statistics('module3') == expected
m2.assert_called_once_with('module3')
arg = m.call_args[0][2]
m.assert_called_once_with(8, b'module_statistics_2', arg)
m4.assert_called_once_with(8)
# Second test
raw[5] = 4.56 # ICR inconsistency
raw[6] = 1.23 # OCR inconsistency
with pytest.warns(UserWarning) as ctx:
interface.get_module_statistics('module3')
assert ctx[0].message.args[0].startswith(
'ICR buffer inconsistency: 4.56 != 3131.7208')
assert ctx[1].message.args[0].startswith(
'OCR buffer inconsistency: 1.23 != 2724.3282')
# Make sure errors have been checked
interface.check_error.assert_called_with(0)
......@@ -385,10 +389,12 @@ def test_get_raw_buffer(interface):
arg[0] = 5
return 0
if dtype == b'buffer_a':
arg[0:5] = range(1, 6)
arg[0:5] = buff
return 0
assert False
# First test
buff = range(1, 6)
m.side_effect = side_effect
expected = numpy.array(range(1, 6), dtype='uint32')
diff = interface.get_raw_buffer(1, 'a') == expected
......@@ -399,6 +405,23 @@ def test_get_raw_buffer(interface):
# Make sure errors have been checked
interface.check_error.assert_called_with(0)
# Second test
buff = [2*x + ((2*x+1) << 16) for x in range(1, 6)]
expected = numpy.array(range(2, 12), dtype='uint32')
diff = interface.get_raw_buffer(1, 'a') == expected
assert diff.all()
m.assert_called()
arg = m.call_args[0][2]
m.assert_called_with(1, b'buffer_a', arg)
# Make sure errors have been checked
interface.check_error.assert_called_with(0)
# Second test
buff = [0] * 5
with pytest.raises(RuntimeError) as ctx:
interface.get_raw_buffer(1, 'a')
assert 'The buffer a associated with channel 1 is empty' in str(ctx.value)
def test_get_buffer_data(interface):
with mock.patch('handel.interface.get_raw_buffer') as m1:
......@@ -918,10 +941,18 @@ def test_get_channels(interface):
def test_get_master_channels(interface):
with mock.patch('handel.interface.get_grouped_channels') as m:
m.return_value = [(0, 1, 2, 3), (-1, 5, 6, 7)]
assert interface.get_master_channels() == (0, 5)
m.assert_called_once_with()
with mock.patch('handel.interface.get_grouped_channels') as m1:
with mock.patch('handel.interface.get_module_type') as m2:
m1.return_value = [(0, 1, 2, 3), (-1, 5, 6, 7)]
m2.return_value = u'notfalconx'
assert interface.get_master_channels() == (0, 5)
m1.assert_called_once_with()
with mock.patch('handel.interface.get_channels') as m1:
with mock.patch('handel.interface.get_module_type') as m2:
m1.return_value = (0, 1, 2)
m2.return_value = u'falconxn'
assert interface.get_master_channels() == (0, 1, 2)
m1.assert_called_once_with()
def test_get_trigger_channels(interface):
......@@ -1032,10 +1063,12 @@ def test_apply_acquisition_values(interface):
# Multiple channel
with mock.patch('handel.interface.get_grouped_channels') as m2:
m2.return_value = ((0, ), )
assert interface.apply_acquisition_values() is None
dummy = m.call_args[0][2]
m.assert_called_once_with(0, b'apply', dummy)
with mock.patch('handel.interface.get_module_type') as m3:
m2.return_value = ((0, ), )
m3.return_value = u'notfalconx'
assert interface.apply_acquisition_values() is None
dummy = m.call_args[0][2]
m.assert_called_once_with(0, b'apply', dummy)
# Debugging
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment