settings: pipeline context

A simple context to group redis commands.
parent 5c9b6477
......@@ -5,6 +5,7 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from contextlib import contextmanager
from .conductor import client
from bliss.common.utils import Null
from bliss import setup_globals
......@@ -164,6 +165,64 @@ def scan(match="*", count=1000, connection=None):
break
def _get_connection(setting_object):
"""
Return the connection of a setting_object
"""
if isinstance(setting_object, Struct):
return setting_object._proxy._cnx
elif isinstance(setting_object, (SimpleSetting, QueueSetting, HashSetting)):
return setting_object._cnx
else:
raise TypeError(
f"Setting object should be one of: Struct, SimpleSetting, QueueSetting or HashSetting instead of {setting_object!r}"
)
def _set_connection(setting_object, new_cnx):
"""
change the connection of a setting_object
and return the previous connection
"""
if isinstance(setting_object, Struct):
cnx = setting_object._proxy._cnx
setting_object._proxy._cnx = new_cnx
elif isinstance(setting_object, (SimpleSetting, QueueSetting, HashSetting)):
cnx = setting_object._cnx
setting_object._cnx = new_cnx
else:
raise TypeError(
f"Setting object should be one of: Struct, SimpleSetting, QueueSetting or HashSetting instead of {setting_object!r}"
)
return cnx
@contextmanager
def pipeline(*settings):
"""
Contextmanager which create a redis pipeline to group redis commands
on settings.
IN CASE OF you execute the pipeline, it will return raw database values
(byte strings).
"""
first_settings = settings[0]
cnx = _get_connection(first_settings)()
# check they have the same connection
for s in settings[1:]:
if _get_connection(s)() != cnx:
raise RuntimeError("Cannot groupe redis commands in a pipeline")
pipeline = cnx.pipeline()
try:
# replace settings connection with the pipeline
previous_cnx = [_set_connection(s, weakref.ref(pipeline)) for s in settings]
yield pipeline
finally:
[_set_connection(s, c) for c, s in zip(previous_cnx, settings)]
pipeline.execute()
class SimpleSetting(object):
def __init__(
self,
......
......@@ -112,3 +112,18 @@ def test_queue_setting(session):
with pytest.raises(ValueError):
print(settings.InvalidValue())
def test_pipeline_settings(beacon):
t = settings.HashSetting("super_fancy")
values = [("val1", 1), ("val2", 2)]
with settings.pipeline(t):
for val_name, value in values:
t[val_name] = value
try:
with settings.pipeline(t) as p:
for val_name, value in values:
t[val_name] # get
assert p.execute() == [b"1", b"2"]
finally:
t.clear()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment