Commit 527ef615 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

scan data expires in 5 minutes

parent 50cc2c5d
from bliss.config.conductor import client
def set_expiration_date(keys, seconds):
"""Set the expiration date of all Redis keys
"""
async_proxy = client.get_redis_proxy(db=1).pipeline()
try:
for name in keys:
async_proxy.expire(name, seconds)
finally:
async_proxy.execute()
......@@ -8,7 +8,6 @@
from gevent.queue import Queue
import gevent
from contextlib import contextmanager
import numpy
from bliss.scanning.chain import (
AcquisitionMaster,
AcquisitionSlave,
......@@ -20,6 +19,9 @@ from bliss.data.nodes.scan import ScanNode
from bliss.scanning.scan import ScanState, ScanPreset
from bliss.scanning.scan_info import ScanInfo
from bliss.common.logtools import user_warning
from bliss import current_session
from bliss.config.settings import scan as scan_redis
from bliss.config.conductor import client
class ScanGroup(Scan):
......@@ -250,7 +252,7 @@ class Sequence:
if self._scan is None:
return self._scan_info
else:
self.scan.scan_info
return self._scan.scan_info
@property
def state(self):
......@@ -296,14 +298,8 @@ class Group(Sequence):
class GroupingMaster(AcquisitionMaster):
def __init__(self):
AcquisitionMaster.__init__(
self,
None,
name="GroupingMaster",
npoints=0,
prepare_once=True,
start_once=True,
super().__init__(
None, name="GroupingMaster", npoints=0, prepare_once=True, start_once=True
)
self.scan_queue = Queue()
......@@ -348,12 +344,8 @@ class GroupingMaster(AcquisitionMaster):
self._number_channel.emit(int(scan.info["scan_nb"]))
self._node_channel.emit(scan.db_name)
# Reset the node TTL's
if scan.connection.ttl(scan.db_name) > 0:
scan.set_ttl()
for n in scan.walk(wait=False):
if n.connection.ttl(n.db_name) > 0:
n.set_ttl(include_parents=False)
self._reset_expiration_date(scan)
except BaseException:
self._publish_success &= False
raise
......@@ -362,6 +354,15 @@ class GroupingMaster(AcquisitionMaster):
finally:
self._publish_event.set()
def _reset_expiration_date(self, scan_node):
proxy = client.get_redis_proxy(db=1)
if proxy.ttl(scan_node.db_name) == -1:
return
scan_keys = set(scan_redis(scan_node.db_name + ":*", connection=proxy))
scan_keys |= set(scan_node.get_db_names(include_parents=False))
parent_keys = set(scan_node.parent.get_db_names(include_parents=True))
current_session.scan_saving.set_expiration_date(scan_keys, parent_keys)
def wait_all_published(self, timeout=None):
"""Wait until `_publish_new_subscan` is called for all subscans
that are queued. Publishing is done by iterating over this
......@@ -393,12 +394,12 @@ class GroupingMaster(AcquisitionMaster):
pass
class GroupingSlave(
AcquisitionSlave
): # one instance of this for channels published `on the fly` and one that is called after the scan?
def __init__(self, name, channels):
class GroupingSlave(AcquisitionSlave):
"""For custom sequence channels
"""
AcquisitionSlave.__init__(self, None, name=name)
def __init__(self, name, channels):
super().__init__(None, name=name)
self.start_event = gevent.event.Event()
for channel in channels:
self.channels.append(channel)
......
......@@ -831,10 +831,32 @@ class Scan:
self.node.end(self._scan_info, exception=_exception)
with capture():
self.set_ttl()
self.set_expiration_date()
self._update_scan_info_in_redis()
def set_expiration_date(self):
"""Set the expiration date of all Redis keys associated to this scan
"""
scan_keys = self.get_db_names()
parent_keys = self.get_parent_db_names()
self.__scan_saving.set_expiration_date(scan_keys, parent_keys)
def get_db_names(self):
"""Get all Redis keys associated to this scan
"""
db_names = set()
nodes = list(self.nodes.values())
for node in nodes:
db_names |= set(node.get_db_names(include_parents=False))
db_names |= set(self.node.get_db_names(include_parents=False))
return db_names
def get_parent_db_names(self):
"""Get all Redis keys associated to the parents of this scan
"""
return set(self.node.parent.get_db_names(include_parents=True))
def _init_scan_number(self):
self.writer.template.update(
{
......@@ -1250,7 +1272,6 @@ class Scan:
self._current_pipeline_stream = self.root_connection.pipeline()
return self._stream_pipeline_task
def _device_event(self, event_dict=None, signal=None, sender=None):
if signal == "end":
if self._USE_PIPELINE_MGR:
......
......@@ -37,6 +37,7 @@ from bliss.common.utils import autocomplete_property
from bliss.icat.proposal import Proposal
from bliss.icat.dataset_collection import DatasetCollection
from bliss.icat.dataset import Dataset
from bliss.data.expiration import set_expiration_date
_SCAN_SAVING_CLASS = None
......@@ -330,7 +331,7 @@ class BasicScanSaving(EvalParametersWardrobe):
"data_policy",
]
REDIS_SETTING_PREFIX = "scan_saving"
SLOTS = []
SLOTS = ["_data_expiration_time"]
def __init__(self, name=None):
"""
......@@ -357,6 +358,7 @@ class BasicScanSaving(EvalParametersWardrobe):
The *parent* node should be use as parameters for the Scan.
"""
self._data_expiration_time = 300 # seconds
if not name:
name = str(uuid.uuid4().hex)
super().__init__(
......@@ -731,6 +733,13 @@ class BasicScanSaving(EvalParametersWardrobe):
"""
pass
def set_expiration_date(self, scan_keys, parent_keys):
"""Set the expiration date of all Redis keys
"""
set_expiration_date(
itertools.chain(scan_keys, parent_keys), self._data_expiration_time
)
class ESRFScanSaving(BasicScanSaving):
"""Parameterized representation of the scan data file path
......@@ -1590,3 +1599,8 @@ class ESRFScanSaving(BasicScanSaving):
if dataset.is_closed:
raise RuntimeError("Dataset is already closed (choose a different name)")
dataset.gather_metadata(on_exists="skip")
def set_expiration_date(self, scan_keys, _):
"""Set the expiration date of the Redis keys of the scan only
"""
set_expiration_date(scan_keys, self._data_expiration_time)
import pytest
import gevent
from bliss.common.scans import loopscan
from bliss.scanning.group import Sequence
from bliss.scanning.group import Group
from ..data_policies import set_data_policy
@pytest.fixture
def esrf_session(session, icat_backend):
set_data_policy(session, "esrf")
session.scan_saving.writer = "null"
yield session
def assert_expiration(redis_data_conn, scans, esrf=False):
if not isinstance(scans, (list, tuple)):
scans = [scans]
for scan in scans:
prefix = scan.node.db_name.encode()
compare_ttl = None
for key in redis_data_conn.keys("*"):
ttl = redis_data_conn.ttl(key)
if key.startswith(prefix):
# Key associated to the scan
assert ttl > 0, key
elif prefix.startswith(key):
# Key associated to a parent of the scan
if esrf:
assert ttl == -1, key
else:
assert ttl > 0, key
if ttl == -1:
continue
if compare_ttl is None:
compare_ttl = ttl
else:
assert abs(compare_ttl - ttl) < 0.5, key
def normal_scan(session):
diode = session.env_dict["diode"]
return loopscan(1, 0.1, diode)
def interrupted_scan(session):
diode = session.env_dict["diode"]
scan = loopscan(1000, 0.1, diode, run=False)
gscan = gevent.spawn(scan.run)
gevent.sleep(2)
gscan.kill(KeyboardInterrupt)
return scan
def sequence_scan(session):
diode = session.env_dict["diode"]
scan1 = loopscan(3, .1, diode)
gevent.sleep(1)
seq = Sequence()
with seq.sequence_context() as seq_context:
scan2 = loopscan(3, .05, diode)
seq_context.add(scan2)
seq_context.add(scan1)
return seq, scan1, scan2
def group_scan(session):
diode = session.env_dict["diode"]
scan1 = loopscan(3, .1, diode)
gevent.sleep(1)
scan2 = loopscan(3, .05, diode)
gevent.sleep(1)
group = Group(scan2, scan1)
return group, scan1, scan2
def test_expiration_after_scan(session, redis_data_conn):
scan = normal_scan(session)
assert_expiration(redis_data_conn, scan, esrf=False)
def test_expiration_after_scan_esrf(esrf_session, redis_data_conn):
scan = normal_scan(esrf_session)
assert_expiration(redis_data_conn, scan, esrf=True)
def test_expiration_after_interrupted_scan(session, redis_data_conn):
scan = interrupted_scan(session)
assert_expiration(redis_data_conn, scan, esrf=False)
def test_expiration_after_interrupted_scan_esrf(esrf_session, redis_data_conn):
scan = interrupted_scan(esrf_session)
assert_expiration(redis_data_conn, scan, esrf=True)
def test_expiration_after_sequence(session, redis_data_conn):
scans = sequence_scan(session)
assert_expiration(redis_data_conn, scans, esrf=False)
def test_expiration_after_sequence_esrf(esrf_session, redis_data_conn):
scans = sequence_scan(esrf_session)
assert_expiration(redis_data_conn, scans, esrf=True)
def test_expiration_after_group(session, redis_data_conn):
scans = group_scan(session)
assert_expiration(redis_data_conn, scans, esrf=False)
def test_expiration_after_group_esrf(esrf_session, redis_data_conn):
scans = group_scan(esrf_session)
assert_expiration(redis_data_conn, scans, esrf=True)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment