Commit fa505425 authored by Vincent Michel's avatar Vincent Michel
Browse files

Add and update some API functions

parent 861cbab4
Pipeline #962 passed with stages
in 1 minute and 20 seconds
......@@ -13,7 +13,10 @@ from .parser import parse_xia_ini_file
__all__ = ['init', 'init_handel', 'exit',
'new_detector', 'get_num_detectors', 'get_detectors',
'get_detector_from_channel',
'start_run', 'stop_run', 'get_run_data_length', 'get_run_data',
'start_run', 'stop_run',
'get_spectrum_length', 'get_spectrum', 'get_spectrums',
'is_channel_running', 'is_running',
'get_module_statistics', 'get_statistics',
'get_buffer_length', 'get_buffer_full', 'get_buffer', 'buffer_done',
'load_system', 'save_system', 'start_system',
'enable_log_output', 'disable_log_output',
......@@ -103,25 +106,29 @@ def get_detector_from_channel(channel):
# Run control
def start_run(channel, resume=False):
def start_run(channel=None, resume=False):
if channel is None:
channel = -1 # All channels
code = handel.xiaStartRun(channel, resume)
check_error(code)
def stop_run(channel):
def stop_run(channel=None):
if channel is None:
channel = -1 # All channels
code = handel.xiaStopRun(channel)
check_error(code)
def get_run_data_length(channel):
def get_spectrum_length(channel):
length = ffi.new('unsigned long *')
code = handel.xiaGetRunData(channel, b'mca_length', length)
check_error(code)
return length[0]
def get_run_data(channel):
length = get_run_data_length(channel)
def get_spectrum(channel):
length = get_spectrum_length(channel)
array = numpy.zeros(length, dtype='uint32')
data = ffi.cast('uint32_t *', array.ctypes.data)
code = handel.xiaGetRunData(channel, b'mca', data)
......@@ -129,6 +136,46 @@ def get_run_data(channel):
return array
def get_spectrums():
"""Return the spectrums for all enabled channels as a dictionary."""
return {channel: get_spectrum(channel) for channel in get_channels()}
def is_channel_running(channel):
running = ffi.new('short *')
code = handel.xiaGetRunData(channel, b'run_active', running)
check_error(code)
return bool(running[0])
def is_running():
"""Return True if any channel is running, False otherwise."""
return any(map(is_channel_running, get_channels()))
# Statistics
def get_module_statistics(module):
channels = get_module_channels(module)
data_size = 7 * len(channels)
master = next(c for c in channels if c != -1)
array = numpy.zeros(data_size, dtype='double')
data = ffi.cast('double *', array.ctypes.data)
code = handel.xiaGetRunData(master, b'module_statistics', data)
check_error(code)
return {channel: array[index:index+7].tolist()
for index, channel in enumerate(channels)
if channel != -1}
def get_statistics():
"""Return the statistics for all enabled channels as a dictionary."""
result = {}
for module in get_modules():
result.update(get_module_statistics(module))
return result
# Buffer
def get_buffer_length(channel):
......@@ -272,6 +319,8 @@ def get_module_interface(alias):
return ffi.string(value).decode()
# Channels
def get_module_number_of_channels(alias):
alias = to_bytes(alias)
value = ffi.new('int *')
......@@ -331,28 +380,40 @@ def get_channels():
# Parameters
def set_acquisition_value(channel, name, value):
def get_acquisition_value(name, channel):
name = to_bytes(name)
pointer = ffi.new('double *', value)
code = handel.xiaSetAcquisitionValues(channel, name, pointer)
pointer = ffi.new('double *')
code = handel.xiaGetAcquisitionValues(channel, name, pointer)
check_error(code)
return pointer[0]
def get_acquisition_value(channel, name):
def set_acquisition_value(name, value, channel=None):
if channel is None:
channel = -1 # All channels
name = to_bytes(name)
pointer = ffi.new('double *')
code = handel.xiaGetAcquisitionValues(channel, name, pointer)
pointer = ffi.new('double *', value)
code = handel.xiaSetAcquisitionValues(channel, name, pointer)
check_error(code)
return pointer[0]
def remove_acquisition_value(channel, name):
def remove_acquisition_value(name, channel=None):
if channel is None:
channel = -1 # All channels
name = to_bytes(name)
code = handel.xiaRemoveAcquisitionValues(channel, name)
check_error(code)
def apply_acquisition_values(channel):
def apply_acquisition_values(channel=None):
# Apply all
if channel is None:
# Only one apply operation by module is required
grouped = get_grouped_channels()
for master in map(max, grouped):
apply_acquisition_values(master)
return
# Apply single
dummy = ffi.new('int *')
code = handel.xiaBoardOperation(channel, b'apply', dummy)
check_error(code)
......
......@@ -155,7 +155,7 @@ def test_stop_run(interface):
interface.check_error.assert_called_once_with(0)
def test_get_run_data_length(interface):
def test_get_spectrum_length(interface):
m = interface.handel.xiaGetRunData
def side_effect(channel, dtype, arg):
......@@ -163,7 +163,7 @@ def test_get_run_data_length(interface):
return 0
m.side_effect = side_effect
assert interface.get_run_data_length(1) == 10
assert interface.get_spectrum_length(1) == 10
m.assert_called_once()
arg = m.call_args[0][2]
m.assert_called_once_with(1, b'mca_length', arg)
......@@ -171,7 +171,7 @@ def test_get_run_data_length(interface):
interface.check_error.assert_called_once_with(0)
def test_get_run_data(interface):
def test_get_spectrum(interface):
m = interface.handel.xiaGetRunData
def side_effect(channel, dtype, arg):
......@@ -186,7 +186,7 @@ def test_get_run_data(interface):
m.side_effect = side_effect
expected = numpy.array(range(10), dtype='uint32')
diff = interface.get_run_data(1) == expected
diff = interface.get_spectrum(1) == expected
assert diff.all()
m.assert_called()
arg = m.call_args[0][2]
......@@ -525,20 +525,8 @@ def test_get_channels(interface):
interface.check_error.assert_called_with(0)
# Parameters
def test_set_acquisition_value(interface):
m = interface.handel.xiaSetAcquisitionValues
m.return_value = 0
assert interface.set_acquisition_value(1, 'test', 2.3) is None
arg = m.call_args[0][2]
m.assert_called_once_with(1, b'test', arg)
assert arg[0] == 2.3
# Make sure errors have been checked
interface.check_error.assert_called_once_with(0)
def test_get_acquistion_value(interface):
m = interface.handel.xiaGetAcquisitionValues
......@@ -547,30 +535,70 @@ def test_get_acquistion_value(interface):
return 0
m.side_effect = side_effect
assert interface.get_acquisition_value(1, 'test') == 2.3
assert interface.get_acquisition_value('test', channel=1) == 2.3
arg = m.call_args[0][2]
m.assert_called_once_with(1, b'test', arg)
# Make sure errors have been checked
interface.check_error.assert_called_once_with(0)
def test_set_acquisition_value(interface):
m = interface.handel.xiaSetAcquisitionValues
m.return_value = 0
# Single channel
assert interface.set_acquisition_value('test', 2.3, channel=1) is None
arg = m.call_args[0][2]
m.assert_called_once_with(1, b'test', arg)
assert arg[0] == 2.3
# Make sure errors have been checked
interface.check_error.assert_called_once_with(0)
m.reset_mock()
# All channels
assert interface.set_acquisition_value('test', 2.3) is None
arg = m.call_args[0][2]
m.assert_called_once_with(-1, b'test', arg)
assert arg[0] == 2.3
def test_remove_acquisition_value(interface):
m = interface.handel.xiaRemoveAcquisitionValues
m.return_value = 0
assert interface.remove_acquisition_value(1, 'test') is None
# Single channel
assert interface.remove_acquisition_value('test', channel=1) is None
m.assert_called_once_with(1, b'test')
# Make sure errors have been checked
interface.check_error.assert_called_once_with(0)
m.reset_mock()
# Multiple channels
assert interface.remove_acquisition_value('test') is None
m.assert_called_once_with(-1, b'test')
def test_apply_acquisition_values(interface):
m = interface.handel.xiaBoardOperation
m.return_value = 0
assert interface.apply_acquisition_values(1) is None
# Single channel
assert interface.apply_acquisition_values(channel=1) is None
dummy = m.call_args[0][2]
m.assert_called_once_with(1, b'apply', dummy)
# Make sure errors have been checked
interface.check_error.assert_called_once_with(0)
m.reset_mock()
# Multiple channel
with mock.patch('handel.interface.get_grouped_channels') as m2:
m2.return_value = ((0, ), )
assert interface.apply_acquisition_values() is None
dummy = m.call_args[0][2]
m.assert_called_once_with(0, b'apply', dummy)
# Debugging
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment