Commit 50cc2c5d authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Remove TTL setting from DataNode

parent 0704b54a
......@@ -630,35 +630,6 @@ class DataNodeAsyncHelper:
self._nodes = None
def set_ttl(db_name):
"""Set the time-to-live upon garbage collection of DataNode
which was instantiated with `create==True` (also affects the parents).
"""
if DataNode._TIMETOLIVE is None:
return
# Do not create a Redis connection pool during garbage collection
connection = client.get_existing_redis_proxy(db=1, timeout=10)
if connection is None:
return
# New instance needs to be created because we are in garbage collection
# of the original instance
node = get_node(db_name, state="exists", connection=connection)
if node is not None:
node.set_ttl()
def enable_ttl(ttl: Number = 24 * 3600):
"""Enable `set_ttl`
"""
DataNode._TIMETOLIVE = ttl
def disable_ttl():
"""Disable `set_ttl`
"""
DataNode._TIMETOLIVE = None
class DataNodeMetaClass(type):
def __call__(cls, *args, **kwargs):
"""This wraps the __init__ execution
......@@ -739,7 +710,6 @@ class DataNode(metaclass=DataNodeMetaClass):
self._struct = self._create_struct(db_name, node_type)
else:
self.__new_node = False
self._ttl_setter = None
self._struct = self._get_struct(db_name, connection=self.db_connection)
def _register_stream_priority(self, fullname: str, priority: int):
......@@ -780,8 +750,6 @@ class DataNode(metaclass=DataNodeMetaClass):
self._struct.parent = parent.db_name
if add_to_parent:
parent.add_children(self)
# Set TTL on garbage collection
self._ttl_setter = weakref.finalize(self, set_ttl, self.__db_name)
def get_nodes(self, *db_names, **kw):
"""
......@@ -988,34 +956,6 @@ class DataNode(metaclass=DataNodeMetaClass):
def connect(self, signal, callback):
dispatcher.connect(callback, signal, self)
@protect_from_kill
def set_ttl(self, include_parents=True):
"""Set the time-to-live for all Redis objects associated to this node
"""
if self._TIMETOLIVE is not None:
self.apply_ttl(set(self.get_db_names(include_parents=include_parents)))
self.detach_ttl_setter()
def detach_ttl_setter(self):
"""Make sure ttl is not set upon garbage collection.
"""
if self._ttl_setter is not None:
self._ttl_setter.detach()
def apply_ttl(self, db_names):
"""Set time-to-live for a list of Redis objects
:param list(str) db_names:
"""
if self._TIMETOLIVE is None:
return
p = self.connection.pipeline()
try:
for name in db_names:
p.expire(name, self._TIMETOLIVE)
finally:
p.execute()
def get_db_names(self, include_parents=True):
"""All associated Redis keys, including the associated keys of the parents.
"""
......
......@@ -1250,17 +1250,6 @@ class Scan:
self._current_pipeline_stream = self.root_connection.pipeline()
return self._stream_pipeline_task
def set_ttl(self):
# node.get_db_names takes the most time
db_names = set()
nodes = list(self.nodes.values())
for node in nodes:
db_names |= set(node.get_db_names(include_parents=False))
db_names |= set(self.node.get_db_names())
self.node.apply_ttl(db_names)
for node in nodes:
node.detach_ttl_setter()
self.node.detach_ttl_setter()
def _device_event(self, event_dict=None, signal=None, sender=None):
if signal == "end":
......
......@@ -48,8 +48,6 @@ from bliss.tango.clients.utils import wait_tango_device, wait_tango_db
from bliss.shell.cli.repl import BlissRepl
from bliss import logging_startup
from bliss.scanning import scan_meta
from bliss.data.node import enable_ttl as _enable_ttl
from bliss.data.node import disable_ttl as _disable_ttl
import socket
BLISS = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
......@@ -413,22 +411,7 @@ def wait_ports(ports, timeout=10):
@pytest.fixture
def disable_ttl():
_disable_ttl()
@pytest.fixture
def enable_ttl(disable_ttl):
# We use `disable_ttl` to make sure enable has priority over disable,
# regardless of the fixture order
ttl = 24 * 3600
_enable_ttl(ttl)
yield ttl
_disable_ttl()
@pytest.fixture
def beacon(ports, disable_ttl):
def beacon(ports):
redis_db = redis.Redis(port=ports.redis_port)
redis_db.flushall()
redis_data_db = redis.Redis(port=ports.redis_data_port)
......
......@@ -17,7 +17,7 @@ from collections import OrderedDict
from bliss import setup_globals, current_session
from bliss.common import scans
from bliss.scanning.scan import Scan, ScanState, ScanAbort
from bliss.scanning.chain import AcquisitionChain, AcquisitionMaster, AcquisitionSlave
from bliss.scanning.chain import AcquisitionChain, AcquisitionSlave
from bliss.scanning.acquisition.motor import SoftwarePositionTriggerMaster
from bliss.scanning.acquisition.counter import SamplingCounterAcquisitionSlave
from bliss.config.settings import scan as redis_scan
......@@ -74,7 +74,7 @@ def test_parent_node(session):
assert isinstance(parent_node, DataNodeContainer)
def test_scan_node(session, redis_data_conn, enable_ttl):
def test_scan_node(session, redis_data_conn):
scan_saving = session.scan_saving
parent = scan_saving.get_parent_node()
m = getattr(setup_globals, "roby")
......@@ -108,9 +108,6 @@ def test_scan_node(session, redis_data_conn, enable_ttl):
scan_info_dict = redis_data_conn.hgetall(s.node.db_name + "_info")
assert pickle.loads(scan_info_dict[b"metadata"]) == 42
for db_name in redis_data_conn.keys("*"):
assert redis_data_conn.ttl(db_name) > 0
roby_node_db_name = s.node.db_name + ":axis"
scan_children_node = [roby_node_db_name]
roby_children_node = [
......@@ -125,13 +122,10 @@ def test_scan_node(session, redis_data_conn, enable_ttl):
assert children(s.node.db_name) == [x.encode() for x in scan_children_node]
assert children(roby_node_db_name) == [x.encode() for x in roby_children_node]
for child_node_name in scan_children_node + roby_children_node:
assert redis_data_conn.ttl(child_node_name) > 0
assert sessions_list()[0].name == "test_session"
def test_interrupted_scan(session, redis_data_conn, enable_ttl):
def test_interrupted_scan(session, redis_data_conn):
"""
Start a scan and simulate a ctrl-c.
"""
......@@ -147,34 +141,20 @@ def test_interrupted_scan(session, redis_data_conn, enable_ttl):
s = Scan(chain, "test_scan")
assert s._Scan__state == ScanState.IDLE
assert s.state == ScanState.IDLE
# Run scan in greenlet
scan_task = gevent.spawn(s.run)
# IDLE->PREPARING->STARTING->STOPPING->DONE
# Wait for scan state to be STARTING
with gevent.Timeout(2):
s.wait_state(ScanState.STARTING)
assert s._Scan__state == ScanState.STARTING
assert s.state == ScanState.STARTING
# Stop the scan like a ctrl-c
with pytest.raises(ScanAbort):
scan_task.kill(KeyboardInterrupt)
scan_task.get()
assert redis_data_conn.ttl(s.node.db_name) > 0
roby_node_db_name = s.node.db_name + ":axis"
scan_children_node = [roby_node_db_name]
roby_children_node = [
roby_node_db_name + ":roby",
roby_node_db_name + ":simulation_diode_sampling_controller:diode",
]
for child_node_name in scan_children_node + roby_children_node:
assert redis_data_conn.ttl(child_node_name) > 0
assert s.state == ScanState.USER_ABORTED
def _validate_node_indexing(node, shape, dtype, npoints, expected_data, extract):
......@@ -429,31 +409,6 @@ def test_corrupt_lima_data_channel_node(lima_session, redis_data_conn):
assert len(images) == 0
def test_ttl_on_data_node(beacon, redis_data_conn, enable_ttl):
redis_data_conn.delete("testing")
node = DataNode("test", "testing", create=True)
node.set_ttl()
assert redis_data_conn.ttl("testing") == enable_ttl
del node
assert redis_data_conn.ttl("testing") == enable_ttl
redis_data_conn.delete("testing")
node = DataNode("test", "testing", create=True)
assert redis_data_conn.ttl("testing") == -1
del node
assert redis_data_conn.ttl("testing") == enable_ttl
def test_ttl_setter(session, capsys, enable_ttl):
heater = getattr(setup_globals, "heater")
diode = getattr(setup_globals, "diode")
roby = getattr(setup_globals, "roby")
robz = getattr(setup_globals, "robz")
s = scans.loopscan(1, .1, heater, diode, run=False)
out, err = capsys.readouterr()
assert err == ""
def test_walk_after_nodes_disappeared(session, streaming_debug_logging):
detector = session.env_dict["diode"]
s = scans.loopscan(1, 0.1, detector)
......
......@@ -220,32 +220,14 @@ def test_sequence_invalid_group(session):
s2 = scans.loopscan(3, .05, diode, run=False)
with pytest.raises(ScanSequenceError):
g = Group(s1, s2)
Group(s1, s2)
n = get_or_create_node("bla:bla:bla")
with pytest.raises(ScanSequenceError):
g = Group(s1, n)
Group(s1, n)
with pytest.raises(ScanSequenceError):
g = Group(s1, 158453)
def test_sequence_ttl(session, enable_ttl):
diode = session.config.get("diode")
s1 = scans.loopscan(3, .1, diode)
s1_ttl1 = s1.node.connection.ttl(s1.node.db_name)
gevent.sleep(2)
seq = Sequence()
with seq.sequence_context() as seq_context:
seq_context.add(scans.loopscan(3, .1, diode))
s1_ttl2 = s1.node.connection.ttl(s1.node.db_name)
seq_context.add(s1)
s1_ttl3 = s1.node.connection.ttl(s1.node.db_name)
assert s1_ttl1 > s1_ttl2
assert s1_ttl2 < s1_ttl3
Group(s1, 158453)
def test_sequence_events(session):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment