Commit 1337cd66 authored by Valentin Valls's avatar Valentin Valls
Browse files

Rename the 'data' stream to 'end' stream

parent dc8ecc5d
......@@ -40,7 +40,7 @@ A ScanNode is represented by 4 Redis keys:
{db_name} -> see DataNodeContainer
{db_name}_info -> see DataNodeContainer
{db_name}_children -> see DataNodeContainer
{db_name}_data -> contains the END event
{db_name}_end -> contains the END event
A ChannelDataNode is represented by 3 Redis keys:
......
......@@ -24,7 +24,7 @@ class ScanNode(DataNodeContainer):
def __init__(self, name, **kwargs):
super().__init__(self._NODE_TYPE, name, **kwargs)
# Lower priority than all other streams
self._sync_stream = self._create_stream("data", priority=1)
self._end_stream = self._create_stream("end", priority=1)
@property
def dataset(self):
......@@ -37,7 +37,7 @@ class ScanNode(DataNodeContainer):
return
# to avoid to have multiple modification events
# TODO: what does the comment above mean?
with settings.pipeline(self._sync_stream, self._info):
with settings.pipeline(self._end_stream, self._info):
event = EndScanEvent()
add_info = {
"end_time": event.time,
......@@ -45,7 +45,7 @@ class ScanNode(DataNodeContainer):
"end_timestamp": event.timestamp,
}
self._info.update(add_info)
self._sync_stream.add_event(event)
self._end_stream.add_event(event)
def _get_event_class(self, stream_event):
stream_event = stream_event[0]
......@@ -75,11 +75,11 @@ class ScanNode(DataNodeContainer):
def get_db_names(self, **kw):
db_names = super().get_db_names(**kw)
db_names.append(self._sync_stream.name)
db_names.append(self._end_stream.name)
return db_names
def get_settings(self):
return super().get_settings() + [self._sync_stream]
return super().get_settings() + [self._end_stream]
def _subscribe_streams(self, reader, first_index=None, **kw):
"""Subscribe to all associated streams of this node.
......@@ -88,7 +88,7 @@ class ScanNode(DataNodeContainer):
:param **kw: see DataNodeContainer
"""
super()._subscribe_streams(reader, first_index=first_index, **kw)
suffix = self._sync_stream.name.rsplit("_", 1)[-1]
suffix = self._end_stream.name.rsplit("_", 1)[-1]
self._subscribe_stream(
suffix, reader, first_index=0, create=True, ignore_excluded=True
)
......@@ -98,7 +98,7 @@ class ScanNode(DataNodeContainer):
:param DataStream stream:
:returns callable:
"""
if stream.name == self._sync_stream.name:
if stream.name == self._end_stream.name:
return self._iter_data_stream_events
return super(ScanNode, self).get_stream_event_handler(stream)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment