Commit 7cb4b1cb authored by Cyril Guilloud's avatar Cyril Guilloud

Merge branch 'beacon_object_event_callback' into 'master'

Beacon object event callback

See merge request !1409
parents 8c3d8160 374e51cd
Pipeline #12785 failed with stages
......@@ -9,6 +9,7 @@ import sys
from gevent import GreenletExit
from louie import dispatcher
from louie.dispatcher import get_receivers
from louie import robustapply
from louie import saferef
......
......@@ -5,10 +5,12 @@
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import sys
import yaml
from functools import wraps
from bliss.config.settings import HashObjSetting, pipeline
from bliss.config.channels import Cache
from bliss.config.channels import Cache, EventChannel
from bliss.common import event
from bliss.common.utils import Null, autocomplete_property
from bliss.config.conductor.client import remote_open
......@@ -96,7 +98,9 @@ class BeaconObject:
raise RuntimeError(
f"parameter {fset.__name__} is read only."
)
fset(self, value)
rvalue = fset(self, value)
set_value = rvalue if rvalue is not None else value
self._event_channel.post(fset.__name__)
else:
fence = {"in_set": False}
......@@ -107,9 +111,9 @@ class BeaconObject:
try:
fence["in_set"] = True
rvalue = fset(self, value)
self._settings[fset.__name__] = (
rvalue if rvalue is not None else value
)
set_value = rvalue if rvalue is not None else value
self._settings[fset.__name__] = set_value
self._event_channel.post(fset.__name__)
self._initialize_with_setting()
finally:
fence["in_set"] = False
......@@ -133,6 +137,8 @@ class BeaconObject:
self.__initialized = Cache(self, "initialized", default_value=False)
self._in_initialize_with_setting = False
self._event_channel = EventChannel(f"__EVENT__:{self.name}")
self._event_channel.register_callback(self.__event_handler)
@autocomplete_property
def config(self):
......@@ -190,7 +196,10 @@ class BeaconObject:
f"in file:{self.config.filename}"
)
self.config.update(d)
self._settings.remove(*self.__settings_properties().keys())
try:
self._settings.remove(*self.__settings_properties().keys())
except AttributeError: # apply config before init
pass
self.__initialized.value = False
self._initialize_with_setting()
......@@ -233,6 +242,7 @@ class BeaconObject:
)
else: # initialize setting
self._settings[name] = getattr(self, name)
self._event_channel.post(name)
if error_messages:
raise NotImplementedError("\n".join(error_messages))
......@@ -268,6 +278,19 @@ class BeaconObject:
def __config_getter(self):
return self.__filter_attribute(BeaconObject._config_getter)
def __event_handler(self, events):
events = [ev for ev in set(events) if event.get_receivers(self, ev)]
if not events:
return # noting to do
settings_values = self.settings.get_all()
for ev in events:
value = settings_values.get(ev)
try:
event.send(self, ev, value)
except Exception:
sys.excepthook(*sys.exc_info())
@staticmethod
def property(
fget=None,
......
......@@ -525,3 +525,67 @@ def clear_cache(*devices):
cached_channels = DEVICE_CACHE.get(device, [])
for channel in cached_channels:
channel.value = channel.default_value
class EventChannel(AdvancedInstantiationInterface):
def __new__(cls, name, *args, **kwargs):
bus = kwargs.get("bus")
if bus is None:
bus = Bus(kwargs.get("redis"))
event = bus.get_channel(name)
if event is None:
event = cls.instanciate(bus, name)
bus.set_channel(event)
return event
def __preinit__(self, bus, name):
self._bus = bus
self._name = name
self._callback_refs = set()
self.__pending_events = list()
self._subscribed_event = gevent.event.Event()
self.ready = True
@property
def name(self):
return self._name
def register_callback(self, callback):
if not callable(callback):
raise ValueError(
"Event {}: {!r} is not callable".format(self.name, callback)
)
cb_ref = saferef.safe_ref(callback)
self._callback_refs.add(cb_ref)
def unregister_callback(self, callback):
cb_ref = saferef.safe_ref(callback)
try:
self._callback_refs.remove(cb_ref)
except KeyError:
pass
def post(self, value):
self.__pending_events.append(value)
self._bus.schedule_update(self)
def close(self):
# Nothing to do
pass
@property
def _raw_value(self):
# create a raw value with the pending events
pending_events = self.__pending_events
self.__pending_events = list()
value = _Value(time.time(), pending_events)
return value
def _set_raw_value(self, raw_value):
value = raw_value.value
callbacks = [_f for _f in [ref() for ref in self._callback_refs] if _f]
for cb in callbacks:
try:
cb(value)
except:
sys.excepthook(*sys.exc_info())
......@@ -76,3 +76,31 @@ cached values for the list of devices provided as function arguments.
When the last client holding a channel disconnects, the channel is
removed. It is cleared from Redis. In case another channel with the same
name is created afterwards, reading it returns the default value.
## `EventChannel` object
EventChannel object is a way to distribute event system wide.
All pickable python object can be `post`
The receiver will get the posted event by block.
### Usage
Process A:
```py
BLISS [1]: from bliss.config.channels import EventChannel
BLISS [2]: c = EventChannel('test')
BLISS [3]: for i in range(10):
...: c.post(f'event {i}')
```
Process B:
```py
BLISS [1]: from bliss.config.channels import EventChannel
BLISS [2]: c = EventChannel('test')
BLISS [3]: def f(events_list):
...: print(events_list)
BLISS [4]: c.register_callback(f)
BLISS [5]: ['event 0', 'event 1', 'event 2', 'event 3', 'event 4', 'event 5', 'event 6', 'event 7', 'event 8', 'event 9']
```
......@@ -6,7 +6,9 @@
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import numpy
import pytest
import gevent
from bliss.config.beacon_object import BeaconObject
from bliss.common import event
class Ctrl(BeaconObject):
......@@ -228,3 +230,47 @@ def test_config_and_settings_priority_test(beacon):
"mode",
"speed",
]
def test_event(beacon):
cfg = beacon.get("controller_setting2")
ctrl = Ctrl8(cfg)
events_dict = {"nb": 0}
current_values = dict()
cbk_event = gevent.event.Event()
def speed_cbk(value):
current_values["speed"] = value
cbk_event.set()
events_dict["nb"] += 1
def velocity_cbk(value):
current_values["velocity"] = value
cbk_event.set()
events_dict["nb"] += 1
def mode_cbk(value):
current_values["mode"] = value
cbk_event.set()
events_dict["nb"] += 1
def wait():
with gevent.Timeout(1):
while events_dict["nb"] < 3:
cbk_event.wait()
cbk_event.clear()
events_dict["nb"] = 0
event.connect(ctrl, "speed", speed_cbk)
event.connect(ctrl, "velocity", velocity_cbk)
event.connect(ctrl, "mode", mode_cbk)
# Init
ctrl.apply_config()
wait()
assert current_values == {"speed": 20, "velocity": 1.9, "mode": "fixed"}
ctrl.speed = 100
ctrl.velocity = 0.3
ctrl.mode = "Hello"
wait()
assert current_values == {"speed": 100, "velocity": .3, "mode": "Hello"}
......@@ -279,3 +279,35 @@ def test_2processes_set_channel_value_constructor(channel_subprocess):
c = channels.Channel("test_chan")
assert c.value == "test"
def test_channel_event():
e = channels.EventChannel("bla")
full_event_list = list()
called_cbk = {"nb": 0}
called = gevent.event.Event()
def f(events_list):
called_cbk["nb"] += 1
full_event_list.extend(events_list)
called.set()
e.register_callback(f)
first_events = [f"ev {i}" for i in range(10)]
second_events = [i for i in range(4)]
# post first events
[e.post(ev) for ev in first_events]
# give the hand to gevent loop and wait callback to be called
with gevent.Timeout(1):
called.wait()
called.clear()
# post second events
[e.post(ev) for ev in second_events]
# give the hand to gevent loop and wait callback to be called
with gevent.Timeout(1):
called.wait()
called.clear()
assert called_cbk["nb"] == 2
assert full_event_list == first_events + second_events
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment