Commit d65a84b7 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

DataStream: create empty stream upon instantiation

parent 92edfb12
......@@ -15,6 +15,7 @@ import logging
from contextlib import contextmanager
from bliss.config.settings import BaseSetting, pipeline
from bliss.config import streaming_events
from bliss.config.conductor.redis_scripts import register_script, evaluate_script
logger = logging.getLogger(__name__)
......@@ -24,6 +25,25 @@ class CustomLogger(logging.LoggerAdapter):
return "[{}] {}".format(str(self.extra), msg), kwargs
create_stream_script = """
-- Atomic creation of an empty STREAM in Redis
-- KEYS[1]: redis-key of the STREAM
local streamkey = KEYS[1]
if (redis.call("EXISTS", streamkey)==0) then
redis.call("XADD", streamkey, "0-1", "key", "value")
redis.call("XDEL", streamkey, "0-1")
end
"""
def create_data_stream(name, connection):
register_script(connection, "create_stream_script", create_stream_script)
evaluate_script(connection, "create_stream_script", keys=(name,))
class DataStream(BaseSetting):
"""An ordered dictionary of dictionaries in Redis with optionally
a maximal number of items.
......@@ -39,7 +59,9 @@ class DataStream(BaseSetting):
The dictionary values are dictionaries which represent encoded StreamEvent's.
"""
def __init__(self, name, connection=None, maxlen=None, approximate=True):
def __init__(
self, name, connection=None, maxlen=None, approximate=True, create=False
):
"""
:param str name:
:param connection:
......@@ -49,6 +71,8 @@ class DataStream(BaseSetting):
super().__init__(name, connection, None, None)
self._maxlen = maxlen
self._approximate = approximate
if create:
create_data_stream(self.name, self.connection)
def __str__(self):
return f"{self.__class__.__name__}({self.name}, maxlen={self._maxlen})"
......@@ -434,7 +458,9 @@ class DataStreamReader:
raise TypeError("All streams must have the same redis connection")
# Create the synchronization stream
self.__synchro_stream = DataStream(str(uuid.uuid4()), maxlen=16, connection=cnx)
self.__synchro_stream = DataStream(
str(uuid.uuid4()), maxlen=16, connection=cnx, create=True
)
return self.__synchro_stream
@property
......@@ -671,6 +697,10 @@ class DataStreamReader:
self._logger.debug("RECEIVED events %d streams", len(lst))
read_priority = None
for name, events in lst:
if not events:
# This happens because of empty stream creation
# in create_stream_script.
continue
name = name.decode()
sinfo = self._active_streams[name]
if read_priority is None:
......
......@@ -821,6 +821,7 @@ class DataNode(metaclass=DataNodeMetaClass):
:returns DataStream:
"""
kw.setdefault("connection", self.db_connection)
kw.setdefault("create", self.__new_node)
return streaming.DataStream(name, **kw)
def _create_stream(self, suffix, **kw):
......
......@@ -23,7 +23,7 @@ def test_data_stream(wait_all_created, beacon):
"""Create stream and publish nevents
"""
nonlocal nstreams, stream_created, start_streaming
stream = streaming.DataStream(f"stream_{nevents}")
stream = streaming.DataStream(f"stream_{nevents}", create=True)
nstreams += 1
stream_created.set()
start_streaming.wait()
......@@ -77,11 +77,13 @@ def test_data_stream(wait_all_created, beacon):
order = []
# block=0: wait indefinitely for new events
for stream_name, events in connection.xread(streams_to_read, block=0):
if not events:
continue
nevents = int(stream_name.split(b"_")[1])
lst = data.setdefault(nevents, [])
for index, value in events:
for _, value in events:
lst.append(int(value[b"data"]))
streams_to_read[stream_name.decode()] = index
streams_to_read[stream_name.decode()] = events[-1][0]
order.append(nevents)
assert order == sorted(order, reverse=True), "read order not preserved"
if len(data) == len(streams_to_read):
......@@ -176,7 +178,7 @@ class DataStreamTestPublishers:
:param str stream_name:
"""
try:
stream = streaming.DataStream(stream_name)
stream = streaming.DataStream(stream_name, create=True)
idata = 0
while True:
gevent.sleep(random.random() / 1000)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment