Commit 5df275d1 authored by Valentin Valls's avatar Valentin Valls
Browse files

Split infer_plot_models into sub functions

parent 56846164
......@@ -586,172 +586,204 @@ def read_plot_models(scan_info: Dict) -> List[plot_model.Plot]:
return result
def infer_plot_models(scan_info: Dict) -> List[plot_model.Plot]:
"""Infer description of plot models from a scan_info using
`acquisition_chain`."""
result: List[plot_model.Plot] = []
def _infer_default_curve_plot(
scan_info: Dict, have_scatter: bool
) -> Optional[plot_model.Plot]:
"""Create a curve plot by inferring the acquisition chain content.
channel_units = read_units(scan_info)
If there is a scatter as main plot, try to use a time counter as axis.
"""
plot = plot_item_model.CurvePlot()
default_plot = None
def get_unit(channel_name: str) -> Optional[str]:
return scan_info["channels"][channel_name].get("unit", None)
have_scalar = False
have_scatter = False
acquisition_chain = scan_info.get("acquisition_chain", None)
if len(acquisition_chain.keys()) == 1:
first_key = list(acquisition_chain.keys())[0]
if first_key == "GroupingMaster":
# Make sure groups does not generate anything plots
return []
for master_name in acquisition_chain.keys():
scalars = _get_channels(scan_info, master_name, dim=0, master=False)
if len(scalars) > 0:
have_scalar = True
if scan_info.get("data_dim", 1) == 2 or scan_info.get("dim", 1) == 2:
have_scatter = True
master_channels = _get_channels(scan_info, master_name, dim=0, master=True)
# Ct
if have_scatter:
# In case of scatter the curve plot have to plot the time in x
# Masters in y1 and the first value in y2
if scan_info.get("type", None) == "ct":
plot = plot_item_model.ScalarPlot()
result.append(plot)
have_scalar = False
have_scatter = False
for timer in scalars:
if timer in master_channels:
# skip the masters
continue
if get_unit(timer) != "s":
# skip non time base
continue
break
else:
timer = None
# Scalar plot
for scalar in scalars:
if scalar in master_channels:
# skip the masters
continue
if get_unit(scalar) == "s":
# skip the time base
continue
break
else:
scalar = None
if have_scalar:
plot = plot_item_model.CurvePlot()
if not have_scalar:
default_plot = plot
if timer is not None:
if scalar is not None:
item = plot_item_model.CurveItem(plot)
x_channel = plot_model.ChannelRef(plot, timer)
y_channel = plot_model.ChannelRef(plot, scalar)
item.setXChannel(x_channel)
item.setYChannel(y_channel)
item.setYAxis("left")
plot.addItem(item)
for master_name in acquisition_chain.keys():
scalars = _get_channels(scan_info, master_name, dim=0, master=False)
master_channels = _get_channels(scan_info, master_name, dim=0, master=True)
for channel_name in master_channels:
item = plot_item_model.CurveItem(plot)
x_channel = plot_model.ChannelRef(plot, timer)
y_channel = plot_model.ChannelRef(plot, channel_name)
item.setXChannel(x_channel)
item.setYChannel(y_channel)
item.setYAxis("right")
plot.addItem(item)
else:
# The plot will be empty
pass
else:
if len(master_channels) > 0 and master_channels[0].startswith("axis:"):
master_channel = master_channels[0]
master_channel_unit = get_unit(master_channel)
is_motor_scan = master_channel_unit != "s"
else:
is_motor_scan = False
if have_scatter:
# In case of scatter the curve plot have to plot the time in x
# Masters in y1 and the first value in y2
for channel_name in scalars:
if is_motor_scan and get_unit(channel_name) == "s":
# Do not display base time for motor based scan
continue
for timer in scalars:
if timer in master_channels:
# skip the masters
continue
if channel_units.get(timer, None) != "s":
# skip non time base
continue
break
else:
timer = None
item = plot_item_model.CurveItem(plot)
data_channel = plot_model.ChannelRef(plot, channel_name)
for scalar in scalars:
if scalar in master_channels:
# skip the masters
continue
if channel_units.get(scalar, None) == "s":
# skip the time base
continue
break
if len(master_channels) == 0:
master_channel = None
else:
scalar = None
if timer is not None:
if scalar is not None:
item = plot_item_model.CurveItem(plot)
x_channel = plot_model.ChannelRef(plot, timer)
y_channel = plot_model.ChannelRef(plot, scalar)
item.setXChannel(x_channel)
item.setYChannel(y_channel)
item.setYAxis("left")
plot.addItem(item)
for channel_name in master_channels:
item = plot_item_model.CurveItem(plot)
x_channel = plot_model.ChannelRef(plot, timer)
y_channel = plot_model.ChannelRef(plot, channel_name)
item.setXChannel(x_channel)
item.setYChannel(y_channel)
item.setYAxis("right")
plot.addItem(item)
else:
# The plot will be empty
pass
else:
if len(master_channels) > 0 and master_channels[0].startswith("axis:"):
master_channel = master_channels[0]
master_channel_unit = channel_units.get(master_channel, None)
is_motor_scan = master_channel_unit != "s"
else:
is_motor_scan = False
master_channel = plot_model.ChannelRef(plot, master_channels[0])
for channel_name in scalars:
channel_unit = channel_units.get(channel_name, None)
if is_motor_scan and channel_unit == "s":
# Do not display base time for motor based scan
continue
item.setXChannel(master_channel)
item.setYChannel(data_channel)
plot.addItem(item)
# Only display the first counter
break
return plot
item = plot_item_model.CurveItem(plot)
data_channel = plot_model.ChannelRef(plot, channel_name)
if len(master_channels) == 0:
master_channel = None
else:
master_channel = plot_model.ChannelRef(plot, master_channels[0])
def _infer_default_scatter_plot(scan_info: Dict) -> List[plot_model.Plot]:
"""Create a set of scatter plots according to the content of acquisition
chain"""
plots: List[plot_model.Plot] = []
item.setXChannel(master_channel)
item.setYChannel(data_channel)
plot.addItem(item)
# Only display the first counter
break
def get_unit(channel_name: str) -> Optional[str]:
return scan_info["channels"][channel_name].get("unit", None)
result.append(plot)
acquisition_chain = scan_info.get("acquisition_chain", None)
# Scatter plot
for master_name in acquisition_chain.keys():
plot = plot_item_model.ScatterPlot()
if have_scatter:
for master_name in acquisition_chain.keys():
plot = plot_item_model.ScatterPlot()
if default_plot is None:
default_plot = plot
scalars = _get_channels(scan_info, master_name, dim=0, master=False)
axes_channels = _get_channels(scan_info, master_name, dim=0, master=True)
scalars = _get_channels(scan_info, master_name, dim=0, master=False)
axes_channels = _get_channels(scan_info, master_name, dim=0, master=True)
# Reach the first scalar which is not a time unit
for scalar in scalars:
if scalar in axes_channels:
# skip the masters
continue
if get_unit(scalar) == "s":
# skip the time base
continue
break
else:
scalar = None
# Reach the first scalar which is not a time unit
for scalar in scalars:
if scalar in axes_channels:
# skip the masters
continue
if channel_units.get(scalar, None) == "s":
# skip the time base
continue
break
else:
scalar = None
if len(axes_channels) >= 1:
x_channel = plot_model.ChannelRef(plot, axes_channels[0])
else:
x_channel = None
if len(axes_channels) >= 1:
x_channel = plot_model.ChannelRef(plot, axes_channels[0])
else:
x_channel = None
if len(axes_channels) >= 2:
y_channel = plot_model.ChannelRef(plot, axes_channels[1])
else:
y_channel = None
if len(axes_channels) >= 2:
y_channel = plot_model.ChannelRef(plot, axes_channels[1])
else:
y_channel = None
if scalar is not None:
data_channel = plot_model.ChannelRef(plot, scalar)
else:
data_channel = None
if scalar is not None:
data_channel = plot_model.ChannelRef(plot, scalar)
else:
data_channel = None
item = plot_item_model.ScatterItem(plot)
item.setXChannel(x_channel)
item.setYChannel(y_channel)
item.setValueChannel(data_channel)
plot.addItem(item)
plots.append(plot)
item = plot_item_model.ScatterItem(plot)
item.setXChannel(x_channel)
item.setYChannel(y_channel)
item.setValueChannel(data_channel)
plot.addItem(item)
return plots
def infer_plot_models(scan_info: Dict) -> List[plot_model.Plot]:
"""Infer description of plot models from a scan_info using
`acquisition_chain`.
- Dedicated default plot is created for 0D channels according to the kind
of scan. It could be:
- ct plot
- curve plot
- scatter plot
- A dedicated image plot is created per lima detectors
- A dedicated MCA plot is created per mca detectors
- Remaining 2D channels are displayed as an image widget
- Remaining 1D channels are displayed as a 1D plot
"""
result: List[plot_model.Plot] = []
result.append(plot)
default_plot = None
acquisition_chain = scan_info.get("acquisition_chain", None)
if len(acquisition_chain.keys()) == 1:
first_key = list(acquisition_chain.keys())[0]
if first_key == "GroupingMaster":
# Make sure groups does not generate any plots
return []
# ct / curve / scatter
if scan_info.get("type", None) == "ct":
plot = plot_item_model.ScalarPlot()
result.append(plot)
else:
have_scalar = False
have_scatter = False
for master_name in acquisition_chain.keys():
scalars = _get_channels(scan_info, master_name, dim=0, master=False)
if len(scalars) > 0:
have_scalar = True
if scan_info.get("data_dim", 1) == 2 or scan_info.get("dim", 1) == 2:
have_scatter = True
if have_scalar:
plot = _infer_default_curve_plot(scan_info, have_scatter)
if plot is not None:
result.append(plot)
if not have_scalar:
default_plot = plot
if have_scatter:
plots = _infer_default_scatter_plot(scan_info)
if len(plots) > 0:
result.extend(plots)
if default_plot is None:
default_plot = plots[0]
# MCA plot
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment