Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Benoit Rousselle
bliss
Commits
4d01391f
Commit
4d01391f
authored
May 15, 2019
by
GUILLOU Perceval
Browse files
add fast scan test and listener handles data chunk
parent
65850a9a
Changes
2
Hide whitespace changes
Inline
Side-by-side
bliss/data/display.py
View file @
4d01391f
...
...
@@ -302,6 +302,10 @@ class ScanDataListener:
self
.
exit_read_fd
=
exit_read_fd
self
.
scan_display
=
ScanDisplay
(
self
.
session_name
)
# self.start_time = 0
# self.last_time = 0
# self.stop_time = 0
def
update_counter_selection
(
self
):
self
.
counter_selection
=
self
.
scan_display
.
counters
...
...
@@ -317,7 +321,7 @@ class ScanDataListener:
return
selection
def
on_scan_new
(
self
,
scan_info
):
# self.start_time = time.time()
# Skip other session
if
scan_info
.
get
(
"session_name"
)
!=
self
.
session_name
:
# print(f"{scan_info.get('session_name')} != {self.session_name}")
...
...
@@ -487,58 +491,80 @@ class ScanDataListener:
return
# Skip if partial data
for
channel_name
in
channel_info
[
"data"
]:
# for channel_name in channel_info["data"]: # ??? for channel_name in self.channel_names: ???
for
channel_name
in
self
.
channel_names
:
if
len
(
channel_info
[
"data"
][
channel_name
])
<
self
.
scan_steps_index
:
return
# Get data for the current scan step
values_dict
=
{}
for
channel_name
in
channel_info
[
"data"
]:
if
channel_name
in
self
.
channel_names
:
# Check if we receive more than one scan points (i.e. lines) per 'scan_data' event
data_lens
=
[
len
(
channel_info
[
"data"
][
channel_name
])
for
channel_name
in
channel_info
[
"data"
]
]
bsize
=
min
(
data_lens
)
# if bsize != self.scan_steps_index:
# print("receive", bsize, "lines at step", self.scan_steps_index-1, "printing", bsize - self.scan_steps_index+1, "lines" )
for
i
in
range
(
bsize
-
self
.
scan_steps_index
+
1
):
# Get data for the current scan step
values_dict
=
{}
# for channel_name in channel_info["data"]: # ??? for channel_name in self.channel_names: ???
# if channel_name in self.channel_names:
for
channel_name
in
self
.
channel_names
:
values_dict
[
channel_name
]
=
channel_info
[
"data"
][
channel_name
][
self
.
scan_steps_index
-
1
]
# Extract time data
elapsed_time_col
=
[]
if
"timer:elapsed_time"
in
values_dict
:
elapsed_time_col
.
append
(
values_dict
.
pop
(
"timer:elapsed_time"
))
# Extract time data
elapsed_time_col
=
[]
if
"timer:elapsed_time"
in
values_dict
:
elapsed_time_col
.
append
(
values_dict
.
pop
(
"timer:elapsed_time"
))
# Build data line
values
=
elapsed_time_col
+
[
values_dict
[
channel_name
]
for
channel_name
in
values_dict
]
# Build data line
values
=
elapsed_time_col
+
[
values_dict
[
channel_name
]
for
channel_name
in
values_dict
]
# Format output line
if
scan_type
==
"ct"
:
# ct is actually a timescan(npoints=1).
norm_values
=
numpy
.
array
(
values
)
/
scan_info
[
"count_time"
]
col_len
=
max
(
map
(
len
,
self
.
col_labels
))
+
2
template
=
"{{label:>{0}}} = {{value: >12}} ({{norm: 12}}/s)"
.
format
(
col_len
)
lines
=
"
\n
"
.
join
(
[
template
.
format
(
label
=
label
,
value
=
v
,
norm
=
nv
)
for
label
,
v
,
nv
in
zip
(
self
.
col_labels
[
1
:],
values
,
norm_values
)
]
)
end_time_str
=
datetime
.
datetime
.
now
().
strftime
(
"%a %b %d %H:%M:%S %Y"
)
msg
=
"{0}
\n\n
{1}"
.
format
(
end_time_str
,
lines
)
print
(
msg
)
else
:
values
.
insert
(
0
,
self
.
_point_nb
)
self
.
_point_nb
+=
1
line
=
" "
.
join
(
[
self
.
col_templ
[
i
].
format
(
v
)
for
i
,
v
in
enumerate
(
values
)]
)
# Format output line
if
scan_type
==
"ct"
:
# ct is actually a timescan(npoints=1).
norm_values
=
numpy
.
array
(
values
)
/
scan_info
[
"count_time"
]
col_len
=
max
(
map
(
len
,
self
.
col_labels
))
+
2
template
=
"{{label:>{0}}} = {{value: >12}} ({{norm: 12}}/s)"
.
format
(
col_len
)
lines
=
"
\n
"
.
join
(
[
template
.
format
(
label
=
label
,
value
=
v
,
norm
=
nv
)
for
label
,
v
,
nv
in
zip
(
self
.
col_labels
[
1
:],
values
,
norm_values
)
]
)
end_time_str
=
datetime
.
datetime
.
now
().
strftime
(
"%a %b %d %H:%M:%S %Y"
)
msg
=
"{0}
\n\n
{1}"
.
format
(
end_time_str
,
lines
)
print
(
msg
)
else
:
values
.
insert
(
0
,
self
.
_point_nb
)
self
.
_point_nb
+=
1
line
=
" "
.
join
(
[
self
.
col_templ
[
i
].
format
(
v
)
for
i
,
v
in
enumerate
(
values
)]
)
print
(
line
)
print
(
line
)
self
.
scan_steps_index
+=
1
self
.
scan_steps_index
+=
1
# self.last_time = time.time()
# print("dt_last = ",self.last_time -self.start_time)
def
on_scan_end
(
self
,
scan_info
):
# self.stop_time = time.time()
# print("stop_time ", self.stop_time)
# print("dt_stop = ",self.stop_time -self.start_time)
if
scan_info
.
get
(
"session_name"
)
!=
self
.
session_name
:
return
...
...
tests/scans/test_scan_display.py
View file @
4d01391f
...
...
@@ -8,11 +8,16 @@
import
os
import
sys
import
numpy
from
bliss.data
import
start_listener
from
bliss.common
import
scans
from
bliss.scanning.scan
import
ScanDisplay
from
bliss.scanning.scan
import
Scan
,
StepScanDataWatch
from
bliss.scanning.chain
import
AcquisitionChain
,
AcquisitionDevice
from
bliss.scanning.channel
import
AcquisitionChannel
from
bliss.scanning.acquisition
import
timer
import
subprocess
import
gevent
import
pytest
...
...
@@ -21,6 +26,75 @@ import pytest
# repl.ERROR_REPORT.expert_mode = True
def
test_fast_scan
(
nb
=
1234
,
chunk
=
20
):
class
BlockDataDevice
(
AcquisitionDevice
):
def
__init__
(
self
,
npoints
,
chunk
):
super
().
__init__
(
None
,
"block_data_device"
,
npoints
=
npoints
,
prepare_once
=
True
,
start_once
=
True
,
)
self
.
event
=
gevent
.
event
.
Event
()
self
.
channels
.
append
(
AcquisitionChannel
(
self
,
"block_data"
,
numpy
.
int
,
()))
self
.
pending_trigger
=
0
self
.
chunk
=
chunk
def
prepare
(
self
):
pass
def
start
(
self
):
pass
def
stop
(
self
):
pass
def
trigger
(
self
):
self
.
pending_trigger
+=
1
self
.
event
.
set
()
def
wait_ready
(
self
):
return
True
def
reading
(
self
):
data
=
numpy
.
arange
(
self
.
npoints
,
dtype
=
numpy
.
int
)
acq_npoint
=
0
chunk
=
self
.
chunk
i
=
0
while
acq_npoint
<
self
.
npoints
:
# print("acq_npoint",acq_npoint,i)
while
not
self
.
pending_trigger
:
self
.
event
.
clear
()
self
.
event
.
wait
()
self
.
pending_trigger
-=
1
if
(
acq_npoint
+
1
)
%
(
chunk
)
==
0
and
acq_npoint
:
self
.
channels
[
0
].
emit
(
data
[
i
*
chunk
:
(
i
+
1
)
*
chunk
])
i
+=
1
acq_npoint
+=
1
if
self
.
npoints
-
i
*
chunk
>
0
:
# print("remain",self.npoints - i*chunk ,i*chunk)
self
.
channels
[
0
].
emit
(
data
[
i
*
chunk
:])
soft_timer
=
timer
.
SoftwareTimerMaster
(
0
,
npoints
=
nb
)
block_data_device
=
BlockDataDevice
(
nb
,
chunk
)
acq_chain
=
AcquisitionChain
()
acq_chain
.
add
(
soft_timer
,
block_data_device
)
s
=
Scan
(
acq_chain
,
"scan"
,
{
"type"
:
"fast_scan"
,
"npoints"
:
nb
},
save
=
False
,
data_watch_callback
=
StepScanDataWatch
(),
)
s
.
run
()
def
grab_lines
(
subproc
,
lines
,
timeout
=
30
,
finish_line
=
"PRESS F5 TO COME BACK TO THE SHELL PROMPT"
):
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment