Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Bliss
bliss
Commits
07264a8b
Commit
07264a8b
authored
Jun 06, 2021
by
Valentin Valls
Browse files
Refactor the test using hardcoded scan_info
parent
01db9c19
Pipeline
#48088
passed with stages
in 124 minutes and 42 seconds
Changes
7
Pipelines
1
Expand all
Hide whitespace changes
Inline
Side-by-side
tests/qt/flint/factory.py
0 → 100644
View file @
07264a8b
"""Helper for tests"""
class
ScanInfoFactory
:
def
__init__
(
self
):
self
.
__scan_info
=
{
"acquisition_chain"
:
{},
"devices"
:
{},
"channels"
:
{},
"positioners"
:
{},
}
def
__setitem__
(
self
,
key
,
value
):
self
.
__scan_info
[
key
]
=
value
def
add_device
(
self
,
root_id
:
str
,
device_id
:
str
,
meta
:
dict
=
{},
triggered_by
:
str
=
None
,
type
:
str
=
None
,
):
default_root
=
{
"devices"
:
[]}
acq_root
=
self
.
__scan_info
[
"acquisition_chain"
].
setdefault
(
root_id
,
default_root
)
acq_root
[
"devices"
].
append
(
device_id
)
default_device
=
{
"channels"
:
[]}
device_desc
=
self
.
__scan_info
[
"devices"
].
setdefault
(
device_id
,
default_device
)
device_desc
.
update
(
meta
)
if
type
is
not
None
:
device_desc
[
"type"
]
=
type
if
triggered_by
is
not
None
:
triggering_dev
=
self
.
__scan_info
[
"devices"
][
triggered_by
]
triggered_devs
=
triggering_dev
.
setdefault
(
"triggered_devices"
,
[])
triggered_devs
.
append
(
device_id
)
def
add_lima_device
(
self
,
root_id
:
str
,
device_id
:
str
,
image
:
bool
=
False
,
rois
:
dict
=
None
,
triggered_by
:
str
=
None
,
):
self
.
add_device
(
root_id
=
root_id
,
device_id
=
device_id
,
triggered_by
=
triggered_by
,
type
=
"lima"
)
if
image
:
self
.
add_channel
(
device_id
+
":image"
,
device_id
=
device_id
,
dim
=
2
)
if
rois
is
not
None
:
self
.
add_device
(
root_id
=
root_id
,
device_id
=
device_id
+
":roi_counters"
,
triggered_by
=
device_id
,
meta
=
rois
,
)
def
add_channel
(
self
,
channel_id
:
str
,
device_id
:
str
=
None
,
meta
:
dict
=
{},
dim
:
int
=
None
,
unit
:
str
=
None
,
):
if
device_id
is
None
:
device_id
=
channel_id
.
rsplit
(
":"
,
1
)[
0
]
device_desc
=
self
.
__scan_info
[
"devices"
][
device_id
]
device_desc
[
"channels"
].
append
(
channel_id
)
channel_desc
=
self
.
__scan_info
[
"channels"
].
setdefault
(
channel_id
,
{})
channel_desc
.
update
(
meta
)
if
dim
is
not
None
:
channel_desc
[
"dim"
]
=
dim
if
unit
is
not
None
:
channel_desc
[
"unit"
]
=
unit
def
scan_info
(
self
):
scan_info
=
self
.
__scan_info
self
.
__scan_info
=
None
return
scan_info
tests/qt/flint/helper/__init__.py
0 → 100644
View file @
07264a8b
tests/qt/flint/helper/test_scan_info_helper.py
View file @
07264a8b
This diff is collapsed.
Click to expand it.
tests/qt/flint/manager/__init__.py
0 → 100644
View file @
07264a8b
tests/qt/flint/manager/test_manager.py
View file @
07264a8b
...
...
@@ -7,117 +7,72 @@ from bliss.flint.model import plot_item_model
from
bliss.flint.widgets.curve_plot
import
CurvePlotWidget
from
bliss.flint.helper
import
scan_info_helper
from
bliss.flint.helper
import
model_helper
from
tests.qt.flint.factory
import
ScanInfoFactory
SCAN_INFO_LIMA_ROIS
=
{
"acquisition_chain"
:
{
"timer"
:
{
"devices"
:
[
"timer"
,
"beamviewer"
,
"beamviewer:roi_counters"
]}
},
"devices"
:
{
"timer"
:
{
"channels"
:
[
"timer:elapsed_time"
,
"timer:epoch"
],
"triggered_devices"
:
[
"beamviewer"
],
},
"beamviewer"
:
{
"type"
:
"lima"
,
"triggered_devices"
:
[
"beamviewer:roi_counters"
],
"channels"
:
[
"beamviewer:image"
],
},
"beamviewer:roi_counters"
:
{
"channels"
:
[
"beamviewer:roi_counters:roi1_sum"
,
"beamviewer:roi_counters:roi2_sum"
,
],
"roi1"
:
{
"kind"
:
"rect"
,
"x"
:
190
,
"y"
:
110
,
"width"
:
600
,
"height"
:
230
},
"roi2"
:
{
"kind"
:
"arc"
,
"cx"
:
487.0
,
"cy"
:
513.0
,
"r1"
:
137.0
,
"r2"
:
198.0
,
"a1"
:
-
172.0
,
"a2"
:
-
300.0
,
},
},
},
"channels"
:
{
"timer:elapsed_time"
:
{
"dim"
:
0
},
"timer:epoch"
:
{
"dim"
:
0
},
"beamviewer:roi_counters:roi1_sum"
:
{
"dim"
:
0
},
"beamviewer:roi_counters:roi2_sum"
:
{
"dim"
:
0
},
"beamviewer:image"
:
{
"dim"
:
2
},
},
}
def
_create_loopscan_scan_info
(
extra_name
=
None
):
result
=
{
"type"
:
"ascan"
,
"acquisition_chain"
:
{
"main"
:
{
"devices"
:
[
"master"
,
"slave"
]}},
"devices"
:
{
"master"
:
{
"channels"
:
[
"timer:elapsed_time"
,
"timer:epoch"
],
"triggered_devices"
:
[
"slave"
],
},
"slave"
:
{
"channels"
:
[
"timer:elapsed_time"
,
"timer:epoch"
,
"simulation_diode_sampling_controller:diode1"
,
"simulation_diode_sampling_controller:diode2"
,
]
},
},
"channels"
:
{
"timer:elapsed_time"
:
{
"dim"
:
0
},
"timer:epoch"
:
{
"dim"
:
0
},
"simulation_diode_sampling_controller:diode1"
:
{
"dim"
:
0
},
"simulation_diode_sampling_controller:diode2"
:
{
"dim"
:
0
},
},
}
if
extra_name
is
not
None
:
result
[
"devices"
][
"slave"
][
"channels"
].
append
(
extra_name
)
result
[
"channels"
][
extra_name
]
=
{
"dim"
:
0
}
return
result
def
_create_loopscan_scan_info
():
factory
=
ScanInfoFactory
()
factory
.
add_device
(
root_id
=
"timer"
,
device_id
=
"timer"
)
factory
.
add_channel
(
channel_id
=
"timer:elapsed_time"
,
dim
=
0
,
unit
=
"s"
)
factory
.
add_channel
(
channel_id
=
"timer:epoch"
,
dim
=
0
,
unit
=
"s"
)
factory
.
add_device
(
root_id
=
"timer"
,
device_id
=
"diode"
,
triggered_by
=
"timer"
)
factory
.
add_channel
(
channel_id
=
"diode:diode1"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"diode:diode2"
,
dim
=
0
)
factory
[
"type"
]
=
"loopscan"
return
factory
.
scan_info
()
def
_create_ascan_scan_info
(
master_name
,
extra_name
=
None
):
result
=
{
"type"
:
"ascan"
,
"acquisition_chain"
:
{
"main"
:
{
"devices"
:
[
"master"
,
"slave"
]}},
"devices"
:
{
"master"
:
{
"channels"
:
[
master_name
],
"triggered_devices"
:
[
"slave"
]},
"slave"
:
{
"channels"
:
[
"timer:elapsed_time"
,
"timer:epoch"
,
"simulation_diode_sampling_controller:diode1"
,
"simulation_diode_sampling_controller:diode2"
,
]
},
},
"channels"
:
{
master_name
:
{
"dim"
:
0
},
"timer:elapsed_time"
:
{
"dim"
:
0
},
"timer:epoch"
:
{
"dim"
:
0
},
"simulation_diode_sampling_controller:diode1"
:
{
"dim"
:
0
},
"simulation_diode_sampling_controller:diode2"
:
{
"dim"
:
0
},
},
}
factory
=
ScanInfoFactory
()
factory
.
add_device
(
root_id
=
"ascan"
,
device_id
=
"master"
)
factory
.
add_channel
(
channel_id
=
master_name
,
device_id
=
"master"
,
dim
=
0
)
factory
.
add_device
(
root_id
=
"ascan"
,
device_id
=
"slave"
,
triggered_by
=
"master"
)
factory
.
add_channel
(
channel_id
=
"timer:elapsed_time"
,
device_id
=
"slave"
,
dim
=
0
,
unit
=
"s"
)
factory
.
add_channel
(
channel_id
=
"timer:epoch"
,
device_id
=
"slave"
,
dim
=
0
,
unit
=
"s"
)
factory
.
add_channel
(
channel_id
=
"diode:diode1"
,
device_id
=
"slave"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"diode:diode2"
,
device_id
=
"slave"
,
dim
=
0
)
if
extra_name
is
not
None
:
result
[
"devices"
][
"slave"
][
"channels"
].
append
(
extra_name
)
result
[
"channels"
][
extra_name
]
=
{
"dim"
:
0
}
return
result
factory
.
add_channel
(
channel_id
=
extra_name
,
device_id
=
"master"
,
dim
=
0
)
factory
[
"type"
]
=
"ascan"
return
factory
.
scan_info
()
def
_create_lima_scan_info
(
include_roi2
):
"""
Simulate a scan containing a lima detector with ROIs.
"""
result
=
copy
.
deepcopy
(
SCAN_INFO_LIMA_ROIS
)
if
not
include_roi2
:
del
result
[
"devices"
][
"beamviewer:roi_counters"
][
"roi2"
]
return
result
factory
=
ScanInfoFactory
()
factory
.
add_device
(
root_id
=
"timer"
,
device_id
=
"timer"
)
factory
.
add_channel
(
channel_id
=
"timer:elapsed_time"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"timer:epoch"
,
dim
=
0
)
rois
=
{
"roi1"
:
{
"kind"
:
"rect"
,
"x"
:
190
,
"y"
:
110
,
"width"
:
600
,
"height"
:
230
}}
if
include_roi2
:
rois
[
"roi2"
]
=
{
"kind"
:
"arc"
,
"cx"
:
487.0
,
"cy"
:
513.0
,
"r1"
:
137.0
,
"r2"
:
198.0
,
"a1"
:
-
172.0
,
"a2"
:
-
300.0
,
}
factory
.
add_lima_device
(
device_id
=
"beamviewer"
,
root_id
=
"timer"
,
triggered_by
=
"timer"
,
image
=
True
,
rois
=
rois
,
)
factory
.
add_channel
(
channel_id
=
"beamviewer:roi_counters:roi1_sum"
,
dim
=
0
)
if
include_roi2
:
factory
.
add_channel
(
channel_id
=
"beamviewer:roi_counters:roi2_sum"
,
dim
=
0
)
scan_info
=
factory
.
scan_info
()
return
scan_info
def
test_curve_plot__from_loopscan_to_ascan
(
local_flint
):
...
...
@@ -176,12 +131,7 @@ def test_curve_plot__user_selection(local_flint):
# user selection
model_helper
.
updateDisplayedChannelNames
(
plot
,
scan
,
[
"simulation_diode_sampling_controller:diode1"
,
"simulation_diode_sampling_controller:diode2"
,
],
plot
,
scan
,
[
"diode:diode1"
,
"diode:diode2"
]
)
plot
.
tagUserEditTime
()
...
...
tests/qt/flint/manager/test_scan_manager.py
View file @
07264a8b
...
...
@@ -4,34 +4,39 @@ import numpy
from
bliss.flint.manager
import
scan_manager
from
bliss.data.lima_image
import
ImageFormatNotSupported
from
bliss.data.lima_image
import
Frame
from
tests.qt.flint.factory
import
ScanInfoFactory
SCAN_INFO_1
=
{
"acquisition_chain"
:
{
"main"
:
{
"devices"
:
[
"master"
,
"slave"
]}},
"devices"
:
{
"master"
:
{
"channels"
:
[
"axis:roby"
],
"triggered_devices"
:
[
"slave"
]},
"slave"
:
{
"channels"
:
[
"timer:elapsed_time"
]},
},
"channels"
:
{
"axis:roby"
:
{
"dim"
:
0
},
"timer:elapsed_time"
:
{
"dim"
:
0
}},
}
SCAN_INFO_2
=
{
"acquisition_chain"
:
{
"main"
:
{
"devices"
:
[
"master"
,
"slave"
]}},
"devices"
:
{
"master"
:
{
"channels"
:
[
"axis:robz"
],
"triggered_devices"
:
[
"slave"
]},
"slave"
:
{
"channels"
:
[
"timer:elapsed_time"
]},
},
"channels"
:
{
"axis:robz"
:
{
"dim"
:
0
},
"timer:elapsed_time"
:
{
"dim"
:
0
}},
}
SCAN_INFO_3
=
{
"acquisition_chain"
:
{
"main"
:
{
"devices"
:
[
"master"
,
"slave"
]}},
"devices"
:
{
"master"
:
{
"channels"
:
[
"lima:image"
],
"triggered_devices"
:
[
"slave"
]},
"slave"
:
{
"channels"
:
[]},
},
"channels"
:
{
"lima:image"
:
{
"dim"
:
2
}},
}
def
_create_scan_info_1
(
node_name
:
str
):
factory
=
ScanInfoFactory
()
factory
.
add_device
(
root_id
=
"main"
,
device_id
=
"master"
)
factory
.
add_channel
(
channel_id
=
"axis:roby"
,
device_id
=
"master"
,
dim
=
0
)
factory
.
add_device
(
root_id
=
"main"
,
device_id
=
"slave"
,
triggered_by
=
"master"
)
factory
.
add_channel
(
channel_id
=
"timer:elapsed_time"
,
device_id
=
"slave"
,
dim
=
0
,
unit
=
"s"
)
factory
[
"node_name"
]
=
node_name
return
factory
.
scan_info
()
def
_create_scan_info_2
(
node_name
:
str
):
factory
=
ScanInfoFactory
()
factory
.
add_device
(
root_id
=
"main"
,
device_id
=
"master"
)
factory
.
add_channel
(
channel_id
=
"axis:robz"
,
device_id
=
"master"
,
dim
=
0
)
factory
.
add_device
(
root_id
=
"main"
,
device_id
=
"slave"
,
triggered_by
=
"master"
)
factory
.
add_channel
(
channel_id
=
"timer:elapsed_time"
,
device_id
=
"slave"
,
dim
=
0
,
unit
=
"s"
)
factory
[
"node_name"
]
=
node_name
return
factory
.
scan_info
()
def
_create_scan_info_3
(
node_name
:
str
):
factory
=
ScanInfoFactory
()
factory
.
add_device
(
root_id
=
"main"
,
device_id
=
"master"
)
factory
.
add_channel
(
channel_id
=
"lima:image"
,
device_id
=
"master"
,
dim
=
2
)
factory
[
"node_name"
]
=
node_name
return
factory
.
scan_info
()
class
MockedScanManager
(
scan_manager
.
ScanManager
):
...
...
@@ -58,8 +63,8 @@ def _create_scan_info(node_name, base_scan_info):
def
test_interleaved_scans
():
scan_info_1
=
_create_scan_info
(
"scan1"
,
SCAN_INFO_1
)
scan_info_2
=
_create_scan_info
(
"scan2"
,
SCAN_INFO_2
)
scan_info_1
=
_create_scan_info
_1
(
"scan1"
)
scan_info_2
=
_create_scan_info
_2
(
"scan2"
)
manager
=
MockedScanManager
(
flintModel
=
None
)
# Disabled async consumption
...
...
@@ -90,8 +95,8 @@ def test_interleaved_scans():
def
test_sequencial_scans
():
scan_info_1
=
_create_scan_info
(
"scan1"
,
SCAN_INFO_1
)
scan_info_2
=
_create_scan_info
(
"scan2"
,
SCAN_INFO_2
)
scan_info_1
=
_create_scan_info
_1
(
"scan1"
)
scan_info_2
=
_create_scan_info
_2
(
"scan2"
)
manager
=
MockedScanManager
(
flintModel
=
None
)
...
...
@@ -115,7 +120,7 @@ def test_sequencial_scans():
def
test_bad_sequence__end_before_new
():
scan_info_1
=
_create_scan_info
(
"scan1"
,
SCAN_INFO_1
)
scan_info_1
=
_create_scan_info
_1
(
"scan1"
)
manager
=
MockedScanManager
(
flintModel
=
None
)
manager
.
emit_scan_finished
(
scan_info_1
)
...
...
@@ -166,7 +171,7 @@ class MockedLimaNode:
def
test_image__default
():
scan_info_3
=
_create_scan_info
(
"scan1"
,
SCAN_INFO_3
)
scan_info_3
=
_create_scan_info
_3
(
"scan1"
)
manager
=
MockedScanManager
(
flintModel
=
None
)
...
...
@@ -187,7 +192,7 @@ def test_image__default():
def
test_image__disable_video
():
scan_info_3
=
_create_scan_info
(
"scan1"
,
SCAN_INFO_3
)
scan_info_3
=
_create_scan_info
_3
(
"scan1"
)
manager
=
MockedScanManager
(
flintModel
=
None
)
...
...
@@ -212,7 +217,7 @@ def test_image__disable_video():
def
test_image__decoding_error
():
scan_info_3
=
_create_scan_info
(
"scan1"
,
SCAN_INFO_3
)
scan_info_3
=
_create_scan_info
_3
(
"scan1"
)
manager
=
MockedScanManager
(
flintModel
=
None
)
...
...
@@ -237,7 +242,7 @@ def test_image__decoding_error():
def
test_prefered_user_refresh
():
scan_info_3
=
_create_scan_info
(
"scan1"
,
SCAN_INFO_3
)
scan_info_3
=
_create_scan_info
_3
(
"scan1"
)
manager
=
MockedScanManager
(
flintModel
=
None
)
...
...
@@ -270,7 +275,7 @@ def test_prefered_user_refresh():
def
test_scalar_data_lost
():
scan_db_name
=
"scan1"
scan_info_1
=
_create_scan_info
(
scan_db_name
,
SCAN_INFO_1
)
scan_info_1
=
_create_scan_info
_1
(
scan_db_name
)
manager
=
MockedScanManager
(
flintModel
=
None
)
# Disabled async consumption
...
...
tests/qt/flint/model/test_scan_model.py
View file @
07264a8b
...
...
@@ -4,39 +4,39 @@ import pytest
import
numpy
from
bliss.flint.model
import
scan_model
from
bliss.flint.helper
import
scan_info_helper
SCATTER_SCAN_INFO
=
{
"acquisition_chain"
:
{
"master_time1"
:
{
"devices"
:
[
"master"
,
"slave"
]}},
"devices"
:
{
"master"
:
{
"channels"
:
[
"device1:channel1"
,
"device2:channel1"
,
"device2:channel2"
],
"triggered_devices"
:
[
"slave"
],
},
"slave"
:
{
"channels"
:
[
"device3:channel1"
,
"device4:channel1"
,
"master_time1:index"
,
"lima:image"
,
]
},
},
"channels"
:
{
"device1:channel1"
:
{
"unit"
:
"mm"
,
"dim"
:
0
},
"device2:channel1"
:
{
"unit"
:
"mm"
,
"dim"
:
0
},
"device2:channel2"
:
{
"unit"
:
"mm"
,
"dim"
:
0
},
"device3:channel1"
:
{
"unit"
:
"mm"
,
"dim"
:
0
},
"device4:channel1"
:
{
"unit"
:
"mm"
,
"dim"
:
0
},
"master_time1:index"
:
{
"unit"
:
"s"
,
"dim"
:
0
},
"lima:image"
:
{
"dim"
:
2
},
},
"data_dim"
:
2
,
}
from
tests.qt.flint.factory
import
ScanInfoFactory
def
_create_scatter_scan_info
():
factory
=
ScanInfoFactory
()
factory
.
add_device
(
root_id
=
"master_time1"
,
device_id
=
"master"
)
factory
.
add_channel
(
channel_id
=
"device1:channel1"
,
device_id
=
"master"
,
unit
=
"mm"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"device2:channel1"
,
device_id
=
"master"
,
unit
=
"mm"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"device2:channel2"
,
device_id
=
"master"
,
unit
=
"mm"
,
dim
=
0
)
factory
.
add_device
(
root_id
=
"master_time1"
,
device_id
=
"slave"
,
triggered_by
=
"master"
)
factory
.
add_channel
(
channel_id
=
"device3:channel1"
,
device_id
=
"slave"
,
unit
=
"mm"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"device4:channel1"
,
device_id
=
"slave"
,
unit
=
"mm"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"master_time1:index"
,
device_id
=
"slave"
,
unit
=
"mm"
,
dim
=
0
)
factory
.
add_channel
(
channel_id
=
"lima:image"
,
device_id
=
"slave"
,
dim
=
2
)
return
factory
.
scan_info
()
def
test_scan_data_update_whole_channels
():
scan
=
scan_info_helper
.
create_scan_model
(
SCATTER_SCAN_INFO
)
scan_info
=
_create_scatter_scan_info
()
scan
=
scan_info_helper
.
create_scan_model
(
scan_info
)
event
=
scan_model
.
ScanDataUpdateEvent
(
scan
)
expected
=
[
"device1:channel1"
,
...
...
@@ -50,7 +50,8 @@ def test_scan_data_update_whole_channels():
def
test_scan_data_update_single_channel
():
scan
=
scan_info_helper
.
create_scan_model
(
SCATTER_SCAN_INFO
)
scan_info
=
_create_scatter_scan_info
()
scan
=
scan_info_helper
.
create_scan_model
(
scan_info
)
channel
=
scan
.
getChannelByName
(
"device2:channel2"
)
event
=
scan_model
.
ScanDataUpdateEvent
(
scan
,
channel
=
channel
)
expected
=
{
"device2:channel2"
}
...
...
@@ -58,7 +59,8 @@ def test_scan_data_update_single_channel():
def
test_scan_data_update_single_image_channel
():
scan
=
scan_info_helper
.
create_scan_model
(
SCATTER_SCAN_INFO
)
scan_info
=
_create_scatter_scan_info
()
scan
=
scan_info_helper
.
create_scan_model
(
scan_info
)
channel
=
scan
.
getChannelByName
(
"lima:image"
)
event
=
scan_model
.
ScanDataUpdateEvent
(
scan
,
channel
=
channel
)
expected
=
{
"lima:image"
}
...
...
@@ -66,7 +68,8 @@ def test_scan_data_update_single_image_channel():
def
test_scan_data_update_master_channels
():
scan
=
scan_info_helper
.
create_scan_model
(
SCATTER_SCAN_INFO
)
scan_info
=
_create_scatter_scan_info
()
scan
=
scan_info_helper
.
create_scan_model
(
scan_info
)
device
=
scan
.
getDeviceByName
(
"master_time1"
)
event
=
scan_model
.
ScanDataUpdateEvent
(
scan
,
masterDevice
=
device
)
expected
=
{
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment