Commit dccb8c5c authored by Valentin Valls's avatar Valentin Valls
Browse files

Use ROI definitions from device metadata

parent f5578ae0
......@@ -167,7 +167,6 @@ class ScanModelReader:
self._acquisition_chain_description = scan_info.get("acquisition_chain", {})
self._device_description = scan_info.get("devices", {})
self._channel_description = scan_info.get("channels", {})
self._rois_description = scan_info.get("rois", {})
scan_info = self._scan_info
is_group = scan_info.get("is-scan-sequence", False)
......@@ -321,26 +320,32 @@ class ScanModelReader:
pass
class LimaRoiDeviceParser(DefaultDeviceParser):
def parse_channels(self, device, meta):
def parse_channels(self, device: scan_model.Device, meta: Dict):
# cache virtual roi devices
virtual_rois = {}
# FIXME: It would be good to have a real ROI concept in BLISS
# Here we iterate the set of metadata to try to find something interesting
for roi_name, roi_dict in meta.items():
if not isinstance(roi_dict, dict):
continue
if "kind" not in roi_dict:
continue
roi_device = self.create_virtual_roi(roi_name, roi_dict, device)
virtual_rois[roi_name] = roi_device
def get_virtual_roi(channel_fullname):
"""Some magic to create virtual device for each ROIs"""
"""Retrieve roi device from channel name"""
nonlocal virtual_rois
short_name = channel_fullname.rsplit(":", 1)[-1]
# FIXME: It would be good to have a real ROI concept in BLISS
if "_" in short_name:
roi_name, _ = short_name.rsplit("_", 1)
else:
roi_name = short_name
key = f"{device.master().name()}:{device.name()}:{roi_name}"
if key in virtual_rois:
return virtual_rois[key]
roi_device = self.create_virtual_roi(roi_name, key, device)
virtual_rois[key] = roi_device
return roi_device
return virtual_rois.get(roi_name, None)
channel_names = meta.get("channels", [])
for channel_fullname in channel_names:
......@@ -354,22 +359,32 @@ class ScanModelReader:
)
continue
roi_device = get_virtual_roi(channel_fullname)
self.parse_channel(channel_fullname, channel_meta, parent=roi_device)
if roi_device is not None:
parent_channel = roi_device
else:
parent_channel = device
self.parse_channel(
channel_fullname, channel_meta, parent=parent_channel
)
def create_virtual_roi(self, roi_name, key, parent):
def create_virtual_roi(self, roi_name, roi_dict, parent):
device = scan_model.Device(self.reader._scan)
device.setName(roi_name)
device.setMaster(parent)
device.setType(scan_model.DeviceType.VIRTUAL_ROI)
# Read metadata
roi_dict = self.reader._rois_description.get(key)
roi = None
if roi_dict is not None:
try:
roi = lima_roi.dict_to_roi(roi_dict)
except Exception:
_logger.warning("Error while reading roi '%s'", key, exc_info=True)
_logger.warning(
"Error while reading roi '%s' from '%s'",
roi_name,
device.fullName(),
exc_info=True,
)
metadata = scan_model.DeviceMetadata({}, roi)
device.setMetadata(metadata)
......@@ -630,6 +645,9 @@ def create_plot_model(
Use the `plots` description or infer the plots from the `acquisition_chain`.
Finally update the selection using `_display_extra`.
"""
if scan is None:
scan = create_scan_model(scan_info)
if "plots" in scan_info:
plots = read_plot_models(scan_info)
for plot in plots:
......@@ -642,12 +660,12 @@ def create_plot_model(
return True
return False
aq_plots = infer_plot_models(scan_info)
aq_plots = infer_plot_models(scan)
for plot in aq_plots:
if not contains_default_plot_kind(plots, plot):
plots.append(plot)
else:
plots = infer_plot_models(scan_info)
plots = infer_plot_models(scan)
def filter_with_scan_content(channel_names, scan):
if scan is None:
......@@ -890,7 +908,35 @@ def _infer_default_scatter_plot(scan_info: Dict) -> List[plot_model.Plot]:
return plots
def infer_plot_models(scan_info: Dict) -> List[plot_model.Plot]:
def _initialize_image_plot_from_device(device: scan_model.Device) -> plot_model.Plot:
"""Initialize ImagePlot with default information which can be used
structurally"""
plot = plot_item_model.ImagePlot()
# Reach a name which is stable between 2 scans
# FIXME: This have to be provided by the scan_info
def get_stable_name(device):
for channel in device.channels():
name = channel.name()
return name.rsplit(":", 1)[0]
return device.fullName().split(":", 1)[1]
stable_name = get_stable_name(device)
plot.setDeviceName(stable_name)
if device.type() == scan_model.DeviceType.LIMA:
for sub_device in device.devices():
if sub_device.name() in ["roi_counters", "roi_profiles"]:
for roi_device in sub_device.devices():
if roi_device.type() != scan_model.DeviceType.VIRTUAL_ROI:
continue
item = plot_item_model.RoiItem(plot)
item.setDeviceName(roi_device.fullName())
plot.addItem(item)
return plot
def infer_plot_models(scan: scan_model.Scan) -> List[plot_model.Plot]:
"""Infer description of plot models from a scan_info using
`acquisition_chain`.
......@@ -907,6 +953,7 @@ def infer_plot_models(scan_info: Dict) -> List[plot_model.Plot]:
result: List[plot_model.Plot] = []
default_plot = None
scan_info = scan.scanInfo()
acquisition_chain = scan_info.get("acquisition_chain", None)
if len(acquisition_chain.keys()) == 1:
......@@ -1043,45 +1090,23 @@ def infer_plot_models(scan_info: Dict) -> List[plot_model.Plot]:
# Image plot
for master_name in acquisition_chain.keys():
for device_id in acquisition_chain[master_name].get("devices", []):
device_info = scan_info["devices"].get(device_id, {})
device_type = device_info.get("type")
device_name = device_id.rsplit(":", 1)[-1]
plot = None
for channel_name in device_info.get("channels", []):
channel_info = scan_info["channels"].get(channel_name, {})
dim = channel_info.get("dim", 0)
if dim != 2:
continue
for device in scan.devices():
plot = None
for channel in device.channels():
if channel.type() != scan_model.ChannelType.IMAGE:
continue
if plot is None:
plot = plot_item_model.ImagePlot()
device_name = get_device_from_channel(channel_name)
plot.setDeviceName(device_name)
if default_plot is None:
default_plot = plot
if device_type == "lima":
if "rois" in scan_info:
for roi_name, _roi_dict in scan_info["rois"].items():
if not roi_name.startswith(
f"{device_name}:roi_counters:"
) and not roi_name.startswith(
f"{device_name}:roi_profiles:"
):
pass
item = plot_item_model.RoiItem(plot)
item.setDeviceName(f"{master_name}:{roi_name}")
plot.addItem(item)
image_channel = plot_model.ChannelRef(plot, channel_name)
item = plot_item_model.ImageItem(plot)
item.setImageChannel(image_channel)
plot.addItem(item)
if plot is None:
plot = _initialize_image_plot_from_device(device)
if default_plot is None:
default_plot = plot
if plot is not None:
result.append(plot)
image_channel = plot_model.ChannelRef(plot, channel.name())
item = plot_item_model.ImageItem(plot)
item.setImageChannel(image_channel)
plot.addItem(item)
if plot is not None:
result.append(plot)
# Move the default plot on top
if default_plot is not None:
......
......@@ -581,21 +581,18 @@ class ImagePlotWidget(plot_helper.PlotWidget):
if self.__scan is None:
return
master = None
limaDevice = None
for device in self.__scan.devices():
if device.name() != self.deviceName():
if device.type() != scan_model.DeviceType.LIMA:
continue
if device.master() and device.master().isMaster():
master = device
if device.name() == self.deviceName():
limaDevice = device
break
if master is None:
if limaDevice is None:
return
for device in self.__scan.devices():
if not device.isChildOf(master):
continue
for device in limaDevice.devices():
roi = device.metadata().roi
if roi is None:
continue
......
......@@ -64,7 +64,17 @@ SCAN_INFO_LIMA_ROIS = {
"beamviewer:roi_counters:roi4_sum",
"beamviewer:roi_counters:roi4_avg",
"beamviewer:roi_counters:roi5_avg",
]
],
"roi1": {"kind": "rect", "x": 190, "y": 110, "width": 600, "height": 230},
"roi4": {
"kind": "arc",
"cx": 487.0,
"cy": 513.0,
"r1": 137.0,
"r2": 198.0,
"a1": -172.0,
"a2": -300.0,
},
},
},
"channels": {
......@@ -77,24 +87,6 @@ SCAN_INFO_LIMA_ROIS = {
"beamviewer:roi_counters:roi5_avg": {"dim": 0},
"beamviewer:image": {"dim": 2},
},
"rois": {
"beamviewer:roi_counters:roi1": {
"kind": "rect",
"x": 190,
"y": 110,
"width": 600,
"height": 230,
},
"beamviewer:roi_counters:roi4": {
"kind": "arc",
"cx": 487.0,
"cy": 513.0,
"r1": 137.0,
"r2": 198.0,
"a1": -172.0,
"a2": -300.0,
},
},
}
......@@ -203,7 +195,7 @@ def test_create_scan_model_with_lima_rois():
for device in scan.devices():
channelCount += len(list(device.channels()))
assert channelCount == 8
assert deviceCount == 7
assert deviceCount == 6
channel = scan.getChannelByName("beamviewer:roi_counters:roi1_avg")
device = channel.device()
......@@ -452,7 +444,7 @@ def test_amesh_scan_with_image_and_mca():
def test_create_plot_model_with_rois():
scan = scan_info_helper.create_scan_model(SCAN_INFO_LIMA_ROIS)
plots = scan_info_helper.infer_plot_models(SCAN_INFO_LIMA_ROIS)
plots = scan_info_helper.infer_plot_models(scan)
image_plots = [p for p in plots if isinstance(p, plot_item_model.ImagePlot)]
plot = image_plots[0]
roi_items = [i for i in plot.items() if isinstance(i, plot_item_model.RoiItem)]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment