Commit 1cfaf047 authored by Valentin Valls's avatar Valentin Valls
Browse files

Do not store application based in scan_info

- Rework scan_info to provide:
   - `displayed_channels` if something is specified (plotinit)
   - `plotselect` which is always specified
- Add test on plotinit to be sure it works as expected
parent ca1ae9d4
Pipeline #30555 canceled with stages
in 34 minutes and 10 seconds
......@@ -265,9 +265,12 @@ def plotinit(*counters):
counters: String, alias, object identifying an object providing data to
record. It can be a counter name, a counter, an axis, an alias.
"""
scan_display = ScanDisplay()
channel_names = get_channel_names(*counters)
scan_display._plotinit(channel_names)
scan_display = current_session.scan_display
if len(counters) == 0:
channel_names = None
else:
channel_names = get_channel_names(*counters)
scan_display.next_scan_displayed_channels = channel_names
def plotselect(*counters):
......@@ -283,7 +286,7 @@ def plotselect(*counters):
"""
scan_display = ScanDisplay()
channel_names = get_channel_names(*counters)
scan_display._plotselect(channel_names)
scan_display.displayed_channels = channel_names
if flint_proxy.check_flint():
flint = flint_proxy.get_flint(mandatory=False)
......@@ -325,20 +328,8 @@ def get_next_plotted_counters():
"""
Returns names of counters that will be plotted for the next scan.
"""
scan_display = ScanDisplay()
displayed_channels = scan_display.flint_displayed_channels
if displayed_channels is None:
return []
else:
return displayed_channels
def get_nexus_plotted_counters():
"""
Returns names of counters that will be plotted in the Nexus file
"""
scan_display = ScanDisplay()
displayed_channels = scan_display.nexus_displayed_channels
scan_display = current_session.scan_display
displayed_channels = scan_display.next_scan_displayed_channels
if displayed_channels is None:
return []
else:
......
......@@ -320,7 +320,7 @@ def create_plot_model(
if display_extra is not None:
if scan is None:
scan = create_scan_model(scan_info)
displayed_channels = display_extra.get("flint_displayed_channels", None)
displayed_channels = display_extra.get("displayed_channels", None)
# Sanitize
if displayed_channels is not None:
if not isinstance(displayed_channels, list):
......
......@@ -298,7 +298,7 @@ class ManageMainBehaviours(qt.QObject):
useDefaultPlot = (
scan.scanInfo()
.get("_display_extra", {})
.get("flint_displayed_channels", None)
.get("displayed_channels", None)
is not None
)
else:
......
......@@ -1178,12 +1178,21 @@ class Scan:
def _prepare_scan_meta(self):
self._scan_info["filename"] = self.writer.filename
# User metadata:
# User metadata
self.user_scan_meta = get_user_scan_meta().copy()
self._update_scan_info_with_user_scan_meta()
# Plot metadata:
display_extra = self.__scan_display._pop_scan_metadata()
if display_extra:
# Plot metadata
display_extra = {}
displayed_channels = self.__scan_display.displayed_channels
if displayed_channels is not None:
# Contextual display request
display_extra["plotselect"] = displayed_channels
displayed_channels = self.__scan_display._pop_next_scan_displayed_channels()
if displayed_channels is not None:
# Structural display request specified for this scan
display_extra["displayed_channels"] = displayed_channels
if len(display_extra) > 0:
self._scan_info["_display_extra"] = display_extra
def disconnect_all(self):
......
......@@ -5,6 +5,7 @@
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import typing
from bliss.common import deprecation
from bliss import current_session
from bliss.config.settings import ParametersWardrobe
......@@ -156,46 +157,34 @@ class ScanDisplay(ParametersWardrobe):
logger.disabled = not enabled
@property
def nexus_displayed_channels(self):
"""Will be used by the Nexus writer when saving a scan."""
return self._scan_metadata.get("nexus_displayed_channels")
def next_scan_displayed_channels(self) -> typing.Optional[typing.List[str]]:
"""Returns a list of forced channels to display with the next scan.
@nexus_displayed_channels.setter
def nexus_displayed_channels(self, values):
"""None and [] have the same effect"""
self._update_scan_metadata(nexus_displayed_channels=values)
If None, nothing was requested.
"""
return self._scan_metadata.get("displayed_channels", None)
@property
def flint_displayed_channels(self):
"""Will be used by Flint when displaying a new scan.
If `None` it Flint keeps the currently selected channels."""
return self._scan_metadata.get("flint_displayed_channels")
@flint_displayed_channels.setter
def flint_displayed_channels(self, values):
"""None and [] have a different effect"""
self._update_scan_metadata(flint_displayed_channels=values)
def _plotinit(self, channel_names):
"""Set the next Flint plot and Nexus plot"""
self.flint_displayed_channels = channel_names
self.nexus_displayed_channels = channel_names
def _plotselect(self, channel_names):
"""Set the current Flint plot and the next Nexus plot"""
self.displayed_channels = channel_names
self.nexus_displayed_channels = channel_names
def _pop_scan_metadata(self):
metadata = self._scan_metadata
# Preserve the display in Flint until the
# next call to `_plotinit`
self.flint_displayed_channels = None
return metadata
def _update_scan_metadata(self, **kw):
metadata = self._scan_metadata
if metadata is None:
metadata = {}
metadata.update(kw)
self._scan_metadata = metadata
@next_scan_displayed_channels.setter
def next_scan_displayed_channels(self, values: typing.Optional[typing.List[str]]):
"""Specify a list of specific channels to display with the next scan.
"""
meta = self._scan_metadata
if meta is None:
meta = {}
else:
meta = meta.copy()
if values is None:
if "displayed_channels" in meta:
del meta["displayed_channels"]
else:
meta["displayed_channels"] = values
self._scan_metadata = meta
def _pop_next_scan_displayed_channels(self) -> typing.Optional[typing.List[str]]:
"""Return the specific channels to display with the next scan, and
discard it.
"""
meta = self._scan_metadata.copy()
result = meta.pop("displayed_channels", None)
self._scan_metadata = meta
return result
......@@ -786,15 +786,11 @@ example:
names = plot_module.get_next_plotted_counters()
if names:
print("Reset counter(s) for the next scan:")
print("Plotted counter(s) for the next scan:")
for cnt_name in names:
print(f"- {cnt_name}")
else:
print("Plotted counter(s) for the next scan:")
print(" current selection")
print("\nPlotted counter(s) in the Nexus file:")
for cnt_name in plot_module.get_nexus_plotted_counters():
print(f"- {cnt_name}")
print("No specific counter(s) for the next scan")
print("")
......@@ -830,9 +826,6 @@ example:
)
for cnt_name in plot_module.get_plotted_counters():
print(f"- {cnt_name}")
print("\nPlotted counter(s) in the Nexus file:")
for cnt_name in plot_module.get_nexus_plotted_counters():
print(f"- {cnt_name}")
print("")
......
......@@ -645,9 +645,12 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
@property
def flintplots(self):
items = self.get_info("_display_extra", default={}).get(
"nexus_displayed_channels", []
)
display_extra = self.get_info("_display_extra", default={})
items = display_extra.get("displayed_channels", None)
if items is None:
items = display_extra.get("plotselect", None)
if items is None:
items = []
if items:
return {"plotselect": {"items": items, "grid": True}}
else:
......
......@@ -24,7 +24,7 @@ def _test_nxw_plotselect(
scan_saving.technique = ""
detectors = [env_dict[name] for name in ["diode3", "diode4", "diode5"]]
names = [env_dict[name].fullname for name in ["diode3", "diode4"]]
scan_display._plotselect(names)
scan_display.displayed_channels = names
plots = {}
plots["plotselect"] = {"ndim": 0, "type": "grid", "signals": ["diode3", "diode4"]}
......
......@@ -170,16 +170,16 @@ def test_plotinit(session):
sd = ScanDisplay()
plotinit("foo")
assert sd.flint_displayed_channels == ["foo"]
assert sd.next_scan_displayed_channels == ["foo"]
plotinit(roby)
assert sd.flint_displayed_channels == ["axis:roby"]
assert sd.next_scan_displayed_channels == ["axis:roby"]
plotinit(diode)
assert sd.flint_displayed_channels == [diode.fullname]
assert sd.next_scan_displayed_channels == [diode.fullname]
plotinit(diode.fullname)
assert sd.flint_displayed_channels == [diode.fullname]
assert sd.next_scan_displayed_channels == [diode.fullname]
plotinit(diode, roby)
assert sd.flint_displayed_channels == [diode.fullname, "axis:roby"]
assert sd.next_scan_displayed_channels == [diode.fullname, "axis:roby"]
def test_counter_argument_on_cen_com_peak(session):
......
......@@ -196,6 +196,24 @@ def test_meshselect(test_session_with_flint):
assert flint.test_count_displayed_items(plot_id) == 0
def test_plotinit__something(session):
plot.plotinit("aaa")
channels = plot.get_next_plotted_counters()
assert channels == ["aaa"]
def test_plotinit__nothing(session):
plot.plotinit()
assert session.scan_display.next_scan_displayed_channels is None
assert plot.get_next_plotted_counters() == []
def test_plotinit__one_shot(session):
plot.plotinit("aaa")
session.scan_display._pop_next_scan_displayed_channels()
assert plot.get_next_plotted_counters() == []
def test_plotselect(test_session_with_flint):
session = test_session_with_flint
ascan = session.env_dict["ascan"]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment