Commit 48e37fd5 authored by Valentin Valls's avatar Valentin Valls
Browse files

Rework priorities to make them registrable from outside the stream creation

parent ba44b02c
......@@ -726,7 +726,7 @@ class DataNode(metaclass=DataNodeMetaClass):
self.node_type = node_type
self._priorities = {}
"""Hold priorities per streams only during the initialization"""
"""Hold priorities per streams."""
# The info dictionary associated to the DataNode
self._info = settings.HashObjSetting(f"{db_name}_info", connection=connection)
......@@ -744,6 +744,17 @@ class DataNode(metaclass=DataNodeMetaClass):
self._ttl_setter = None
self._struct = self._get_struct(db_name, connection=self.db_connection)
def _register_stream_priority(self, fullname: str, priority: int):
Register the stream priority which will be used on the reader side.
:paran str fullname: Full name of the stream
:param int priority: data from streams with a lower priority is never
yielded as long as higher priority streams have
data. Lower number means higher priority.
self._priorities[fullname] = priority
def add_prefetch(self, async_proxy=None):
"""As long as caching on the proxy level exists in CachingRedisDbProxy,
we need to prefetch settings like this.
......@@ -812,19 +823,14 @@ class DataNode(metaclass=DataNodeMetaClass):
kw.setdefault("connection", self.db_connection)
return streaming.DataStream(name, **kw)
def _create_stream(self, suffix, priority: int = None, **kw):
def _create_stream(self, suffix, **kw):
"""Create a stream associated to this DataNode.
:param str suffix:
:param int priority: data from streams with a lower priority is never
yielded as long as higher priority streams have
data. Lower number means higher priority.
:param `**kw`: see `_create_nonassociated_stream`
:returns DataStream:
stream = self._create_nonassociated_stream(f"{self.db_name}_{suffix}", **kw)
if priority is not None:
self._priorities[] = priority
return stream
......@@ -1373,12 +1379,10 @@ class DataNode(metaclass=DataNodeMetaClass):
stream = self._create_nonassociated_stream(stream_name)
if "priority" not in kw:
# If the priority is not forced by the _subscribe_stream arguments
# Use the one from the config
priority = self._priorities.pop(, None)
if priority is not None:
kw["priority"] = priority
# Use the priority as it was setup
priority = self._priorities.get(, 0)
if priority is not None:
kw["priority"] = priority
reader.add_streams(stream, node=self, first_index=first_index, **kw)
......@@ -20,7 +20,8 @@ class _ChannelDataNodeBase(DataNode):
def __init__(self, name, **kwargs):
super().__init__(self._NODE_TYPE, name, **kwargs)
self._queue = self._create_stream("data", maxlen=CHANNEL_MAX_LEN, priority=2)
self._queue = self._create_stream("data", maxlen=CHANNEL_MAX_LEN)
self._register_stream_priority(f"{self.db_name}_data", 2)
self._last_index = self._idx_to_streamid(0)
......@@ -32,10 +32,11 @@ class ScanNode(DataNodeContainer):
def __init__(self, name, **kwargs):
super().__init__(self._NODE_TYPE, name, **kwargs)
# Lower priority than all other streams
self._end_stream = self._create_stream("end", priority=3)
# Lower priority than NEW_NODE, higher than NEW_DATA
self._prepared_stream = self._create_stream("prepared", priority=1)
self._end_stream = self._create_stream("end")
self._prepared_stream = self._create_stream("prepared")
# Register to priority as the following way: NEW DATA > PREPARED > NEW NODE > END
self._register_stream_priority(, 3)
self._register_stream_priority(, 1)
def dataset(self):
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment