Commit d5f9ae3b authored by Vincent Michel's avatar Vincent Michel

Update buffer API and add buffer parsing

parent 184ff884
Pipeline #1036 passed with stages
in 1 minute and 18 seconds
......@@ -10,7 +10,7 @@ import numpy
from .error import check_error
from ._cffi import handel, ffi
from .parser import parse_xia_ini_file
from .parser import parse_xia_ini_file, parse_mapping_buffer
__all__ = ['init', 'init_handel', 'exit',
'new_detector', 'get_num_detectors', 'get_detectors',
......@@ -19,7 +19,8 @@ __all__ = ['init', 'init_handel', 'exit',
'get_spectrum_length', 'get_spectrum', 'get_spectrums',
'is_channel_running', 'is_running',
'get_module_statistics', 'get_statistics',
'get_buffer_length', 'get_buffer_full', 'get_buffer', 'buffer_done',
'get_buffer_length', 'get_raw_buffer', 'get_buffer',
'is_buffer_full', 'set_buffer_done',
'load_system', 'save_system', 'start_system',
'enable_log_output', 'disable_log_output',
'set_log_output', 'set_log_level', 'close_log',
......@@ -233,7 +234,7 @@ def get_buffer_length(channel):
return length[0]
def get_buffer_full(channel, buffer_id):
def is_buffer_full(channel, buffer_id):
bid = to_buffer_id(buffer_id)
command = b'buffer_full_%c' % bid
result = ffi.new('unsigned short *')
......@@ -242,23 +243,60 @@ def get_buffer_full(channel, buffer_id):
return bool(result[0])
def get_buffer(channel, buffer_id):
def is_overrun(channel):
result = ffi.new('unsigned short *')
code = handel.xiaGetRunData(channel, b'buffer_overrun', result)
check_error(code)
return bool(result[0])
def get_raw_buffer(channel, buffer_id):
bid = to_buffer_id(buffer_id)
command = b'buffer_%c' % bid
length = get_buffer_length(channel)
array = numpy.zeros(length, dtype='uint32')
array = numpy.zeros(length * 2, dtype='uint16')
data = ffi.cast('uint32_t *', array.ctypes.data)
code = handel.xiaGetRunData(channel, command, data)
check_error(code)
return array
return array[::2]
def get_buffer(channel, buffer_id):
raw = get_raw_buffer(channel, buffer_id)
return parse_mapping_buffer(raw)
def get_current_pixel(channel):
current = ffi.new('unsigned long *')
code = handel.xiaGetRunData(channel, b'current_pixel', current)
check_error(code)
return current[0]
def buffer_done(channel, buffer_id):
def set_buffer_done(channel, buffer_id):
bid = to_buffer_id(buffer_id)
code = handel.xiaBoardOperation(channel, b'buffer_done', bid)
check_error(code)
# Baseline
def get_baseline_length(channel):
length = ffi.new('unsigned long *')
code = handel.xiaGetRunData(channel, b'baseline_length', length)
check_error(code)
return length[0]
def get_baseline(channel):
length = get_baseline_length(channel)
array = numpy.zeros(length, dtype='uint32')
data = ffi.cast('uint32_t *', array.ctypes.data)
code = handel.xiaGetRunData(channel, b'baseline', data)
check_error(code)
return array
# Not exposed
# int xiaDoSpecialRun(int detChan, char *name, void *info);
......
......@@ -57,3 +57,68 @@ def parse_xia_ini_file(content):
raise ValueError('Line not recognized: {!r}'.format(line))
# Return result
return dct
def dword(array, index, bitsize=16):
return array[index] | array[index+1] << bitsize
def parse_mapping_buffer(raw):
spectrums, statistics = {}, {}
# Header advance
header = raw[0:256]
current = 256
# Header parsing
assert header[0] == 0x55aa
assert header[1] == 0xaa55
assert header[2] == 0x100
mapping_mode = header[3]
buffer_index = dword(header, 4)
buffer_id = ['a', 'b'][header[7]]
pixel_number = header[8]
starting_pixel = dword(header, 9)
module_serial_number = header[11]
dc1, de1 = header[12:14]
dc2, de2 = header[14:16]
dc3, de3 = header[16:18]
dc4, de4 = header[18:20]
sizes = header[20:24]
# Check
assert mapping_mode == 1 # MCA mapping mode
# Iterate over pixels
for _ in range(pixel_number):
# Pixel header advance
pixel_header = raw[current:current+256]
current += 256
# Pixel header parsing
assert pixel_header[0] == 0x33cc
assert pixel_header[1] == 0xcc33
assert pixel_header[2] == 0x100
assert pixel_header[3] == mapping_mode
pixel = dword(pixel_header, 4)
total_size = dword(pixel_header, 6)
sizes = pixel_header[8:12]
# Iterate over channels
for i in range(4):
# Get statistics
stats = [dword(header, 32 + 8 * i + 2 * j) for j in range(4)]
# Set data
lst = statistics.setdefault(pixel, [])
lst.append(tuple(stats))
# Iterate over channels
remaining = total_size - 256
for size in sizes:
# Update remaining size
assert remaining >= 0
if remaining == 0:
break
remaining -= size
# Sectrum Advance
spectrum = raw[current:current+size]
current += size
# Set data
lst = spectrums.setdefault(pixel, [])
lst.append(spectrum)
assert remaining == 0
# Return result
return spectrum, statistics
......@@ -352,9 +352,9 @@ def test_get_buffer_length(interface):
interface.check_error.assert_called_with(0)
def test_get_buffer_full(interface):
def test_is_buffer_full(interface):
with pytest.raises(ValueError) as context:
interface.get_buffer_full(1, 'very wrong')
interface.is_buffer_full(1, 'very wrong')
assert 'very wrong' in str(context.value)
m = interface.handel.xiaGetRunData
......@@ -364,14 +364,14 @@ def test_get_buffer_full(interface):
return 0
m.side_effect = side_effect
assert interface.get_buffer_full(1, 'a') is True
assert interface.is_buffer_full(1, 'a') is True
arg = m.call_args[0][2]
m.assert_called_once_with(1, b'buffer_full_a', arg)
# Make sure errors have been checked
interface.check_error.assert_called_with(0)
def test_get_buffer(interface):
def test_get_raw_buffer(interface):
m = interface.handel.xiaGetRunData
def side_effect(channel, dtype, arg):
......@@ -386,7 +386,7 @@ def test_get_buffer(interface):
m.side_effect = side_effect
expected = numpy.array(range(10), dtype='uint32')
diff = interface.get_buffer(1, 'a') == expected
diff = interface.get_raw_buffer(1, 'a') == expected
assert diff.all()
m.assert_called()
arg = m.call_args[0][2]
......@@ -395,10 +395,10 @@ def test_get_buffer(interface):
interface.check_error.assert_called_with(0)
def test_buffer_done(interface):
def test_set_buffer_done(interface):
m = interface.handel.xiaBoardOperation
m.return_value = 0
assert interface.buffer_done(1, 'b') is None
assert interface.set_buffer_done(1, 'b') is None
m.assert_called_with(1, b'buffer_done', b'b')
# Make sure errors have been checked
interface.check_error.assert_called_with(0)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment