Commit b04f42ac authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Redis caching: the cache is managed by a Redis connection pool,

so effectively by the beacon connection.
parent dad11d95
Pipeline #38933 passed with stages
in 90 minutes and 7 seconds
......@@ -14,7 +14,7 @@ from sortedcontainers import SortedKeyList
from collections.abc import MutableSequence
from bliss.config import settings
from bliss.config.settings_cache import get_redis_client_cache
from bliss.config.conductor.client import get_caching_redis_proxy
from bliss import current_session
from bliss import global_map
from bliss.common.proxy import Proxy
......@@ -76,7 +76,8 @@ def get_active_name():
"""
session_name = current_session.name
active_mg_name = settings.SimpleSetting(
"%s:active_measurementgroup" % session_name, connection=get_redis_client_cache()
"%s:active_measurementgroup" % session_name,
connection=get_caching_redis_proxy(),
)
return active_mg_name.get()
......@@ -97,7 +98,8 @@ def set_active_name(name):
session_name = current_session.name
active_mg_name = settings.SimpleSetting(
"%s:active_measurementgroup" % session_name, connection=get_redis_client_cache()
"%s:active_measurementgroup" % session_name,
connection=get_caching_redis_proxy(),
)
active_mg_name.set(name)
......@@ -283,12 +285,12 @@ class MeasurementGroup:
# Current State
self._current_state = settings.SimpleSetting(
"%s" % name, default_value="default", connection=get_redis_client_cache()
"%s" % name, default_value="default", connection=get_caching_redis_proxy()
)
# list of states ; at least one "default" state
self._all_states = settings.QueueSetting(
"%s:MG_states" % name, connection=get_redis_client_cache()
"%s:MG_states" % name, connection=get_caching_redis_proxy()
)
self._all_states.set(["default"])
......@@ -345,7 +347,7 @@ class MeasurementGroup:
def _disabled_setting(self):
# key is : "<MG name>:<state_name>" ex : "MG1:default"
_key = "%s:%s" % (self.name, self._current_state.get())
return settings.QueueSetting(_key, connection=get_redis_client_cache())
return settings.QueueSetting(_key, connection=get_caching_redis_proxy())
@_check_counter_name
def disable(self, *counter_patterns):
......
......@@ -14,7 +14,8 @@ import sys
from bliss.common import event
from bliss.common.greenlet_utils import KillMask
from bliss.config.channels import Channel
from bliss.config import settings, settings_cache
from bliss.config import settings
from bliss.config.conductor.client import get_caching_redis_proxy
def setting_update_from_channel(value, setting_name=None, axis=None):
......@@ -104,7 +105,7 @@ class AxisSettings:
self._disabled_settings = disabled_settings_namedtuple(
set(), dict(axis.config.config_dict)
)
cnx_cache = settings_cache.get_redis_client_cache()
cnx_cache = get_caching_redis_proxy()
self._hash = settings.HashSetting(
"axis.%s" % axis.name,
default_values=axis.config.config_dict,
......
......@@ -8,10 +8,10 @@
import os, sys
import io
import warnings
import gevent
from . import connection
from .connection import StolenLockException
from functools import wraps
import gevent
from bliss.config.conductor import connection
_default_connection = None
......@@ -102,39 +102,34 @@ def get_cache_address(connection=None):
return connection.get_redis_connection_address()
def get_redis_connection(single_connection_client=False, **kw):
def get_redis_connection(**kw):
"""This doesn't return a connection but a proxy which may hold
a connection.
"""
if single_connection_client:
warnings.warn("Use 'get_redis_proxy' instead", FutureWarning)
return get_redis_proxy(**kw)
else:
warnings.warn("Use 'get_fixed_connection_redis_proxy' instead", FutureWarning)
return get_fixed_connection_redis_proxy(**kw)
warnings.warn("Use 'get_redis_proxy' instead", FutureWarning)
return get_redis_proxy(**kw)
@check_connection
def get_redis_proxy(db=0, connection=None, pool_name="default"):
def get_redis_proxy(db=0, connection=None):
"""Greenlet-safe proxy.
:returns SafeRedisDbProxy:
"""
return connection.get_redis_proxy(db=db, pool_name=pool_name)
return connection.get_redis_proxy(db=db)
@check_connection
def get_fixed_connection_redis_proxy(db=0, connection=None, pool_name="default"):
"""NOT greenlet-safe.
def get_caching_redis_proxy(db=0, connection=None, shared_cache=True):
"""Greenlet-safe proxy.
:returns FixedConnectionRedisDbProxy:
:param bool shared_cache:
:returns CachingRedisDbProxy:
"""
return connection.get_fixed_connection_redis_proxy(db=db, pool_name=pool_name)
return connection.get_caching_redis_proxy(db=db, shared_cache=shared_cache)
def get_existing_redis_proxy(
db=0, single_connection_client=False, pool_name="default", timeout=None
):
def get_existing_redis_proxy(db=0, timeout=None):
"""Greenlet-safe proxy.
:returns None or SafeRedisDbProxy:
......@@ -143,7 +138,7 @@ def get_existing_redis_proxy(
return None
try:
with gevent.Timeout(timeout, TimeoutError):
return _default_connection.get_redis_proxy(db=db, pool_name=pool_name)
return _default_connection.get_redis_proxy(db=db)
except TimeoutError:
return None
......
......@@ -74,7 +74,7 @@ class ConnectionException(Exception):
Exception.__init__(self, *args, **kwargs)
RedisProxyId = namedtuple("RedisProxyId", ["name", "db", "type"])
RedisProxyId = namedtuple("RedisProxyId", ["db", "type"])
class Connection:
......@@ -201,17 +201,17 @@ class Connection:
# (these proxies don't hold a `redis.Redis.Connection` instance)
self._reusable_redis_proxies = {} # {RedisProxyId: SafeRedisDbProxy}
# Keep weak references to all non-reusable Redis connections
# (these proxies don't hold a `redis.Redis.Connection` instance)
self._non_reusable_redis_proxies = (
weakref.WeakSet()
) # set(FixedConnectionRedisDbProxy)
# Keep weak references to all Redis connection pools:
# Keep weak references to all reusable Redis connection pools:
self._redis_connection_pools = (
weakref.WeakValueDictionary()
) # {RedisProxyId: RedisDbConnectionPool}
# Keep weak references to all cached Redis proxies which are not
# reused (they could be, but then their cache would keep growing)
self._non_reusable_redis_proxies = (
weakref.WeakValueDictionary()
) # {id: RedisDbProxy}
# Hard references to the connection pools are held by the
# Redis proxies themselves. Connections of RedisDbConnectionPool
# are closed upon garbage collection of RedisDbConnectionPool. So
......@@ -421,54 +421,45 @@ class Connection:
def _get_redis_conn_pool(self, proxyid: RedisProxyId):
"""Get a Redis connection pool (create when it does not exist yet)
for the pool name and db.
for the db.
:param RedisProxyId proxyid:
:returns RedisDbConnectionPool:
"""
pool = self._redis_connection_pools.get(proxyid)
if pool is None:
address = self.get_redis_connection_address()
if proxyid.db == 1:
try:
address = self.get_redis_data_server_connection_address()
except RuntimeError: # Service not running on beacon server
pass
host, port = address
if host == "localhost":
redis_url = f"unix://{port}"
else:
redis_url = f"redis://{host}:{port}"
pool = redis_connection.create_connection_pool(
redis_url, proxyid.db, client_name=self.CLIENT_NAME
)
pool = self._create_redis_conn_pool(proxyid)
self._redis_connection_pools[proxyid] = pool
return pool
def get_redis_connection(self, single_connection_client=False, **kw):
if single_connection_client:
warnings.warn("Use 'get_redis_proxy' instead", FutureWarning)
return self.get_redis_proxy(**kw)
else:
warnings.warn(
"Use 'get_fixed_connection_redis_proxy' instead", FutureWarning
)
return self.get_fixed_connection_redis_proxy(**kw)
def get_redis_proxy(self, db=0, pool_name="default"):
"""Get a proxy to a Redis database. When a proxy already exists
for `pool_name` and `db`, return that one. The proxy is greenlet-safe
as it gets a new `redis.connection.Connection` from its pool every
time it executes a Redis command.
def _create_redis_conn_pool(self, proxyid: RedisProxyId):
"""
:param RedisProxyId proxyid:
:returns RedisDbConnectionPool:
"""
address = self.get_redis_connection_address()
if proxyid.db == 1:
try:
address = self.get_redis_data_server_connection_address()
except RuntimeError: # Service not running on beacon server
pass
The proxy will be closed (return connection to pool) when the
instantiating greenlet is garbage collected.
host, port = address
if host == "localhost":
redis_url = f"unix://{port}"
else:
redis_url = f"redis://{host}:{port}"
return redis_connection.create_connection_pool(
redis_url,
proxyid.db,
caching=proxyid.type == "caching",
client_name=self.CLIENT_NAME,
)
:returns SafeRedisDbProxy:
def _get_reusable_redis_proxy(self, proxyid: RedisProxyId):
"""Get a reusabed proxy and create it when it doesn't exist.
"""
with self._get_redis_lock:
proxyid = RedisProxyId(name=pool_name, db=db, type="safe")
proxy = self._reusable_redis_proxies.get(proxyid)
if proxy is None:
pool = self._get_redis_conn_pool(proxyid)
......@@ -476,30 +467,45 @@ class Connection:
self._reusable_redis_proxies[proxyid] = proxy
return proxy
def get_fixed_connection_redis_proxy(self, db=0, pool_name="default"):
"""Get a new proxy to a Redis database. The proxy is not greenlet-safe
as it holds a single redis `redis.connection.Connection` which is
used for all commands.
def _get_non_reusable_redis_proxy(self, proxyid: RedisProxyId):
"""Get a reusabed proxy and create it when it doesn't exist.
"""
with self._get_redis_lock:
pool = self._create_redis_conn_pool(proxyid)
proxy = pool.create_proxy()
# We need something unique for this instance,
# proxyid is not unique
self._non_reusable_redis_proxies[id(proxy)] = proxy
return proxy
def get_redis_connection(self, **kw):
warnings.warn("Use 'get_redis_proxy' instead", FutureWarning)
return self.get_redis_proxy(**kw)
def get_redis_proxy(self, db=0):
"""Get a proxy to a Redis database. When a proxy already exists
for the `db`, return that one. The proxy is greenlet-safe
as it gets a new `redis.connection.Connection` from its pool every
time it executes a Redis command.
Warning: Pipelines created from such a proxy will create a new
`redis.connection.Connection`.
:returns SafeRedisDbProxy:
"""
proxyid = RedisProxyId(db=db, type="safe")
return self._get_reusable_redis_proxy(proxyid)
The proxy will be closed (return connection to pool) when the
instantiating greenlet is garbage collected.
def get_caching_redis_proxy(self, db=0, shared_cache=True):
"""This returns a proxy with Redis client side caching. When
`shared_cache=True` we will use an existing proxy so that all
proxies you get this way share the same cache.
:returns FixedConnectionRedisDbProxy:
:param bool shared_cache:
:returns CachingRedisDbProxy:
"""
with self._get_redis_lock:
proxyid = RedisProxyId(name=pool_name, db=db, type="fixed")
pool = self._get_redis_conn_pool(proxyid)
proxy = pool.create_fixed_connection_proxy()
self._non_reusable_redis_proxies.add(proxy)
# A fixed connection cannot be shared between greenlets so
# close the proxy when the greenlet is destroyed:
weakref.finalize(
gevent.getcurrent(), self._close_fixed_connection_redis_proxy, id(proxy)
)
return proxy
proxyid = RedisProxyId(db=db, type="caching")
if shared_cache:
return self._get_reusable_redis_proxy(proxyid)
else:
return self._get_non_reusable_redis_proxy(proxyid)
def close_all_redis_connections(self):
# To close `redis.connection.Connection` you need to call its
......@@ -510,10 +516,10 @@ class Connection:
# socket instances.
#
# Note: closing a proxy will not close any connections
proxies = list(self._non_reusable_redis_proxies)
proxies = list(self._non_reusable_redis_proxies.values())
proxies += list(self._reusable_redis_proxies.values())
self._reusable_redis_proxies = dict()
self._non_reusable_redis_proxies = weakref.WeakSet()
self._non_reusable_redis_proxies = weakref.WeakValueDictionary()
for proxy in proxies:
proxy.connection_pool.disconnect()
......@@ -521,16 +527,6 @@ class Connection:
warnings.warn("Use 'close_all_redis_connections' instead", FutureWarning)
self.close_all_redis_connections()
def _close_fixed_connection_redis_proxy(self, proxyid):
"""Return the `redis.connection.Connection` instance the reusable
proxy associated to this name and database, back to its connection
pool. This is also called upon the proxy's garbage collected.
"""
for proxy in self._non_reusable_redis_proxies:
if id(proxy) == proxyid:
proxy.close()
break
@check_connect
def get_config_file(self, file_path, timeout=3.0):
with gevent.Timeout(timeout, RuntimeError("Can't get configuration file")):
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2020 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import os
import weakref
import gevent
import gevent.event
from functools import wraps
from collections.abc import MutableMapping
"""Implementation of a client-side Redis cache with invalidation connection.
"""
class RedisCacheError(RuntimeError):
    """Raised when an operation requires the Redis client-side cache
    to be enabled (i.e. the invalidation greenlet to be running)."""
class CacheInvalidationGreenlet(gevent.Greenlet):
    """Background greenlet that keeps a client-side Redis cache coherent.

    It opens one dedicated Redis connection, subscribes to the server-side
    invalidation channel ``__redis__:invalidate`` (Redis 6 client-side
    caching, RESP3) and removes invalidated keys from ``db_cache``.
    Other connections opt in via :meth:`track_connection`, which redirects
    their invalidation traffic to this greenlet's connection.
    """

    # Channel on which Redis 6 publishes key-invalidation events when
    # CLIENT TRACKING ... REDIRECT is active.
    _REDIS_INVALIDATION_KEY = b"__redis__:invalidate"

    def __init__(self, connection_pool, db_cache, running_event):
        # Pool used to create the dedicated invalidation connection.
        self._connection_pool = connection_pool
        # Mutable mapping acting as the local cache; its `_cache` attribute
        # is (re)initialized by `_run` and set to None on shutdown.
        self._db_cache = db_cache
        # Event set while the invalidation subscription is up and running.
        self._running_event = running_event
        # Redis client id of the invalidation connection; target of
        # CLIENT TRACKING REDIRECT. None while not running.
        self._tracking_redirect_id = None
        # Self-pipe used to wake up and stop the invalidation loop.
        self._close_pipe_read = None
        self._close_pipe_write = None
        super().__init__()

    def kill(self, blocking=True, timeout=None):
        """Stop the greenlet gracefully, escalating to a hard kill.

        Closing the write end of the self-pipe wakes `_invalidation_loop`,
        which then exits on its own. If that does not happen within 3
        seconds (when `blocking`), fall back to `gevent.Greenlet.kill`.

        NOTE(review): `gevent.Greenlet.kill` takes `block=`, not
        `blocking=` — the non-blocking branch below may raise TypeError
        with mainstream gevent versions; confirm against the gevent
        release in use.
        """
        self._close_fd(self._close_pipe_write)
        if blocking:
            with gevent.Timeout(timeout):
                try:
                    self.join(timeout=3)
                except gevent.Timeout:
                    # Graceful shutdown took too long: hard-kill.
                    super().kill()
        else:
            super().kill(blocking=blocking, timeout=timeout)

    def _run(self):
        """Greenlet main: set up the subscription, then consume messages.

        The `finally` block guarantees shared state is torn down (cache
        dropped, running event cleared, pipe closed) no matter how the
        loop exits.
        """
        try:
            pubsub = self._setup_invalidation_connection()
            # A fresh, empty cache only becomes available once the
            # invalidation subscription is active.
            self._db_cache._cache = dict()
            self._running_event.set()
            self._invalidation_loop(pubsub)
        finally:
            self._running_event.clear()
            # Dropping the dict (not clearing it) signals "cache closed";
            # readers hit None and fail with TypeError (see the loop).
            self._db_cache._cache = None
            self._tracking_redirect_id = None
            self._close_fd(self._close_pipe_write)
            self._close_pipe_write = None
            self._close_fd(self._close_pipe_read)
            self._close_pipe_read = None

    @staticmethod
    def _close_fd(fd):
        """Close a file descriptor, tolerating `None` and already-closed fds."""
        if fd is None:
            return
        try:
            os.close(fd)
        except OSError:
            # already closed
            pass

    def _setup_invalidation_connection(self):
        """Create the single fixed connection that receives invalidation events.

        We need one connection with a stable client id: Redis redirects
        invalidation messages (sent to the ``__redis__:invalidate`` key)
        to that client id via CLIENT TRACKING REDIRECT
        (see :meth:`track_connection`).

        :returns: a PubSub instance owning that fixed connection
        """
        proxy = self._connection_pool.create_uncached_single_connection_proxy()
        tracking_redirect_id = proxy.client_id()
        pubsub = proxy.pubsub()
        # Hand the fixed connection over to the PubSub instance. We no
        # longer need the proxy that held the connection; detaching it
        # makes sure closing the proxy does not give the connection back
        # to the pool.
        pubsub.connection = proxy.connection
        proxy.connection = None
        del proxy
        pubsub.subscribe(self._REDIS_INVALIDATION_KEY)
        # NOTE(review): `get_message` returns None on timeout, in which
        # case the subscript below raises TypeError rather than a clean
        # AssertionError — confirm this is acceptable.
        confirmation = pubsub.get_message(timeout=5)
        assert confirmation["type"] == "subscribe"
        assert confirmation["data"] == 1
        # Self-pipe: closing `wp` wakes the select() in
        # `_invalidation_loop` so the greenlet can stop cleanly.
        rp, wp = os.pipe()
        self._close_pipe_read = rp
        self._close_pipe_write = wp

        def local_kill(*args):
            try:
                os.close(wp)
            except OSError:  # pipe was already closed
                pass

        # When the pubsub connection tries to reconnect, we need to
        # stop this greenlet and invalidate the entire cache (messages
        # may have been missed while disconnected).
        pubsub.connection.register_connect_callback(local_kill)
        self._tracking_redirect_id = tracking_redirect_id
        return pubsub

    def _invalidation_loop(self, pubsub):
        """Consume invalidation messages until stopped.

        This loop is stopped when killing the greenlet or when the pubsub
        instance tries to reconnect. A reconnect means invalidation
        messages may have been missed, so `_run` drops the whole cache.
        """
        # NOTE(review): relies on `gevent.select` being reachable as an
        # attribute of `gevent`; only `gevent` / `gevent.event` imports
        # are visible at the top of this module — confirm.
        read_fds = [pubsub.connection._sock, self._close_pipe_read]
        while True:
            # Returns immediately; None means no pending message.
            msg = pubsub.get_message()
            if msg is None:
                # Block until the socket has data or the self-pipe is
                # closed (shutdown request / reconnect callback).
                read_event, _, _ = gevent.select.select(read_fds, [], [])
                if self._close_pipe_read in read_event:
                    # The connection was closed
                    break
                # There is a new pubsub message
                continue
            if msg.get("channel") != self._REDIS_INVALIDATION_KEY:
                continue
            # The message is an invalidation event
            inv_keys = msg.get("data")
            if not isinstance(inv_keys, list):
                continue
            # Invalidate local caching of these Redis keys,
            # which means remove those keys
            for key in inv_keys:
                try:
                    self._db_cache.pop(key.decode(), None)
                except TypeError:
                    # The cache is in the process of closing down
                    # (set to None which gives this TypeError)
                    break
                except UnicodeDecodeError:
                    # Non-UTF-8 key: nothing cached locally under it.
                    pass

    def track_connection(self, connection):
        """Enable invalidation tracking for `connection`.

        Redis is told (CLIENT TRACKING ... REDIRECT) to deliver
        invalidation messages for keys read or written over `connection`
        to this greenlet's pubsub connection.

        :param redis.connection.Connection connection:
        :raises RedisCacheError: when the invalidation greenlet is not running
        :raises RuntimeError: when Redis refuses to enable tracking
        """
        if not self._tracking_redirect_id:
            raise RedisCacheError("Redis key invalidation greenlet is not running")
        # Requires Redis protocol RESP3, supported by Redis 6.
        # BCAST: broadcast invalidations for all keys;
        # NOLOOP: do not notify this client about its own writes.
        connection.send_command(
            "CLIENT",
            "TRACKING",
            "on",
            "REDIRECT",
            self._tracking_redirect_id,
            "BCAST",
            "NOLOOP",
        )
        if connection.read_response() != b"OK":
            raise RuntimeError("Cannot start Redis key invalidation tracking")
def check_enabled(method):
    """Decorator for `RedisCache` methods that require an enabled cache.

    Calling the wrapped method while `self.enabled` is False raises
    `RedisCacheError`; otherwise the call is forwarded unchanged.
    """

    @wraps(method)
    def wrapper(self, *args, **kw):
        if self.enabled:
            return method(self, *args, **kw)
        raise RedisCacheError("The Redis cache is not enabled")

    return wrapper
class RedisCache(MutableMapping):
    """Local (client-side) Redis cache with a mutable-mapping interface.

    The cache only works while a `CacheInvalidationGreenlet` is
    subscribed to the Redis invalidation key; mapping operations raise
    `RedisCacheError` (via `check_enabled`) when the cache is disabled.

    NOTE(review): `MutableMapping` also requires `__iter__` and
    `__len__`; they are not visible in this chunk — presumably defined
    further down the file.
    """

    def __init__(self, connection_pool):
        # Pool from which the invalidation greenlet creates its connection.
        self._connection_pool = connection_pool
        # Set by the invalidation greenlet once the subscription is live.
        self._running_event = gevent.event.Event()
        # Until caching is done on the connection level:
        self.cache_lock = gevent.lock.RLock()
        # The actual key -> value store; None while disabled (the
        # invalidation greenlet swaps a fresh dict in and out).
        # NOTE(review): `gevent.lock` is not visibly imported at the top
        # of this module — confirm `import gevent` exposes it.
        self._cache = None
        self._invalidation_greenlet = None
        super().__init__()

    def __del__(self):
        # Best effort, non-blocking: never hang garbage collection.
        self.disable(blocking=False)

    @property
    def enabled(self):
        # True while the invalidation greenlet's subscription is active.
        return self._running_event.is_set()

    @property
    def disabled(self):
        return not self.enabled

    def enable(self, timeout=None):
        """Start cache invalidation and wait until it is running.

        :param timeout: seconds to wait for the greenlet to come up
            (None waits forever)
        :raises gevent.Timeout: when the greenlet does not start in time
        """
        if self.enabled:
            return
        with gevent.Timeout(timeout):
            # weakref.proxy: the greenlet must not keep this cache alive,
            # otherwise __del__ would never run.
            glt = CacheInvalidationGreenlet(
                self._connection_pool, weakref.proxy(self), self._running_event
            )
            glt.start()
            try:
                self._running_event.wait()
            except gevent.Timeout:
                # Show why the greenlet did not start
                try:
                    glt.get(timeout=0)
                except gevent.Timeout:
                    pass
                # Show the original timeout error
                raise
        self._invalidation_greenlet = glt

    def disable(self, blocking=True, timeout=None):
        """Stop cache invalidation (and thereby disable the cache)."""
        if self.disabled:
            return
        if self._invalidation_greenlet:
            self._invalidation_greenlet.kill(blocking=blocking, timeout=timeout)
        # NOTE(review): this relies on a gevent greenlet being falsy once
        # dead — the reference is dropped only when the kill finished
        # (a non-blocking kill keeps the still-running greenlet). Confirm.
        if not self._invalidation_greenlet:
            self._invalidation_greenlet = None

    @check_enabled
    def __setitem__(self, key, value):
        self._cache[key] = value

    @check_enabled
    def __getitem__(self, key):
        return self._cache[key]

    @check_enabled
    def __delitem__(self, key):
        del self._cache[key]