Commit 76a97029 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'factory-for-scan-info-tests' into 'master'

Flint: Refactor the tests which was using hardcoded scan_info

Closes #2771

See merge request !3769
parents 3638092d 07264a8b
Pipeline #48229 failed with stages
in 104 minutes and 11 seconds
"""Helper for tests"""
class ScanInfoFactory:
def __init__(self):
self.__scan_info = {
"acquisition_chain": {},
"devices": {},
"channels": {},
"positioners": {},
}
def __setitem__(self, key, value):
self.__scan_info[key] = value
def add_device(
self,
root_id: str,
device_id: str,
meta: dict = {},
triggered_by: str = None,
type: str = None,
):
default_root = {"devices": []}
acq_root = self.__scan_info["acquisition_chain"].setdefault(
root_id, default_root
)
acq_root["devices"].append(device_id)
default_device = {"channels": []}
device_desc = self.__scan_info["devices"].setdefault(device_id, default_device)
device_desc.update(meta)
if type is not None:
device_desc["type"] = type
if triggered_by is not None:
triggering_dev = self.__scan_info["devices"][triggered_by]
triggered_devs = triggering_dev.setdefault("triggered_devices", [])
triggered_devs.append(device_id)
def add_lima_device(
self,
root_id: str,
device_id: str,
image: bool = False,
rois: dict = None,
triggered_by: str = None,
):
self.add_device(
root_id=root_id, device_id=device_id, triggered_by=triggered_by, type="lima"
)
if image:
self.add_channel(device_id + ":image", device_id=device_id, dim=2)
if rois is not None:
self.add_device(
root_id=root_id,
device_id=device_id + ":roi_counters",
triggered_by=device_id,
meta=rois,
)
def add_channel(
self,
channel_id: str,
device_id: str = None,
meta: dict = {},
dim: int = None,
unit: str = None,
):
if device_id is None:
device_id = channel_id.rsplit(":", 1)[0]
device_desc = self.__scan_info["devices"][device_id]
device_desc["channels"].append(channel_id)
channel_desc = self.__scan_info["channels"].setdefault(channel_id, {})
channel_desc.update(meta)
if dim is not None:
channel_desc["dim"] = dim
if unit is not None:
channel_desc["unit"] = unit
def scan_info(self):
scan_info = self.__scan_info
self.__scan_info = None
return scan_info
......@@ -7,117 +7,72 @@ from bliss.flint.model import plot_item_model
from bliss.flint.widgets.curve_plot import CurvePlotWidget
from bliss.flint.helper import scan_info_helper
from bliss.flint.helper import model_helper
from tests.qt.flint.factory import ScanInfoFactory
SCAN_INFO_LIMA_ROIS = {
"acquisition_chain": {
"timer": {"devices": ["timer", "beamviewer", "beamviewer:roi_counters"]}
},
"devices": {
"timer": {
"channels": ["timer:elapsed_time", "timer:epoch"],
"triggered_devices": ["beamviewer"],
},
"beamviewer": {
"type": "lima",
"triggered_devices": ["beamviewer:roi_counters"],
"channels": ["beamviewer:image"],
},
"beamviewer:roi_counters": {
"channels": [
"beamviewer:roi_counters:roi1_sum",
"beamviewer:roi_counters:roi2_sum",
],
"roi1": {"kind": "rect", "x": 190, "y": 110, "width": 600, "height": 230},
"roi2": {
"kind": "arc",
"cx": 487.0,
"cy": 513.0,
"r1": 137.0,
"r2": 198.0,
"a1": -172.0,
"a2": -300.0,
},
},
},
"channels": {
"timer:elapsed_time": {"dim": 0},
"timer:epoch": {"dim": 0},
"beamviewer:roi_counters:roi1_sum": {"dim": 0},
"beamviewer:roi_counters:roi2_sum": {"dim": 0},
"beamviewer:image": {"dim": 2},
},
}
def _create_loopscan_scan_info(extra_name=None):
result = {
"type": "ascan",
"acquisition_chain": {"main": {"devices": ["master", "slave"]}},
"devices": {
"master": {
"channels": ["timer:elapsed_time", "timer:epoch"],
"triggered_devices": ["slave"],
},
"slave": {
"channels": [
"timer:elapsed_time",
"timer:epoch",
"simulation_diode_sampling_controller:diode1",
"simulation_diode_sampling_controller:diode2",
]
},
},
"channels": {
"timer:elapsed_time": {"dim": 0},
"timer:epoch": {"dim": 0},
"simulation_diode_sampling_controller:diode1": {"dim": 0},
"simulation_diode_sampling_controller:diode2": {"dim": 0},
},
}
if extra_name is not None:
result["devices"]["slave"]["channels"].append(extra_name)
result["channels"][extra_name] = {"dim": 0}
return result
def _create_loopscan_scan_info():
factory = ScanInfoFactory()
factory.add_device(root_id="timer", device_id="timer")
factory.add_channel(channel_id="timer:elapsed_time", dim=0, unit="s")
factory.add_channel(channel_id="timer:epoch", dim=0, unit="s")
factory.add_device(root_id="timer", device_id="diode", triggered_by="timer")
factory.add_channel(channel_id="diode:diode1", dim=0)
factory.add_channel(channel_id="diode:diode2", dim=0)
factory["type"] = "loopscan"
return factory.scan_info()
def _create_ascan_scan_info(master_name, extra_name=None):
result = {
"type": "ascan",
"acquisition_chain": {"main": {"devices": ["master", "slave"]}},
"devices": {
"master": {"channels": [master_name], "triggered_devices": ["slave"]},
"slave": {
"channels": [
"timer:elapsed_time",
"timer:epoch",
"simulation_diode_sampling_controller:diode1",
"simulation_diode_sampling_controller:diode2",
]
},
},
"channels": {
master_name: {"dim": 0},
"timer:elapsed_time": {"dim": 0},
"timer:epoch": {"dim": 0},
"simulation_diode_sampling_controller:diode1": {"dim": 0},
"simulation_diode_sampling_controller:diode2": {"dim": 0},
},
}
factory = ScanInfoFactory()
factory.add_device(root_id="ascan", device_id="master")
factory.add_channel(channel_id=master_name, device_id="master", dim=0)
factory.add_device(root_id="ascan", device_id="slave", triggered_by="master")
factory.add_channel(
channel_id="timer:elapsed_time", device_id="slave", dim=0, unit="s"
)
factory.add_channel(channel_id="timer:epoch", device_id="slave", dim=0, unit="s")
factory.add_channel(channel_id="diode:diode1", device_id="slave", dim=0)
factory.add_channel(channel_id="diode:diode2", device_id="slave", dim=0)
if extra_name is not None:
result["devices"]["slave"]["channels"].append(extra_name)
result["channels"][extra_name] = {"dim": 0}
return result
factory.add_channel(channel_id=extra_name, device_id="master", dim=0)
factory["type"] = "ascan"
return factory.scan_info()
def _create_lima_scan_info(include_roi2):
"""
Simulate a scan containing a lima detector with ROIs.
"""
result = copy.deepcopy(SCAN_INFO_LIMA_ROIS)
if not include_roi2:
del result["devices"]["beamviewer:roi_counters"]["roi2"]
return result
factory = ScanInfoFactory()
factory.add_device(root_id="timer", device_id="timer")
factory.add_channel(channel_id="timer:elapsed_time", dim=0)
factory.add_channel(channel_id="timer:epoch", dim=0)
rois = {"roi1": {"kind": "rect", "x": 190, "y": 110, "width": 600, "height": 230}}
if include_roi2:
rois["roi2"] = {
"kind": "arc",
"cx": 487.0,
"cy": 513.0,
"r1": 137.0,
"r2": 198.0,
"a1": -172.0,
"a2": -300.0,
}
factory.add_lima_device(
device_id="beamviewer",
root_id="timer",
triggered_by="timer",
image=True,
rois=rois,
)
factory.add_channel(channel_id="beamviewer:roi_counters:roi1_sum", dim=0)
if include_roi2:
factory.add_channel(channel_id="beamviewer:roi_counters:roi2_sum", dim=0)
scan_info = factory.scan_info()
return scan_info
def test_curve_plot__from_loopscan_to_ascan(local_flint):
......@@ -176,12 +131,7 @@ def test_curve_plot__user_selection(local_flint):
# user selection
model_helper.updateDisplayedChannelNames(
plot,
scan,
[
"simulation_diode_sampling_controller:diode1",
"simulation_diode_sampling_controller:diode2",
],
plot, scan, ["diode:diode1", "diode:diode2"]
)
plot.tagUserEditTime()
......
......@@ -4,34 +4,39 @@ import numpy
from bliss.flint.manager import scan_manager
from bliss.data.lima_image import ImageFormatNotSupported
from bliss.data.lima_image import Frame
from tests.qt.flint.factory import ScanInfoFactory
SCAN_INFO_1 = {
"acquisition_chain": {"main": {"devices": ["master", "slave"]}},
"devices": {
"master": {"channels": ["axis:roby"], "triggered_devices": ["slave"]},
"slave": {"channels": ["timer:elapsed_time"]},
},
"channels": {"axis:roby": {"dim": 0}, "timer:elapsed_time": {"dim": 0}},
}
SCAN_INFO_2 = {
"acquisition_chain": {"main": {"devices": ["master", "slave"]}},
"devices": {
"master": {"channels": ["axis:robz"], "triggered_devices": ["slave"]},
"slave": {"channels": ["timer:elapsed_time"]},
},
"channels": {"axis:robz": {"dim": 0}, "timer:elapsed_time": {"dim": 0}},
}
SCAN_INFO_3 = {
"acquisition_chain": {"main": {"devices": ["master", "slave"]}},
"devices": {
"master": {"channels": ["lima:image"], "triggered_devices": ["slave"]},
"slave": {"channels": []},
},
"channels": {"lima:image": {"dim": 2}},
}
def _create_scan_info_1(node_name: str):
factory = ScanInfoFactory()
factory.add_device(root_id="main", device_id="master")
factory.add_channel(channel_id="axis:roby", device_id="master", dim=0)
factory.add_device(root_id="main", device_id="slave", triggered_by="master")
factory.add_channel(
channel_id="timer:elapsed_time", device_id="slave", dim=0, unit="s"
)
factory["node_name"] = node_name
return factory.scan_info()
def _create_scan_info_2(node_name: str):
factory = ScanInfoFactory()
factory.add_device(root_id="main", device_id="master")
factory.add_channel(channel_id="axis:robz", device_id="master", dim=0)
factory.add_device(root_id="main", device_id="slave", triggered_by="master")
factory.add_channel(
channel_id="timer:elapsed_time", device_id="slave", dim=0, unit="s"
)
factory["node_name"] = node_name
return factory.scan_info()
def _create_scan_info_3(node_name: str):
factory = ScanInfoFactory()
factory.add_device(root_id="main", device_id="master")
factory.add_channel(channel_id="lima:image", device_id="master", dim=2)
factory["node_name"] = node_name
return factory.scan_info()
class MockedScanManager(scan_manager.ScanManager):
......@@ -58,8 +63,8 @@ def _create_scan_info(node_name, base_scan_info):
def test_interleaved_scans():
scan_info_1 = _create_scan_info("scan1", SCAN_INFO_1)
scan_info_2 = _create_scan_info("scan2", SCAN_INFO_2)
scan_info_1 = _create_scan_info_1("scan1")
scan_info_2 = _create_scan_info_2("scan2")
manager = MockedScanManager(flintModel=None)
# Disabled async consumption
......@@ -90,8 +95,8 @@ def test_interleaved_scans():
def test_sequencial_scans():
scan_info_1 = _create_scan_info("scan1", SCAN_INFO_1)
scan_info_2 = _create_scan_info("scan2", SCAN_INFO_2)
scan_info_1 = _create_scan_info_1("scan1")
scan_info_2 = _create_scan_info_2("scan2")
manager = MockedScanManager(flintModel=None)
......@@ -115,7 +120,7 @@ def test_sequencial_scans():
def test_bad_sequence__end_before_new():
scan_info_1 = _create_scan_info("scan1", SCAN_INFO_1)
scan_info_1 = _create_scan_info_1("scan1")
manager = MockedScanManager(flintModel=None)
manager.emit_scan_finished(scan_info_1)
......@@ -166,7 +171,7 @@ class MockedLimaNode:
def test_image__default():
scan_info_3 = _create_scan_info("scan1", SCAN_INFO_3)
scan_info_3 = _create_scan_info_3("scan1")
manager = MockedScanManager(flintModel=None)
......@@ -187,7 +192,7 @@ def test_image__default():
def test_image__disable_video():
scan_info_3 = _create_scan_info("scan1", SCAN_INFO_3)
scan_info_3 = _create_scan_info_3("scan1")
manager = MockedScanManager(flintModel=None)
......@@ -212,7 +217,7 @@ def test_image__disable_video():
def test_image__decoding_error():
scan_info_3 = _create_scan_info("scan1", SCAN_INFO_3)
scan_info_3 = _create_scan_info_3("scan1")
manager = MockedScanManager(flintModel=None)
......@@ -237,7 +242,7 @@ def test_image__decoding_error():
def test_prefered_user_refresh():
scan_info_3 = _create_scan_info("scan1", SCAN_INFO_3)
scan_info_3 = _create_scan_info_3("scan1")
manager = MockedScanManager(flintModel=None)
......@@ -270,7 +275,7 @@ def test_prefered_user_refresh():
def test_scalar_data_lost():
scan_db_name = "scan1"
scan_info_1 = _create_scan_info(scan_db_name, SCAN_INFO_1)
scan_info_1 = _create_scan_info_1(scan_db_name)
manager = MockedScanManager(flintModel=None)
# Disabled async consumption
......
......@@ -4,39 +4,39 @@ import pytest
import numpy
from bliss.flint.model import scan_model
from bliss.flint.helper import scan_info_helper
SCATTER_SCAN_INFO = {
"acquisition_chain": {"master_time1": {"devices": ["master", "slave"]}},
"devices": {
"master": {
"channels": ["device1:channel1", "device2:channel1", "device2:channel2"],
"triggered_devices": ["slave"],
},
"slave": {
"channels": [
"device3:channel1",
"device4:channel1",
"master_time1:index",
"lima:image",
]
},
},
"channels": {
"device1:channel1": {"unit": "mm", "dim": 0},
"device2:channel1": {"unit": "mm", "dim": 0},
"device2:channel2": {"unit": "mm", "dim": 0},
"device3:channel1": {"unit": "mm", "dim": 0},
"device4:channel1": {"unit": "mm", "dim": 0},
"master_time1:index": {"unit": "s", "dim": 0},
"lima:image": {"dim": 2},
},
"data_dim": 2,
}
from tests.qt.flint.factory import ScanInfoFactory
def _create_scatter_scan_info():
factory = ScanInfoFactory()
factory.add_device(root_id="master_time1", device_id="master")
factory.add_channel(
channel_id="device1:channel1", device_id="master", unit="mm", dim=0
)
factory.add_channel(
channel_id="device2:channel1", device_id="master", unit="mm", dim=0
)
factory.add_channel(
channel_id="device2:channel2", device_id="master", unit="mm", dim=0
)
factory.add_device(root_id="master_time1", device_id="slave", triggered_by="master")
factory.add_channel(
channel_id="device3:channel1", device_id="slave", unit="mm", dim=0
)
factory.add_channel(
channel_id="device4:channel1", device_id="slave", unit="mm", dim=0
)
factory.add_channel(
channel_id="master_time1:index", device_id="slave", unit="mm", dim=0
)
factory.add_channel(channel_id="lima:image", device_id="slave", dim=2)
return factory.scan_info()
def test_scan_data_update_whole_channels():
scan = scan_info_helper.create_scan_model(SCATTER_SCAN_INFO)
scan_info = _create_scatter_scan_info()
scan = scan_info_helper.create_scan_model(scan_info)
event = scan_model.ScanDataUpdateEvent(scan)
expected = [
"device1:channel1",
......@@ -50,7 +50,8 @@ def test_scan_data_update_whole_channels():
def test_scan_data_update_single_channel():
scan = scan_info_helper.create_scan_model(SCATTER_SCAN_INFO)
scan_info = _create_scatter_scan_info()
scan = scan_info_helper.create_scan_model(scan_info)
channel = scan.getChannelByName("device2:channel2")
event = scan_model.ScanDataUpdateEvent(scan, channel=channel)
expected = {"device2:channel2"}
......@@ -58,7 +59,8 @@ def test_scan_data_update_single_channel():
def test_scan_data_update_single_image_channel():
scan = scan_info_helper.create_scan_model(SCATTER_SCAN_INFO)
scan_info = _create_scatter_scan_info()
scan = scan_info_helper.create_scan_model(scan_info)
channel = scan.getChannelByName("lima:image")
event = scan_model.ScanDataUpdateEvent(scan, channel=channel)
expected = {"lima:image"}
......@@ -66,7 +68,8 @@ def test_scan_data_update_single_image_channel():
def test_scan_data_update_master_channels():
scan = scan_info_helper.create_scan_model(SCATTER_SCAN_INFO)
scan_info = _create_scatter_scan_info()
scan = scan_info_helper.create_scan_model(scan_info)
device = scan.getDeviceByName("master_time1")
event = scan_model.ScanDataUpdateEvent(scan, masterDevice=device)
expected = {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment