Commit 2a242c9d authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch '2335-ci-test_publishing-hangs' into 'master'

Reduce probability of hanging CI

See merge request !3179
parents d2a201f2 8c8a648d
Pipeline #38346 failed with stages
in 204 minutes and 29 seconds
......@@ -7,6 +7,7 @@
import os, sys
import io
import gevent
from . import connection
from .connection import StolenLockException
from functools import wraps
......@@ -21,10 +22,6 @@ def get_default_connection():
return _default_connection
def has_default_connection():
return _default_connection is not None
class _StringIO(io.StringIO):
def __enter__(self, *args, **kwags):
return self
......@@ -113,6 +110,24 @@ def get_redis_connection(
)
def get_existing_redis_connection(
db=0, single_connection_client=False, pool_name="default", timeout=None
):
"""Returns None when no global connection pool or when timed out
"""
if _default_connection is None:
return None
try:
with gevent.Timeout(timeout, TimeoutError):
return _default_connection.get_redis_connection(
db=db,
single_connection_client=single_connection_client,
pool_name=pool_name,
)
except TimeoutError:
return None
@check_connection
def clean_all_redis_connection(connection=None):
return connection.clean_all_redis_connection()
......
......@@ -99,6 +99,7 @@ import pkgutil
import os
import weakref
import warnings
from numbers import Number
from bliss.common.event import dispatcher
from bliss.common.utils import grouped
from bliss.common.greenlet_utils import protect_from_kill, AllowKill
......@@ -332,13 +333,32 @@ def _create_node(*args, **kwargs):
def set_ttl(db_name):
"""Create a new DataNode in order to call its set_ttl
"""Set the time-to-live upon garbage collection of DataNode
which was instantiated with `create==True` (also affects the parents).
"""
if DataNode._TIMETOLIVE is None:
return
# Do not create a Redis connection pool during garbage collection
if client.has_default_connection():
node = get_node(db_name, state="exists")
if node is not None:
node.set_ttl()
connection = client.get_existing_redis_connection(db=1, timeout=10)
if connection is None:
return
# New instance needs to be created because we are in garbage collection
# of the original instance
node = get_node(db_name, state="exists", connection=connection)
if node is not None:
node.set_ttl()
def enable_ttl(ttl: Number = 24 * 3600):
"""Enable `set_ttl`
"""
DataNode._TIMETOLIVE = ttl
def disable_ttl():
"""Disable `set_ttl`
"""
DataNode._TIMETOLIVE = None
class DataNodeMetaClass(type):
......@@ -361,7 +381,7 @@ class DataNode(metaclass=DataNodeMetaClass):
a `DataNode` depending on its state.
"""
default_time_to_live = 24 * 3600 # 1 day
_TIMETOLIVE = 24 * 3600 # 1 day
VERSION = (1, 0) # change major version for incompatible API changes
@staticmethod
......@@ -617,11 +637,12 @@ class DataNode(metaclass=DataNodeMetaClass):
def set_ttl(self):
"""Set the time-to-live for all Redis objects associated to this node
"""
self.apply_ttl(set(self.get_db_names()))
self.ttl_is_set()
if self._TIMETOLIVE is not None:
self.apply_ttl(set(self.get_db_names()))
self.detach_ttl_setter()
def ttl_is_set(self):
"""This DataNode's ttl has been set
def detach_ttl_setter(self):
"""Make sure ttl is not set upon garbage collection.
"""
if self._ttl_setter is not None:
self._ttl_setter.detach()
......@@ -631,10 +652,12 @@ class DataNode(metaclass=DataNodeMetaClass):
:param list(str) db_names:
"""
if self._TIMETOLIVE is None:
return
p = self.connection.pipeline()
try:
for name in db_names:
p.expire(name, self.default_time_to_live)
p.expire(name, self._TIMETOLIVE)
finally:
p.execute()
......
......@@ -1230,8 +1230,8 @@ class Scan:
db_names |= set(self.node.get_db_names())
self.node.apply_ttl(db_names)
for node in nodes:
node.ttl_is_set()
self.node.ttl_is_set()
node.detach_ttl_setter()
self.node.detach_ttl_setter()
def _device_event(self, event_dict=None, signal=None, sender=None):
with time_profile(self._stats_dict, "scan.events.device", logger=logger):
......
......@@ -46,6 +46,8 @@ from bliss.common.utils import grouped
from bliss.tango.clients.utils import wait_tango_device, wait_tango_db
from bliss import logging_startup
from bliss.scanning import scan_meta
from bliss.data.node import enable_ttl as _enable_ttl
from bliss.data.node import disable_ttl as _disable_ttl
import socket
BLISS = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
......@@ -313,7 +315,22 @@ def wait_ports(ports, timeout=10):
@pytest.fixture
def beacon(ports):
def disable_ttl():
_disable_ttl()
@pytest.fixture
def enable_ttl(disable_ttl):
# We use `disable_ttl` to make sure enable has priority over disable,
# regardless of the fixture order
ttl = 24 * 3600
_enable_ttl(ttl)
yield ttl
_disable_ttl()
@pytest.fixture
def beacon(ports, disable_ttl):
redis_db = redis.Redis(port=ports.redis_port)
redis_db.flushall()
redis_data_db = redis.Redis(port=ports.redis_data_port)
......
......@@ -64,7 +64,7 @@ def test_parent_node(session):
assert isinstance(parent_node, DataNodeContainer)
def test_scan_node(session, redis_data_conn):
def test_scan_node(session, redis_data_conn, enable_ttl):
scan_saving = session.scan_saving
parent = scan_saving.get_parent_node()
m = getattr(setup_globals, "roby")
......@@ -120,7 +120,7 @@ def test_scan_node(session, redis_data_conn):
assert sessions_list()[0].name == "test_session"
def test_interrupted_scan(session, redis_data_conn):
def test_interrupted_scan(session, redis_data_conn, enable_ttl):
"""
Start a scan and simulate a ctrl-c.
"""
......@@ -412,22 +412,22 @@ def test_corrupt_lima_data_channel_node(lima_session, redis_data_conn):
assert len(images) == 0
def test_ttl_on_data_node(beacon, redis_data_conn):
def test_ttl_on_data_node(beacon, redis_data_conn, enable_ttl):
redis_data_conn.delete("testing")
node = DataNode("test", "testing", create=True)
node.set_ttl()
assert redis_data_conn.ttl("testing") == DataNode.default_time_to_live
assert redis_data_conn.ttl("testing") == enable_ttl
del node
assert redis_data_conn.ttl("testing") == DataNode.default_time_to_live
assert redis_data_conn.ttl("testing") == enable_ttl
redis_data_conn.delete("testing")
node = DataNode("test", "testing", create=True)
assert redis_data_conn.ttl("testing") == -1
del node
assert redis_data_conn.ttl("testing") == DataNode.default_time_to_live
assert redis_data_conn.ttl("testing") == enable_ttl
def test_ttl_setter(session, capsys):
def test_ttl_setter(session, capsys, enable_ttl):
heater = getattr(setup_globals, "heater")
diode = getattr(setup_globals, "diode")
roby = getattr(setup_globals, "roby")
......
......@@ -229,7 +229,7 @@ def test_sequence_invalid_group(session):
g = Group(s1, 158453)
def test_sequence_ttl(session):
def test_sequence_ttl(session, enable_ttl):
diode = session.config.get("diode")
s1 = scans.loopscan(3, .1, diode)
s1_ttl1 = s1.node.connection.ttl(s1.node.db_name)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment