......@@ -278,14 +278,14 @@ class Session:
RuntimeWarning,
)
names_list.remove(name)
class_name = object_config.get("class", "")
if class_name.lower() == "session":
warnings.warn(
f"Session {self.name} 'config-objects' list contains session {name}, ignoring (hint: add session in 'include-sessions' list)",
RuntimeWarning,
)
names_list.remove(name)
else:
class_name = object_config.get("class", "")
if class_name.lower() == "session":
warnings.warn(
f"Session {self.name} 'config-objects' list contains session {name}, ignoring (hint: add session in 'include-sessions' list)",
RuntimeWarning,
)
names_list.remove(name)
for name in self.__exclude_objects_names:
try:
......
......@@ -17,12 +17,17 @@ from bliss.scanning.chain import ChainNode
class LimaBpmCounter(IntegratingCounter):
"""Lima BPM integrating counter."""
pass
def __init__(self, name, value_index, controller):
super().__init__(name, controller)
self.__value_index = value_index
@property
def value_index(self):
return self.__value_index
class BpmChainNode(ChainNode):
def _get_default_chain_parameters(self, scan_params, acq_params):
try:
count_time = acq_params["count_time"]
except:
......@@ -33,7 +38,6 @@ class BpmChainNode(ChainNode):
return params
def get_acquisition_object(self, acq_params, ctrl_params=None):
# --- Warn user if an unexpected key is found in acq_params
expected_keys = ["count_time"]
for key in acq_params.keys():
......@@ -44,10 +48,7 @@ class BpmChainNode(ChainNode):
count_time = acq_params["count_time"]
return IntegratingCounterAcquisitionSlave(
self.controller,
*self.counters,
count_time=count_time,
ctrl_params=ctrl_params,
*self.counters, count_time=count_time, ctrl_params=ctrl_params
)
......@@ -57,61 +58,57 @@ class Bpm(IntegratingCounterController):
"bpm", master_controller=acquisition_proxy, chain_node_class=BpmChainNode
)
self._proxy = bpm_proxy
self._name2index = {
"acq_time": 0,
"intensity": 1,
"x": 2,
"y": 3,
"fwhm_x": 4,
"fwhm_y": 5,
}
self._counters.update(
{
"acq_time": self.acq_time,
"x": self.x,
"y": self.y,
"intensity": self.intensity,
"fwhm_x": self.fwhm_x,
"fwhm_y": self.fwhm_y,
"acq_time": LimaBpmCounter("acq_time", 0, self),
"intensity": LimaBpmCounter("intensity", 1, self),
"x": LimaBpmCounter("x", 2, self),
"y": LimaBpmCounter("y", 3, self),
"fwhm_x": LimaBpmCounter("fwhm_x", 4, self),
"fwhm_y": LimaBpmCounter("fwhm_y", 5, self),
}
)
def prepare(self, *counters):
self.start()
def start(self, *counters):
self._proxy.Start()
def stop(self, *counters):
self._proxy.Stop()
@property
def acq_time(self):
return LimaBpmCounter("acq_time", self)
return self._counters["acq_time"]
@property
def x(self):
return LimaBpmCounter("x", self)
return self._counters["x"]
@property
def y(self):
return LimaBpmCounter("y", self)
return self._counters["y"]
@property
def intensity(self):
return LimaBpmCounter("intensity", self)
return self._counters["intensity"]
@property
def fwhm_x(self):
return LimaBpmCounter("fwhm_x", self)
return self._counters["fwhm_x"]
@property
def fwhm_y(self):
return LimaBpmCounter("fwhm_y", self)
def prepare(self, *counters):
self._proxy.On()
def stop(self, *counters):
self._proxy.Off()
return self._counters["fwhm_y"]
def get_values(self, from_index, *counters):
result_size = self._proxy.ResultSize
# BPM data are: timestamp, intensity, center_x, center_y, fwhm_x, fwhm_y, frameno
result_size = 7
all_result = self._proxy.GetResults(from_index)
nb_result = len(all_result) / result_size
nb_result = len(all_result) // result_size
counter2index = [
(numpy.zeros((nb_result,)), self._name2index[cnt.name]) for cnt in counters
(numpy.zeros((nb_result,)), cnt.value_index) for cnt in counters
]
for i, raw in enumerate(grouped(all_result, result_size)):
......
......@@ -248,6 +248,11 @@ class Lima(CounterController):
def stopAcq(self):
self._proxy.stopAcq()
def stop_bpm_live(self):
self._proxy.video_live = False
self._proxy.stopAcq()
self.bpm.stop()
def _get_proxy(self, type_name="LimaCCDs"):
if type_name == "LimaCCDs":
device_name = self.__tg_url
......
......@@ -28,8 +28,6 @@ YAML_ configuration example:
- name: lifetime
attr_name: SR_Lifetime
TESTS:
pytest tests/controllers_sw/test_tango_attr_counters.py
......@@ -43,7 +41,7 @@ import weakref
from bliss.common.counter import SamplingCounter
from bliss.common import tango
from bliss import global_map
from bliss.common.logtools import *
from bliss.common.logtools import log_debug
from bliss.controllers.counter import SamplingCounterController
......@@ -179,8 +177,10 @@ class tango_attr_as_counter(SamplingCounter):
def convert_func(self, value):
attr_val = value * self.conversion_factor
# beurk: workaround to limit the decimals...
attr_val = int(attr_val * 10000) / 10000
# Truncate the result to avoid too many decimals.
# NB: inoperative if sampling mode is not SINGLE :(
# TODO: support the format defined in the Tango DS.
attr_val = float("%g" % attr_val)
return attr_val
......
......@@ -7,9 +7,9 @@
from bliss.controllers.temp import Controller
from bliss.common.temperature import Input, Output, Loop, lazy_init
from bliss.common.logtools import *
from bliss.common.logtools import log_info, log_debug
from bliss.common.utils import autocomplete_property
from bliss.common import session
from bliss import global_map
import os
import re
import sys
......@@ -253,7 +253,7 @@ class LakeshoreBase(Controller):
Controller.__init__(self, config, *args)
session.get_current().map.register(handler._comm, parents_list=[self, "comms"])
global_map.register(handler._comm, parents_list=[self, "comms"])
@property
def model(self):
......@@ -501,6 +501,9 @@ class LakeshoreBase(Controller):
self.__kp, self.__ki, self.__kd = self._lakeshore.pid(channel)
return self.__kd
def __info__(self):
return "\n".join(self._show())
def _show(self, name=None):
""" Display all main parameters and values for the temperature controller
Prints:
......
......@@ -114,6 +114,9 @@ class LakeShore336:
@property
def eol(self):
eol = self._comm._eol
if isinstance(eol, bytes):
return eol.decode()
return self._comm._eol
# Initialization methods
......
......@@ -90,7 +90,8 @@ def _get_node_object(node_type, name, parent, connection, create=False, **keys):
m,
lambda x: inspect.isclass(x)
and issubclass(x, DataNode)
and x not in (DataNode, DataNodeContainer),
and x not in (DataNode, DataNodeContainer)
and inspect.getmodule(x) == m,
)
# there should be only 1 class inheriting from DataNode in the plugin
klass = classes[0][-1]
......
......@@ -54,10 +54,18 @@ def data_from_bytes(data, shape=None, dtype=None):
try:
return pickle.loads(data)
except pickle.UnpicklingError:
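# raw bytes that are not a pickle: try to rebuild the value with the
# requested dtype, otherwise fall back to a plain float below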
if dtype is not None:
try:
t = numpy.dtype(dtype)
return dtype(numpy.array(data, dtype=t))
except TypeError:
pass
return float(data)
class ChannelDataNode(DataNode):
_NODE_TYPE = "channel"
def __init__(self, name, **keys):
shape = keys.pop("shape", None)
dtype = keys.pop("dtype", None)
......@@ -72,7 +80,7 @@ class ChannelDataNode(DataNode):
info["fullname"] = fullname
info["unit"] = unit
DataNode.__init__(self, "channel", name, info=info, **keys)
DataNode.__init__(self, self._NODE_TYPE, name, info=info, **keys)
self._queue = None
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.data.nodes.channel import ChannelDataNode
from bliss.data.node import get_node, get_nodes
import numpy
class NodeRefChannel(ChannelDataNode):
"""
A data node that stores references to other DataNodes. It is intended to be used to
keep references, e.g. to individual scans, in order to group them together as one group
or sequence of scans.
"""
_NODE_TYPE = "node_ref_channel"
def get(self, from_index, to_index=None):
"""
Return the data nodes according to the references stored in this channel
**from_index** index of the first referenced node to retrieve
**to_index** index of the last referenced node to retrieve
if to_index is None => only the single node at index from_index
if to_index < 0 => to the end of acquisition
"""
if to_index is None:
return get_node(ChannelDataNode.get(self, from_index))
else:
return get_nodes(*ChannelDataNode.get(self, from_index, to_index))
def store(self, event_dict):
self._create_queue()
data = event_dict.get("data")
self.info.update(event_dict["description"])
shape = event_dict["description"]["shape"]
if type(data) is numpy.ndarray:
if len(shape) == data.ndim:
self._queue.append(data)
else:
self._queue.extend(data)
elif type(data) is list or type(data) is tuple:
self._queue.extend(data)
else:
self._queue.append(data)
......@@ -46,8 +46,10 @@ def pickle_dump(var):
class Scan(DataNodeContainer):
_NODE_TYPE = "scan"
def __init__(self, name, create=False, **keys):
DataNodeContainer.__init__(self, "scan", name, create=create, **keys)
DataNodeContainer.__init__(self, self._NODE_TYPE, name, create=create, **keys)
self._info._write_type_conversion = pickle_dump
if self.new_node:
with settings.pipeline(self._struct, self._info) as p:
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.data.nodes.scan import Scan
from bliss.data.node import DataNodeContainer
class GroupScan(Scan):
_NODE_TYPE = "scan_group"
......@@ -35,7 +35,6 @@ class CT2AcquisitionMaster(AcquisitionMaster):
start_once=True,
ctrl_params=None,
):
name = type(device).__name__
self._connected = False
self.acq_expo_time = acq_expo_time
self.acq_mode = acq_mode
......@@ -58,7 +57,7 @@ class CT2AcquisitionMaster(AcquisitionMaster):
trigger_type=trigger_type,
ctrl_params=ctrl_params,
)
super().__init__(device, name=name, **kwargs)
super().__init__(device, **kwargs)
def add_counter(self, counter):
pass
......
......@@ -198,6 +198,9 @@ class LimaAcquisitionMaster(AcquisitionMaster):
setattr(self.device.proxy, param_name, param_value)
self.wait_slaves_prepare()
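# a BPM video live running on the camera would conflict with this
# acquisition: stop it before activating the video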
if self.device.proxy.video_live is True:
self._lima_controller.stop_bpm_live()
self.device.proxy.video_active = True
self._lima_controller.prepareAcq()
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from gevent.queue import Queue
import gevent
from contextlib import contextmanager
import numpy
from bliss.scanning.chain import (
AcquisitionMaster,
AcquisitionSlave,
AcquisitionChannel,
AcquisitionChain,
)
from bliss.scanning.scan import Scan as Scanning_Scan
from bliss.data.nodes.scan import Scan as Data_Scan
from bliss.data.node import get_session_node
from bliss import get_current_session
from bliss.scanning.scan import ScanState
from bliss.data.node import _create_node
class ScanGroup(Scanning_Scan):
def _create_data_node(self, node_name):
self._Scan__node = _create_node(
node_name, "scan_group", parent=self.root_node, info=self._scan_info
)
class Sequence:
""" should have a scan as internal property that runs
in a spawned mode in the background. Each new scan
should publish itself (trigger a master inside the scan)
there should be a possibiltiy of calc channels.
progressbar for sequence??
"""
def __init__(self, scan_info=None, title="sequence_of_scans"):
self.title = title
self.scan_info = scan_info
self.custom_channels = dict()
self._scans = list() # scan objects or scan nodes
self._waiting_scans = list()
def add_custom_channel(self, acq_channel):
assert isinstance(acq_channel, AcquisitionChannel)
self.custom_channels[acq_channel.name] = acq_channel
def wait_all_subscans(self, timeout=0):
if timeout is not None:
with gevent.timeout.Timeout(timeout):
gevent.joinall(self._waiting_scans)
else:
gevent.joinall(self._waiting_scans)
class SequenceContext:
def __init__(self, sequence):
self.sequence = sequence
def _wait_before_adding_scan(self, scan):
scan.wait_state(ScanState.STARTING)
self.sequence.group_acq_master.new_subscan(scan)
def _add_via_node(self, scan):
assert isinstance(scan, Data_Scan)
self.sequence._scans.append(scan)
self.sequence.group_acq_master.new_subscan(scan)
def add(self, scan):
assert isinstance(scan, Scanning_Scan)
self.sequence._scans.append(scan)
if scan.state >= ScanState.STARTING:
# scan is running / has been running already
self.sequence.group_acq_master.new_subscan(scan)
else:
self.sequence._waiting_scans.append(
gevent.spawn(self._wait_before_adding_scan, scan)
)
def add_and_run(self, scan):
assert isinstance(scan, Scanning_Scan)
if scan.state != 0:
raise RuntimeError(
f'Error in add_and_run: scan "{scan.name}" has already been started!'
)
self.add(scan)
g = gevent.spawn(scan.run)
g.join()
def wait_all_subscans(self, timeout=None):
self.sequence.wait_all_subscans(timeout)
@contextmanager
def sequence_context(self):
self._build_scan()
group_scan = gevent.spawn(self.scan.run)
try:
with gevent.timeout.Timeout(3):
self.scan.wait_state(ScanState.STARTING)
yield self.SequenceContext(self)
finally:
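# putting StopIteration ends the GroupingMaster iteration so the group scan can terminate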
self.group_acq_master.queue.put(StopIteration)
err = False
try:
self.wait_all_subscans(0)
except gevent.Timeout:
gevent.killall(self._waiting_scans)
err = True
if self.scan.state == ScanState.STARTING:
self.scan._Scan__state = ScanState.DONE
if len(self._scans) > 0:
# waiting for the last point to be published before killing the scan greenlet
while self.group_acq_master.queue.qsize() > 0:
self.group_acq_master.publish_event.wait()
gevent.sleep(0)
self.group_acq_master.publish_event.wait()
if self.scan.state == ScanState.STARTING:
self.scan._Scan__state = ScanState.DONE
self.scan.writer._remove_callbacks()
self.scan.disconnect_all()
group_scan.kill()
if err:
raise RuntimeError(
f'Some scans of the sequence "{self.title}" have not been executed!\nThe dataset will be incomplete!'
)
def _build_scan(self):
self.group_acq_master = GroupingMaster()
chain = AcquisitionChain()
chain.add(self.group_acq_master)
if len(self.custom_channels) > 0:
self.group_custom_slave = GroupingSlave(
"custom_channels", self.custom_channels.values()
)
chain.add(self.group_acq_master, self.group_custom_slave)
self.scan = ScanGroup(chain, self.title, save=True, scan_info=self.scan_info)
@property
def node(self):
return self.scan.node
class Group(Sequence):
def __init__(self, *scans, title="group_of_scans", scan_info=None):
Sequence.__init__(self, title=title, scan_info=scan_info)
with self.sequence_context() as seq_context:
for s in scans:
if isinstance(s, Data_Scan):
if s.node_type not in ["scan", "scan_group"]:
raise RuntimeError(f"Only scans can be added to group!")
scan = s
elif isinstance(s, Scanning_Scan):
if s.state < ScanState.STARTING:
raise RuntimeError(
f"Only scans that have been run before can be added to group!"
)
scan = s.node
elif type(s) == int:
node_found = False
for node in get_session_node(
get_current_session().name
).iterator.walk(filter="scan", wait=False):
if node.info["scan_nb"] == s:
scan = node
node_found = True
break
if not node_found:
raise RuntimeError(f"Scan {s} not found!")
else:
raise RuntimeError(
f"Invalid argument: no scan node found that corresponds to {s}!"
)
seq_context._add_via_node(scan)
class GroupingMaster(AcquisitionMaster):
def __init__(self):
AcquisitionMaster.__init__(
self,
None,
name="GroupingMaster",
npoints=0,
prepare_once=True,
start_once=True,
)
self.queue = Queue()
self._node_channel = AcquisitionChannel(
f"scans", numpy.str, (), reference=True, data_node_type="node_ref_channel"
)
self.channels.append(self._node_channel)
self._number_channel = AcquisitionChannel("scan_numbers", numpy.int, ())
self.channels.append(self._number_channel)
self.publish_event = gevent.event.Event()
self.publish_event.set()
def prepare(self):
pass
def __iter__(self):
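# the first iteration prepares and starts the master; each queued subscan
# then triggers one more iteration that publishes its reference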
yield self
for scan in self.queue:
self._new_subscan(scan)
yield self
def _new_subscan(self, scan):
self.publish_event.clear()
if isinstance(scan, Scanning_Scan):
scan = scan.node
self._number_channel.emit(int(scan.info["scan_nb"]))
self._node_channel.emit(scan.db_name)
# handle the TTL of the subscan
if scan.connection.ttl(scan.db_name) > 0:
for n in scan.iterator.walk(wait=False):
if n.connection.ttl(n.db_name) > 0:
n.set_ttl()
self.publish_event.set()
def new_subscan(self, scan):
self.queue.put(scan)
def start(self):
pass
def stop(self):
pass
class GroupingSlave(
AcquisitionSlave
): # one instance of this for channels published `on the fly` and one that is called after the scan?
def __init__(self, name, channels):
AcquisitionSlave.__init__(self, None, name=name)
for channel in channels:
self.channels.append(channel)
def prepare(self):
pass
def start(self):
pass
def trigger(self):
pass
def stop(self):
pass
......@@ -14,6 +14,7 @@ import sys
import time
import datetime
import uuid
import collections
from functools import wraps
from bliss import setup_globals, current_session, is_bliss_shell
......@@ -344,10 +345,6 @@ class ScanSaving(ParametersWardrobe):
The *parent* node should be use as parameters for the Scan.
"""
keys = dict()
_change_to_obj_marshalling(keys)
# default and not removable values
_default_values = {
"base_path": "/tmp/scans",
......@@ -376,7 +373,6 @@ class ScanSaving(ParametersWardrobe):
default_values=_default_values,
property_attributes=_property_attributes,
not_removable=_default_values.keys(),
**keys,
)
def __dir__(self):
......@@ -534,9 +530,6 @@ class ScanDisplay(ParametersWardrobe):
"""
This class represents the display parameters for scans for a session.
"""
keys = dict()
_change_to_obj_marshalling(keys)
if session_name is None:
session_name = current_session.name
......@@ -550,7 +543,6 @@ class ScanDisplay(ParametersWardrobe):
},
property_attributes=("session", "counters", "extra_args"),
not_removable=("auto", "motor_position"),
**keys,
)
self.add("_session_name", session_name)
......@@ -580,7 +572,7 @@ class ScanDisplay(ParametersWardrobe):
# FIXME: It could warn to restart flint in case it is already loaded
if not isinstance(extra_args, (list, tuple)):
raise TypeError(
"SCADISPLAY_SCAN.extra_args expects a list or a tuple of strings"
"SCAN_DISPLAY.extra_args expects a list or a tuple of strings"
)
# Do not load it while it is not needed
......@@ -630,10 +622,14 @@ def _get_channels_dict(acq_object, channels_dict):
fullname = acq_chan.fullname
if fullname in display_names:
continue
chan_name = acq_chan.short_name
if chan_name in display_names.values():
chan_name = fullname
display_names[fullname] = chan_name
try:
_, controller_chan_name, chan_name = fullname.split(":")
except ValueError:
controller_chan_name, _, chan_name = fullname.rpartition(":")
display_names[fullname] = (
controller_chan_name,
acq_chan.short_name,
) # use .name to get alias, if any
scalars_units[fullname] = acq_chan.unit
shape = acq_chan.shape
if len(shape) == 0 and fullname not in scalars:
......@@ -650,7 +646,8 @@ def _get_masters_and_channels(acq_chain):
# go through acq chain, group acq channels by master and data shape
tree = acq_chain._tree
chain_dict = dict()
chain_dict = {}
display_names_list = []
for path in tree.paths_to_leaves():
master = None
# path[0] is root
......@@ -661,8 +658,37 @@ def _get_masters_and_channels(acq_chain):
master = acq_object.name
channels = chain_dict.setdefault(master, {"master": {}})
_get_channels_dict(acq_object, channels["master"])
display_names_list.append(channels["master"]["display_names"])
continue
_get_channels_dict(acq_object, channels)
display_names_list.append(channels["display_names"])
# find channel display labels
names_count = collections.Counter()
# eliminate duplicated display_names dict in list
display_names_list = [
d
for i, d in enumerate(display_names_list)
if d not in display_names_list[i + 1 :]
]
for display_names in display_names_list:
for controller_chan_name, chan_name in display_names.values():
if controller_chan_name == chan_name:
# weird case, but it can happen
names_count.update([chan_name])
else:
names_count.update([controller_chan_name, chan_name])
for display_names in display_names_list:
for fullname, (controller_chan_name, chan_name) in display_names.items():
if names_count[chan_name] == 1:
# unique short name
display_names[fullname] = chan_name
else:
if names_count[controller_chan_name] == 1:
display_names[fullname] = controller_chan_name
else:
display_names[fullname] = fullname
return chain_dict
......@@ -738,7 +764,7 @@ class Scan:
self._scan_info = dict(scan_info) if scan_info is not None else dict()
if scan_saving is None:
scan_saving = current_session.scan_saving
scan_saving = ScanSaving(current_session.name)
session_name = scan_saving.session
user_name = scan_saving.user_name
self.__scan_saving = scan_saving
......@@ -785,11 +811,15 @@ class Scan:
get_flint()
self.__state = ScanState.IDLE
self.__state_change = gevent.event.Event()
self._preset_list = list()
self.__node = None
def _create_data_node(self, node_name):
self.__node = _create_node(
node_name, "scan", parent=self.root_node, info=self._scan_info
)
def _prepare_node(self):
if self.__node is None:
### order is important in the next lines...
......@@ -814,9 +844,7 @@ class Scan:
self._scan_info["start_timestamp"] = start_timestamp
node_name = str(self.__scan_number) + "_" + self.name
self.__node = _create_node(
node_name, "scan", parent=self.root_node, info=self._scan_info
)
self._create_data_node(node_name)
def __repr__(self):
return "Scan(number={}, name={}, path={})".format(
......@@ -1010,6 +1038,11 @@ class Scan:
RuntimeError("Can't find axis in this scan")
return axis
def wait_state(self, state):
while self.__state < state:
self.__state_change.clear()
self.__state_change.wait()
def __trigger_data_watch_callback(self, signal, sender, sync=False):
if self._data_watch_callback is not None:
event_set = self._data_events.setdefault(sender, set())
......@@ -1138,6 +1171,7 @@ class Scan:
current_iters = [next(i) for i in self.acq_chain.get_iter_list()]
self.__state = ScanState.PREPARING
self.__state_change.set()
with periodic_exec(0.1 if call_on_prepare else 0, set_watch_event):
self._execute_preset("prepare")
self.prepare(self.scan_info, self.acq_chain._tree)
......@@ -1150,6 +1184,7 @@ class Scan:
gevent.killall(prepare_tasks)
self.__state = ScanState.STARTING
self.__state_change.set()
self._execute_preset("start")
run_next_tasks = [
(gevent.spawn(self._run_next, i), i) for i in current_iters
......@@ -1193,6 +1228,7 @@ class Scan:
)
self.__state = ScanState.STOPPING
self.__state_change.set()
with periodic_exec(0.1 if call_on_stop else 0, set_watch_event):
stop_task = [
......@@ -1237,6 +1273,7 @@ class Scan:
self.disconnect_all()
self.__state = ScanState.DONE
self.__state_change.set()
# Add scan to the globals
current_session.scans.append(self)
......
......@@ -103,18 +103,17 @@ def _get_counters_from_object(arg, recursive=True):
counters = list(arg.counters)
except AttributeError:
pass
if counters:
# replace counters with their aliased counterpart, if any
for i, cnt in enumerate(counters):
alias = global_map.aliases.get_alias(cnt)
if alias:
counters[i] = global_map.aliases.get(alias)
return counters
else:
if not counters:
try:
return list(arg)
counters = list(arg)
except TypeError:
return [arg]
counters = [arg]
# replace counters with their aliased counterpart, if any
for i, cnt in enumerate(counters):
alias = global_map.aliases.get_alias(cnt)
if alias:
counters[i] = global_map.aliases.get(alias)
return counters
def get_all_counters(counter_args):
......
......@@ -109,11 +109,11 @@ class Writer(FileWriter):
scan_name = scan.node.name
scan_info = scan.scan_info
### fill image references ###
### fill image references and groups ###
for fname, channel in scan.get_channels_dict.items():
chan_name = channel.name
if channel.reference:
if channel.reference and channel.data_node_type == "lima":
"""produce a string version of a lima reference that can be saved in hdf5
At the moment there are only Lima references;
......@@ -155,6 +155,37 @@ class Writer(FileWriter):
dataset.attrs.modify("fullname", channel.fullname)
dataset[:] = data
elif channel.reference and channel.data_node_type == "node_ref_channel":
self.file.create_group(f"{scan_name}/scans")
self.file[f"{scan_name}/scans"].attrs["NX_class"] = "NXcollection"
for subscan in channel.data_node.get(0, -1):
subscan_names = [subscan.name]
# handling multiple top master
if len(subscan.info["acquisition_chain"]) > 1:
for i in range(1, len(subscan.info["acquisition_chain"])):
### logic taken from
### has to stay in sync!!
subsubscan_number, subsubscan_name = subscan.name.split(
"_", maxsplit=1
)
subsubscan_name = (
f"{subsubscan_number}{'.%d_' % i}{subsubscan_name}"
)
subscan_names.append(subsubscan_name)
for subscan_name in subscan_names:
if subscan_name in self.file.keys():
self.file[f"{scan_name}/scans/{subscan_name}"] = self.file[
f"{subscan_name}"
]
else:
# of course we have to think more carefully about what to do in this case...
# e.g. external link?
print(
"ERROR: trying to link to a scan that is not saved in the current file!"
)
#### use scan_meta to fill fields ####
hdf5_scan_meta = {
cat_name: scan_info.get(cat_name, {}) for cat_name in categories_names()
......
......@@ -39,15 +39,24 @@ def is_property(text, repl):
)
if not m:
return False
result = isinstance(
eval(
f"type({m['instance_name']}).{m['attr_name']}",
repl.get_globals(),
repl.get_locals(),
),
property,
)
return result
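# a name unknown to the repl cannot resolve to a property: bail out before eval-ing it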
if (
m["instance_name"] not in repl.get_globals()
and m["instance_name"] not in repl.get_locals()
):
return False
try:
result = isinstance(
eval(
f"type({m['instance_name']}).{m['attr_name']}",
repl.get_globals(),
repl.get_locals(),
),
property,
)
except Exception:
return False
else:
return result
class TypingHelper(object):
......
# Groups of scans and sequences
Bliss provides the capability to bundle several scans together into a group or a sequence.
## Groups
Groups can be used to bundle scans **after** they have run.
!!! example
```python
BLISS [3]: from bliss.scanning.group import Group
...: diode=config.get('diode')
...: s1=loopscan(3,.1,diode)
...: s2=loopscan(3,.2,diode)
...: Group(s1,s2)
```
The `Group` command takes as keyword arguments `title` and `scan_info`. Scans can be added to a group via their Scan object (`bliss.scanning.scan.Scan`, like in the example above), via the corresponding Scan data node (`bliss.data.scan.Scan`) or simply via their scan number. Please note that the scan must still be in memory (redis) for this mechanism to work.
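A group can also be created directly from scan numbers; a minimal sketch, assuming the numbered scans are still available in redis (the numbers below are hypothetical):
!!! example
```python
BLISS [5]: from bliss.scanning.group import Group
      ...: Group(1, 2, title='my_scan_group')
```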
## Sequences
Sequences can be used to bundle scans, no matter whether they have already been executed or will run during the sequence. The only condition: all scans of the sequence must terminate before leaving the context manager provided by the sequence. The `Sequence` object takes as keyword arguments `title` and `scan_info`.
!!! example
```python
TEST_SESSION [4]: from bliss.scanning.group import Sequence
...: seq=Sequence()
...: with seq.sequence_context() as scan_seq:
...: s1=loopscan(5,.1,diode,run=False)
...: scan_seq.add(s1)
...: s1.run()
...:
...: #do something here ... move motors, open/close shutter
...:
...: s2=loopscan(3,.2,diode,run=False)
...: scan_seq.add_and_run(s2)
...:
...: #do more things
...:
...: s3=loopscan(3,.2,diode)
...: scan_seq.add(s3)
```
In the example above we prepare scans (s1 and s2 ... note the `run=False` keyword argument) and add them to the sequence by calling `add` or `add_and_run`. It is also possible to add a scan that has already terminated (s3). However, with online data analysis in mind it is advisable to first add the scans to the sequence and run them afterwards (like s1 and s2), as this enables external processes to follow the sequence in real time.
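When scans are added with `add` before they are run, the sequence context also provides `wait_all_subscans` to block until every added scan has actually started and been registered in the sequence. A minimal sketch, assuming `diode` from the session configuration:
!!! example
```python
TEST_SESSION [6]: import gevent
             ...: from bliss.scanning.group import Sequence
             ...: seq = Sequence()
             ...: with seq.sequence_context() as scan_seq:
             ...:     s1 = loopscan(5, .1, diode, run=False)
             ...:     scan_seq.add(s1)
             ...:     task = gevent.spawn(s1.run)
             ...:     # returns once s1 has started and is part of the sequence
             ...:     scan_seq.wait_all_subscans()
             ...:     task.join()
```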
### Channels attached to sequences
It is possible to publish additional channels that are not part of any of the scans inside the sequence. These channels could e.g. be the result of a calculation based on the data acquired during the sequence.
!!! example
```python
TEST_SESSION [12]: from bliss.scanning.group import Sequence
...: from bliss.scanning.chain import AcquisitionChannel
...: import numpy
...: seq=Sequence()
...: seq.add_custom_channel(AcquisitionChannel('mychannel',numpy.float,()))
...: seq.add_custom_channel(AcquisitionChannel('sum',numpy.float,()))
...: with seq.sequence_context() as scan_seq:
...: s1=loopscan(5,.1,diode,run=False)
...: s2=loopscan(5,.1,diode,run=False)
...: scan_seq.add_and_run(s1)
...: scan_seq.add_and_run(s2)
...:
...: seq.custom_channels['mychannel'].emit([1.1,2.2,3.3])
...:
...: my_sum = s1.get_data()[diode] + s2.get_data()[diode]
...: seq.custom_channels['sum'].emit(my_sum)
```
## Listening to scan groups in redis
The following example listens to groups and sequences in redis:
!!! example
```python
TEST_SESSION [7]: from bliss import current_session
TEST_SESSION [8]: from bliss.data.node import get_session_node
TEST_SESSION [9]: for node in get_session_node(current_session.name).iterator.walk(filter='scan_group',wait=False):
...: print(node.db_name,node.info["scan_nb"])
test_session:tmp:scans:55_sequence_of_scans 55
```
To listen to scans and groups of scans at the same time, simply change the filter:
!!! example
```python
TEST_SESSION [10]: for node in get_session_node(current_session.name).iterator.walk(filter=['scan_group','scan'],wait=False):
...: print(node.db_name,node.info["scan_nb"])
test_session:tmp:scans:53_loopscan 53
test_session:tmp:scans:54_loopscan 54
test_session:tmp:scans:55_sequence_of_scans 55
test_session:tmp:scans:56_loopscan 56
test_session:tmp:scans:57_loopscan 57
```
For online data analysis it is possible to be notified when a new scan is added to a sequence:
!!! example
```python
TEST_SESSION [28]: for (event,node) in seq.node.iterator.walk_events(filter='node_ref_channel'):
...: print(event, node.db_name)
...: if event.name == "NEW_DATA_IN_CHANNEL":
...: print("\t" , node.get(0,-1))
event.NEW_NODE test_session:tmp:scans:310_sequence_of_scans:GroupingMaster:scans
event.NEW_DATA_IN_CHANNEL test_session:tmp:scans:310_sequence_of_scans:GroupingMaster:scans
[<bliss.data.scan.Scan object at 0x7f527a7b2710>, <bliss.data.scan.Scan object at 0x7f527a7b22d0>]
```
......@@ -112,6 +112,7 @@ nav:
- Watchdog: scan_engine_watchdog.md
- Data node: scan_data_node.md
- Continuous scans: scan_continuous.md
- Grouping scans: scan_group.md
- Instruments:
- ESRF BeamViewer (EBV): using_beamviewer.md
- MCA:
......
......@@ -50,6 +50,7 @@ class HDF5_Writer(object):
# all channels that are involved in this scan will be added here
self.channels = list()
self.lima_channels = list()
self.group_channels = list()
# here we will track for each channel how far the data has been written yet
self.channel_indices = dict()
......@@ -115,6 +116,10 @@ class HDF5_Writer(object):
elif event_type.name == "NEW_NODE" and node.type == "lima":
self.lima_channels.append(node)
# dealing with node_ref_channel
elif event_type.name == "NEW_NODE" and node.type == "node_ref_channel":
self.group_channels.append(node)
# adding data to channel dataset
elif event_type.name == "NEW_DATA_IN_CHANNEL" and node.type == "channel":
print(self.scan_node.db_name, "add data", node.name)
......@@ -126,8 +131,19 @@ class HDF5_Writer(object):
# in this demo we restrict ourselves to treating the lima data at the end of the scan
pass
# dealing with node_ref_channel
elif (
event_type.name == "NEW_DATA_IN_CHANNEL"
and node.type == "node_ref_channel"
):
# could be done in real time during run time of the scan as done for channels
# in this demo we restrict ourselves to treating all references at the end
pass
# creating a new entry in the hdf5 for each 'top-master'
elif event_type.name == "NEW_NODE" and node.parent.type == "scan":
elif event_type.name == "NEW_NODE" and (
node.parent.type == "scan" or node.parent.type == "scan_group"
):
print(self.scan_node.db_name, "add subscan", node.name)
# add a new subscan to this scan (this is to deal with "multiple top master" scans)
# and the fact that the hdf5 tree does not reflect the redis tree in this case
......@@ -212,6 +228,37 @@ class HDF5_Writer(object):
dataset[:] = data
def update_node_ref_channel(self, node):
"""insert subscans"""
self.file.create_group(f"{self.scan_name}/scans")
self.file[f"{self.scan_name}/scans"].attrs["NX_class"] = "NXcollection"
for subscan in node.get(0, -1):
subscan_names = [subscan.name]
# handling multiple top master
if len(subscan.info["acquisition_chain"]) > 1:
for i in range(1, len(subscan.info["acquisition_chain"])):
subsubscan_number, subsubscan_name = subscan.name.split(
"_", maxsplit=1
)
subsubscan_name = (
f"{subsubscan_number}{'.%d_' % i}{subsubscan_name}"
)
subscan_names.append(subsubscan_name)
for subscan_name in subscan_names:
if subscan_name in self.file.keys():
self.file[f"{self.scan_name}/scans/{subscan_name}"] = self.file[
f"{subscan_name}"
]
else:
# of course we have to think more carefully about what to do in this case...
# e.g. external link?
print(
"ERROR: trying to link to a scan that is not saved in the current file!"
)
def lima_ref_array(self, node):
""" used to produce a string version of a lima reference that can be saved in hdf5
"""
......@@ -255,6 +302,9 @@ class HDF5_Writer(object):
for c in self.lima_channels:
self.update_lima_data(c)
for c in self.group_channels:
self.update_node_ref_channel(c)
# instrument entry
instrument = self.file.create_group(f"{self.scan_name}/instrument")
instrument.attrs["NX_class"] = "NXinstrument"
......@@ -304,7 +354,9 @@ def listen_scans_of_session(session, scan_stack=dict()):
# wait for new events on scan
print("Listening to", session)
for event_type, node in session_node.iterator.walk_on_new_events(filter="scan"):
for event_type, node in session_node.iterator.walk_on_new_events(
filter=["scan", "scan_group"]
):
if event_type.name == "NEW_NODE":
exit_read, exit_write = os.pipe()
# we use this pipe to be able to catch the NEW_NODE of scan and
......
......@@ -22,7 +22,7 @@ def test_alias_data_channel(alias_session):
0.001,
env_dict["simu1"].counters.spectrum_det0,
env_dict["dtime"],
env_dict["lima_simulator"].counters.r1_sum,
env_dict["lima_simulator"].counters.r1_sum, # has an alias 'myroi'
env_dict["lima_simulator"].counters.r2_sum,
env_dict["myroi3"],
save=True,
......@@ -33,7 +33,7 @@ def test_alias_data_channel(alias_session):
{a2scan}:axis:robzz robzz
{a2scan}:axis:timer:elapsed_time elapsed_time
{a2scan}:axis:timer:epoch epoch
{a2scan}:axis:timer:lima_simulator:roi_counters:r1_sum r1_sum
{a2scan}:axis:timer:lima_simulator:roi_counters:myroi myroi
{a2scan}:axis:timer:lima_simulator:roi_counters:r2_sum r2_sum
{a2scan}:axis:timer:lima_simulator:roi_counters:myroi3 myroi3
{a2scan}:axis:timer:simu1:dtime dtime
......
......@@ -154,6 +154,28 @@ def test_orderedhash_settings(beacon):
assert tuple(ohs.keys()) == tuple(k for k, v in data)
def test_orderedhash_settings_resize(beacon):
data = tuple(
(str(n), v) for n, v in enumerate((ch for ch in "abcdefghilmnopqrstuvz"))
)
ohs = settings.OrderedHashSetting("resizeordhashset")
for k, v in data:
ohs[k] = v
cnx = ohs._cnx()
# popping will remove from the zset the element with the lowest score
# in this case attr='1', score=1
attr, order = cnx.zpopmin(ohs._name_order)[0]
assert attr, order == ("1", 1)
ohs[100] = "bla" # assigning a value will call lua script and trigger the resize
attr, order = cnx.zpopmin(ohs._name_order)[0]
assert attr, order == ("2", 1) # the order is still 1 because is resized
attr, order = cnx.zpopmin(ohs._name_order)[0]
assert attr, order == ("3", 2)
ohs[100] = "bla" # doing it again
attr, order = cnx.zpopmin(ohs._name_order)[0]
assert attr, order == ("4", 1)
def test_orderedhash_settings_remove(beacon):
removeme = settings.OrderedHashSetting("removeme")
removeme["a"] = "a"
......
......@@ -12,8 +12,7 @@ from bliss.scanning.acquisition.timer import SoftwareTimerMaster
from bliss.common.tango import DeviceProxy
from bliss.common.counter import Counter
from bliss.controllers.lima.roi import Roi
from bliss.common.scans import loopscan, DEFAULT_CHAIN
from bliss import setup_globals
from bliss.common.scans import loopscan, timescan, ct, DEFAULT_CHAIN
import gevent
......@@ -40,12 +39,18 @@ def test_lima_simulator(beacon, lima_simulator):
assert simulator.camera.test == "test"
def test_lima_sim_bpm(beacon, lima_simulator):
def test_lima_sim_bpm(beacon, default_session, lima_simulator):
simulator = beacon.get("lima_simulator")
assert "fwhm_x" in simulator.counters._fields
assert "bpm" in simulator.counter_groups._fields
s = loopscan(1, 0.1, simulator.counter_groups.bpm, save=False)
data = s.get_data()
assert f"{simulator.name}:bpm:x" in s.get_data()
assert len(data) == 6 + 2 # 6 bpm counters + 2 timer
def assert_lima_rois(lima_roi_counter, rois):
roi_names = lima_roi_counter.getNames()
......@@ -146,10 +151,10 @@ def test_lima_mapping_and_saving(session, lima_simulator):
try:
simulator.select_directories_mapping("fancy")
mapped_directory = simulator.get_mapped_path(scan_saving.get_path())
ct = setup_globals.ct(0.1, simulator, save=True, run=False)
ct_scan = ct(0.1, simulator, save=True, run=False)
try:
ct.run()
ct_scan.run()
except Exception as e:
# this will fail because directory is not likely to exist
saving_directory = e.args[0].desc.split("Directory :")[-1].split()[0]
......@@ -183,7 +188,7 @@ def test_images_dir_prefix_saving(lima_simulator, scan_tmpdir, session):
scan_saving.images_prefix,
)
setup_globals.loopscan(1, 0.1, simulator)
loopscan(1, 0.1, simulator)
assert os.path.isdir(scan_config["root_path"])
assert os.path.isdir(os.path.join(scan_config["root_path"], "loopscan_1/toto"))
......@@ -219,7 +224,7 @@ def test_images_dir_prefix_saving_absolute(lima_simulator, scan_tmpdir, session)
"{scan_name}_{scan_number}/toto/{img_acq_device}",
)
setup_globals.timescan(0.1, simulator, npoints=1)
timescan(0.1, simulator, npoints=1)
assert os.path.isdir(scan_config["root_path"])
assert os.path.isdir(os.path.join(scan_config["root_path"], "timescan_1/toto"))
......@@ -249,7 +254,7 @@ def test_images_dir_saving_null_writer(lima_simulator, scan_tmpdir, session):
try:
scan_config = scan_saving.get()
setup_globals.timescan(0.1, simulator, npoints=1)
timescan(0.1, simulator, npoints=1)
assert os.path.exists(
os.path.join(
......@@ -271,7 +276,7 @@ def test_dir_no_saving(lima_simulator, scan_tmpdir, session):
try:
scan_config = scan_saving.get()
setup_globals.timescan(0.1, simulator, npoints=1, save=False)
timescan(0.1, simulator, npoints=1, save=False)
assert not os.path.exists(os.path.join(scan_config["root_path"]))
finally:
......
......@@ -20,6 +20,8 @@ from bliss.scanning.acquisition import timer
from bliss.scanning.chain import AcquisitionChain, AcquisitionMaster
from bliss.scanning.channel import AcquisitionChannel
from bliss.scanning.scan import Scan
from bliss.scanning.group import Group
from scripts.external_saving_example.external_saving_example import (
listen_scans_of_session
......@@ -160,6 +162,11 @@ def test_external_hdf5_writer(
c_samp = SoftCounter(a, "read", name="test-samp", mode=SamplingMode.SAMPLES)
scan5_b = scans.ascan(ax, 1, 9, 9, .1, c_samp)
# a group entry
s1 = scans.loopscan(3, .1, diode_sim)
s2 = scans.loopscan(3, .05, diode_sim)
g = Group(s1, s2)
##wait for all scan entries
external_writer_file = s1.scan_info["filename"].replace(".", "_external.")
bliss_writer_file = s1.scan_info["filename"]
......@@ -217,3 +224,8 @@ def test_external_hdf5_writer(
external_writer = h5todict(external_writer_file)["6_ascan"]
bliss_writer = h5todict(bliss_writer_file)["6_ascan"]
deep_compare(external_writer, bliss_writer)
# check group
external_writer = h5todict(external_writer_file)[g.node.name]
assert "scans" in external_writer
assert len(external_writer["scans"]) == 2
import gevent
from bliss.common import scans
def test_async_demo_default(default_session, scan_tmpdir):
# put scan file in a tmp directory
default_session.scan_saving.base_path = str(scan_tmpdir)
diode = default_session.config.get("diode")
sim_ct_gauss = default_session.config.get("sim_ct_gauss")
robz = default_session.config.get("robz")
s1 = scans.loopscan(20, .1, diode, run=False)
s2 = scans.ascan(robz, 0, 1, 20, .1, sim_ct_gauss,