Commit c3d1a8f4 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Modify recursive stream searching+subscribing to the modified ScanNode streams

parent bf3c0e20
Pipeline #46229 passed with stages
in 142 minutes and 58 seconds
......@@ -1558,31 +1558,32 @@ class DataNodeContainer(DataNode):
search_data_streams = False
# Subscribe to the streams of all children, not only the direct children.
# TODO: this assumes that all streams to subscribe too are called
# "*_children_list" and "*_data". Can be solved with DataNode
# derived class self-registration and each class adding
# stream suffixes and orders.
# TODO: this makes assumptions about the data nodes and their streams.
# Any change in streams (rename stream, add new streams, ...) will
# affect this code.
# Subscribe to streams found by a recursive search
nodes_with_data = list()
nodes_with_data = dict()
search_suffixes = {"data": ["data"], "end": ["prepared", "end"]}
nodes_with_children = list()
excluded_stream_names = set(reader.excluded_stream_names)
if search_data_streams:
# Make sure the NEW_NODE event always arrives before the NEW_DATA event:
# Make sure the NEW_NODE event always arrives before any other node event:
# - assume "...:parent_children_list" is created BEFORE "...parent:child_data"
# - search for *_children_list AFTER searching for *_data
# - subscribe to *_children_list BEFORE subscribing to *_data
node_names = self._search_nodes_with_streams(
"data", excluded_stream_names, include_parent=False
nodes_with_data = list(
for suffix in search_suffixes:
node_names = self._search_nodes_with_streams(
suffix, excluded_stream_names, include_parent=False
nodes_with_data[suffix] = list(
if not exclude_my_children:
node_names = self._search_nodes_with_streams(
"children_list", excluded_stream_names, include_parent=False
......@@ -1597,8 +1598,13 @@ class DataNodeContainer(DataNode):
# Subscribe to the streams that were searched
for node in nodes_with_children:
node._subscribe_stream("children_list", reader, first_index=first_index)
for node in nodes_with_data:
node._subscribe_stream("data", reader, first_index=first_index)
for search_suffix, nodes in nodes_with_data.items():
subscribe_suffixes = search_suffixes[search_suffix]
for node in nodes:
for subscribe_suffix in subscribe_suffixes:
subscribe_suffix, reader, first_index=first_index
# Exclude searched Redis keys from further subscription attempts
reader.excluded_stream_names |= excluded_stream_names
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment