Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Bliss
python-handel
Commits
a4a98f2b
Commit
a4a98f2b
authored
Sep 22, 2017
by
Vincent Michel
Browse files
Improve overrun detection
parent
14dda057
Pipeline
#1062
passed with stages
in 1 minute and 17 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
handel/interface.py
View file @
a4a98f2b
...
...
@@ -54,6 +54,20 @@ def to_buffer_id(bid):
raise
ValueError
(
msg
.
format
(
bid
))
def
merge_buffer_data
(
*
data
):
if
not
data
:
return
{},
{}
# Use first argument as result
result
,
data
=
data
[
0
],
data
[
1
:]
# Copy other arguments into the result
for
sources
in
data
:
for
source
,
dest
in
zip
(
sources
,
result
):
for
key
,
dct
in
source
.
items
():
dest
.
setdefault
(
key
,
{})
dest
[
key
].
update
(
dct
)
return
result
# Initializing handel
def
init
(
*
path
):
...
...
@@ -238,54 +252,102 @@ def get_buffer_current_pixel(master):
def
set_buffer_done
(
master
,
buffer_id
):
"""Flag the the buffer as read and return an overrun detection.
False means no overrun have been detected.
True means an overrun have been detected.
"""
bid
=
to_buffer_id
(
buffer_id
)
code
=
handel
.
xiaBoardOperation
(
master
,
b
'buffer_done'
,
bid
)
check_error
(
code
)
other
=
b
'b'
if
bid
==
b
'a'
else
b
'a'
return
is_buffer_full
(
master
,
other
)
and
is_channel_running
(
master
)
# Synchronized run
def
any_buffer_overrun
():
"""Return True if an overrun has been detected by the hardware on any
module, False otherwise.
"""
return
any
(
is_buffer_overrun
(
master
)
for
master
in
get_master_channels
())
def
all_buffer_full
(
buffer_id
):
"""Return True if all the given buffers are full and ready to be read,
False otherwise.
"""
return
all
(
is_buffer_full
(
master
,
buffer_id
)
for
master
in
get_master_channels
())
def
set_all_buffer_done
(
buffer_id
):
for
master
in
get_master_channels
():
set_buffer_done
(
master
,
buffer_id
)
"""Flag all the given buffers as read and return an overrun detection.
False means no overrun have been detected.
True means an overrun have been detected.
"""
overruns
=
[
set_buffer_done
(
master
,
buffer_id
)
for
master
in
get_master_channels
()]
return
any
(
overruns
)
def
get_current_pixel
():
"""Get the current pixel reported by the hardware."""
return
max
(
get_buffer_current_pixel
(
master
)
for
master
in
get_master_channels
())
def
get_all_buffer_data
(
buffer_id
):
result
=
{},
{}
for
master
in
get_master_channels
():
sources
=
get_buffer_data
(
master
,
buffer_id
)
for
source
,
dest
in
zip
(
sources
,
result
):
for
key
,
dct
in
source
.
items
():
dest
.
setdefault
(
key
,
{})
dest
[
key
].
update
(
dct
)
return
result
"""Get and merge all the buffer data from the different channels.
Return a tuple (spectrums, statistics) where both values are dictionaries
of dictionaries, first indexed by pixel and then by channel."""
data
=
[
get_buffer_data
(
master
,
buffer_id
)
for
master
in
get_master_channels
()]
return
merge_buffer_data
(
*
data
)
def
synchronized_poll_data
():
if
any_buffer_overrun
():
raise
RuntimeError
(
'Buffer overrun!'
)
"""Convenient helper for buffer management in mapping mode.
It assumes that all the modules are configured with the same number
of pixels per buffer.
It includes:
- Hardware overrun detection
- Software overrun detection
- Current pixel readout
- Buffer readout and data parsing
- Buffer flaging after readout
If an overrun is detected, a RuntimeError exception is raised.
Return a tuple (current_pixel, spectrums, statistics) where both
the spectrums and statistics values are dictionaries of dictionaries,
first indexed by pixel and then by channel. If there is no data to
report, those values are empty dicts.
"""
data
=
{
'a'
:
None
,
'b'
:
None
}
overrun_error
=
RuntimeError
(
'Buffer overrun!'
)
# Get info from hardware
current_pixel
=
get_current_pixel
()
for
bid
in
(
'a'
,
'b'
):
if
all_buffer_full
(
bid
):
spectrums
,
statistics
=
get_all_buffer_data
(
bid
)
set_all_buffer_done
(
bid
)
return
current_pixel
,
spectrums
,
statistics
return
current_pixel
,
None
,
None
full_lst
=
[
x
for
x
in
data
if
all_buffer_full
(
x
)]
# Overrun from hardware
if
any_buffer_overrun
():
raise
overrun_error
# Read data from buffers
for
x
in
full_lst
:
data
[
x
]
=
get_all_buffer_data
(
x
)
# Overrun from set_buffer_done
if
set_all_buffer_done
(
x
):
raise
overrun_error
# Extract data
args
=
filter
(
None
,
data
.
values
())
spectrums
,
stats
=
merge_buffer_data
(
*
args
)
# Return
return
current_pixel
,
spectrums
,
stats
# Baseline
...
...
tests/test_interface.py
View file @
a4a98f2b
...
...
@@ -434,12 +434,18 @@ def test_get_buffer_current_pixel(interface):
def
test_set_buffer_done
(
interface
):
m
=
interface
.
handel
.
xiaBoardOperation
m
.
return_value
=
0
assert
interface
.
set_buffer_done
(
1
,
'b'
)
is
None
m
.
assert_called_with
(
1
,
b
'buffer_done'
,
b
'b'
)
# Make sure errors have been checked
interface
.
check_error
.
assert_called_with
(
0
)
with
mock
.
patch
(
'handel.interface.is_channel_running'
)
as
m1
:
with
mock
.
patch
(
'handel.interface.is_buffer_full'
)
as
m2
:
m
=
interface
.
handel
.
xiaBoardOperation
m
.
return_value
=
0
m1
.
return_value
=
True
m2
.
return_value
=
True
assert
interface
.
set_buffer_done
(
1
,
'b'
)
is
True
m
.
assert_called_with
(
1
,
b
'buffer_done'
,
b
'b'
)
m1
.
assert_called_once_with
(
1
)
m2
.
assert_called_once_with
(
1
,
b
'a'
)
# Make sure errors have been checked
interface
.
check_error
.
assert_called_once_with
(
0
)
# Synchronized run
...
...
@@ -468,7 +474,8 @@ def test_set_all_buffer_done(interface):
with
mock
.
patch
(
'handel.interface.get_master_channels'
)
as
m1
:
with
mock
.
patch
(
'handel.interface.set_buffer_done'
)
as
m2
:
m1
.
return_value
=
[
0
,
4
]
assert
interface
.
set_all_buffer_done
(
'b'
)
is
None
m2
.
side_effect
=
lambda
x
,
y
:
x
==
0
assert
interface
.
set_all_buffer_done
(
'b'
)
is
True
m1
.
assert_called_once_with
()
m2
.
assert_called_with
(
4
,
'b'
)
...
...
@@ -500,45 +507,70 @@ def test_get_all_buffer_data(interface):
def
test_synchronized_poll_data
(
interface
):
with
mock
.
patch
.
multiple
(
'handel.interface'
,
any_buffer_overrun
=
mock
.
DEFAULT
,
get_current_pixel
=
mock
.
DEFAULT
,
any_buffer_overrun
=
mock
.
DEFAULT
,
all_buffer_full
=
mock
.
DEFAULT
,
get_all_buffer_data
=
mock
.
DEFAULT
,
set_all_buffer_done
=
mock
.
DEFAULT
)
as
ms
:
# Overrun
# First hard overrun
ms
[
'any_buffer_overrun'
].
return_value
=
True
with
pytest
.
raises
(
RuntimeError
)
as
ctx
:
interface
.
synchronized_poll_data
()
assert
'Buffer overrun!'
in
str
(
ctx
.
value
)
ms
[
'get_current_pixel'
].
assert_called_once_with
()
ms
[
'any_buffer_overrun'
].
assert_called_once_with
()
ms
[
'get_current_pixel'
].
assert_not_called
()
ms
[
'all_buffer_full'
].
assert_not_called
()
assert
ms
[
'all_buffer_full'
].
call_args_list
==
[((
'a'
,),),
((
'b'
,),)]
ms
[
'get_all_buffer_data'
].
assert_not_called
()
ms
[
'set_all_buffer_done'
].
assert_not_called
()
# Reset
for
m
in
ms
.
values
():
m
.
reset_mock
()
# Data not ready
ms
[
'any_buffer_overrun'
].
return_value
=
False
ms
[
'get_current_pixel'
].
return_value
=
10
ms
[
'any_buffer_overrun'
].
return_value
=
False
ms
[
'all_buffer_full'
].
return_value
=
False
assert
interface
.
synchronized_poll_data
()
==
(
10
,
None
,
None
)
ms
[
'any_buffer_overrun'
].
assert_called_once_with
()
assert
interface
.
synchronized_poll_data
()
==
(
10
,
{},
{})
ms
[
'get_current_pixel'
].
assert_called_once_with
()
ms
[
'any_buffer_overrun'
].
assert_called_once_with
()
assert
ms
[
'all_buffer_full'
].
call_args_list
==
[((
'a'
,),),
((
'b'
,),)]
ms
[
'get_all_buffer_data'
].
assert_not_called
()
ms
[
'set_all_buffer_done'
].
assert_not_called
()
# Reset
for
m
in
ms
.
values
():
m
.
reset_mock
()
# Data ready
# Soft overrun
ms
[
'get_current_pixel'
].
return_value
=
20
ms
[
'any_buffer_overrun'
].
return_value
=
False
ms
[
'all_buffer_full'
].
side_effect
=
lambda
x
:
x
==
'a'
ms
[
'get_all_buffer_data'
].
return_value
=
'spectrums'
,
'stats'
ms
[
'set_all_buffer_done'
].
return_value
=
True
with
pytest
.
raises
(
RuntimeError
)
as
ctx
:
interface
.
synchronized_poll_data
()
assert
'Buffer overrun!'
in
str
(
ctx
.
value
)
ms
[
'get_current_pixel'
].
assert_called_once_with
()
ms
[
'any_buffer_overrun'
].
assert_called_once_with
()
assert
ms
[
'all_buffer_full'
].
call_args_list
==
[((
'a'
,),),
((
'b'
,),)]
ms
[
'get_all_buffer_data'
].
assert_called_once_with
(
'a'
)
ms
[
'set_all_buffer_done'
].
assert_called_once_with
(
'a'
)
# Reset
for
m
in
ms
.
values
():
m
.
reset_mock
()
# Data ready
ms
[
'get_current_pixel'
].
return_value
=
20
ms
[
'any_buffer_overrun'
].
return_value
=
False
ms
[
'all_buffer_full'
].
side_effect
=
lambda
x
:
x
==
'b'
ms
[
'get_all_buffer_data'
].
return_value
=
'spectrums'
,
'stats'
ms
[
'set_all_buffer_done'
].
return_value
=
False
assert
interface
.
synchronized_poll_data
()
==
(
20
,
'spectrums'
,
'stats'
)
ms
[
'any_buffer_overrun'
].
assert_called_once_with
()
ms
[
'get_current_pixel'
].
assert_called_once_with
()
ms
[
'any_buffer_overrun'
].
assert_called_once_with
()
assert
ms
[
'all_buffer_full'
].
call_args_list
==
[((
'a'
,),),
((
'b'
,),)]
ms
[
'get_all_buffer_data'
].
assert_called_once_with
(
'b'
)
ms
[
'set_all_buffer_done'
].
assert_called_once_with
(
'b'
)
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment