Commit dc8ecc5d authored by Valentin Valls's avatar Valentin Valls
Browse files

Allow to set priority during the _create_stream

parent e4bed8ac
...@@ -724,6 +724,9 @@ class DataNode(metaclass=DataNodeMetaClass): ...@@ -724,6 +724,9 @@ class DataNode(metaclass=DataNodeMetaClass):
self.__db_name = db_name self.__db_name = db_name
self.node_type = node_type self.node_type = node_type
self._priorities = {}
"""Hold priorities per streams only during the initialization"""
# The info dictionary associated to the DataNode # The info dictionary associated to the DataNode
self._info = settings.HashObjSetting(f"{db_name}_info", connection=connection) self._info = settings.HashObjSetting(f"{db_name}_info", connection=connection)
info_dict = self._init_info(create=create, **kwargs) info_dict = self._init_info(create=create, **kwargs)
...@@ -808,14 +811,20 @@ class DataNode(metaclass=DataNodeMetaClass): ...@@ -808,14 +811,20 @@ class DataNode(metaclass=DataNodeMetaClass):
kw.setdefault("connection", self.db_connection) kw.setdefault("connection", self.db_connection)
return streaming.DataStream(name, **kw) return streaming.DataStream(name, **kw)
def _create_stream(self, suffix, **kw): def _create_stream(self, suffix, priority: int = None, **kw):
"""Create a stream associated to this DataNode. """Create a stream associated to this DataNode.
:param str suffix: :param str suffix:
:param int priority: data from streams with a lower priority is never
yielded as long as higher priority streams have
data. Lower number means higher priority.
:param `**kw`: see `_create_nonassociated_stream` :param `**kw`: see `_create_nonassociated_stream`
:returns DataStream: :returns DataStream:
""" """
return self._create_nonassociated_stream(f"{self.db_name}_{suffix}", **kw) stream = self._create_nonassociated_stream(f"{self.db_name}_{suffix}", **kw)
if priority is not None:
self._priorities[stream.name] = priority
return stream
@classmethod @classmethod
def _streamid_to_idx(cls, streamID): def _streamid_to_idx(cls, streamID):
...@@ -1362,6 +1371,14 @@ class DataNode(metaclass=DataNodeMetaClass): ...@@ -1362,6 +1371,14 @@ class DataNode(metaclass=DataNodeMetaClass):
if not self.db_connection.exists(stream_name): if not self.db_connection.exists(stream_name):
return return
stream = self._create_nonassociated_stream(stream_name) stream = self._create_nonassociated_stream(stream_name)
if "priority" not in kw:
# If the priority is not forced by the _subscribe_stream arguments
# Use the one from the config
priority = self._priorities.pop(stream.name, None)
if priority is not None:
kw["priority"] = priority
reader.add_streams(stream, node=self, first_index=first_index, **kw) reader.add_streams(stream, node=self, first_index=first_index, **kw)
def _subscribe_streams( def _subscribe_streams(
......
...@@ -23,7 +23,8 @@ class ScanNode(DataNodeContainer): ...@@ -23,7 +23,8 @@ class ScanNode(DataNodeContainer):
def __init__(self, name, **kwargs): def __init__(self, name, **kwargs):
super().__init__(self._NODE_TYPE, name, **kwargs) super().__init__(self._NODE_TYPE, name, **kwargs)
self._sync_stream = self._create_stream("data") # Lower priority than all other streams
self._sync_stream = self._create_stream("data", priority=1)
@property @property
def dataset(self): def dataset(self):
...@@ -80,19 +81,6 @@ class ScanNode(DataNodeContainer): ...@@ -80,19 +81,6 @@ class ScanNode(DataNodeContainer):
def get_settings(self): def get_settings(self):
return super().get_settings() + [self._sync_stream] return super().get_settings() + [self._sync_stream]
def _subscribe_stream(self, stream_suffix, reader, first_index=None, **kw):
"""Subscribe to a particular stream associated with this node.
:param str stream_suffix: stream to add is "{db_name}_{stream_suffix}"
:param DataStreamReader reader:
:param str or int first_index: Redis stream index (None is now)
"""
stream_suffix_with_sepator = f"_{stream_suffix}"
if self._sync_stream.name.endswith(stream_suffix_with_sepator):
# Lower priority than all other streams
kw["priority"] = 1
super()._subscribe_stream(stream_suffix, reader, first_index=first_index, **kw)
def _subscribe_streams(self, reader, first_index=None, **kw): def _subscribe_streams(self, reader, first_index=None, **kw):
"""Subscribe to all associated streams of this node. """Subscribe to all associated streams of this node.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment