Commit edb068c5 authored by Wout De Nolf

better logging for Redis streaming

parent e54dab9d
@@ -5,6 +5,7 @@
 # Copyright (c) 2015-2019 Beamline Control Unit, ESRF
 # Distributed under the GNU LGPLv3. See LICENSE for more info.
+import sys
 import gevent
 import uuid
 import enum
@@ -350,7 +351,7 @@ class DataStreamReader:
         self.stop_handler = stop_handler

     def __str__(self):
-        return "{}({} subscribed, {} activate, {} consumer".format(
+        return "{}({} subscribed, {} active, {} consumer)".format(
             self.__class__.__name__,
             self.n_subscribed_streams,
             self.n_active_streams,
@@ -448,10 +449,10 @@ class DataStreamReader:
             return
         with pipeline(synchro_stream):
             if end:
-                self._logger.debug("SYNC_END")
+                self._logger.debug("PUBLISH SYNC_END")
                 synchro_stream.add(self.SYNC_END)
             else:
-                self._logger.debug("SYNC_EVENT")
+                self._logger.debug("PUBLISH SYNC_EVENT")
                 synchro_stream.add(self.SYNC_EVENT)
             synchro_stream.ttl(60)
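Note: `pipeline(synchro_stream)` batches the event publication and the TTL refresh into one Redis round trip. A minimal sketch of the same pattern in plain redis-py (the stream key and payload below are illustrative, not the BLISS wrappers used above):

    import redis

    r = redis.Redis()
    with r.pipeline() as pipe:
        # Queue both commands, then send them in a single round trip
        pipe.xadd("scan:synchro", {b"__EVENT__": b"SYNC_EVENT"})
        pipe.expire("scan:synchro", 60)
        pipe.execute()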
@@ -501,7 +502,7 @@ class DataStreamReader:
                 continue
             if not ignore_excluded and stream.name in self.excluded_stream_names:
                 continue
-            self._logger.debug(f"ADD STREAM {stream.name}")
+            self._logger.debug("ADD STREAM %s", stream.name)
             self.check_stream_connection(stream)
             sinfo = self._compile_stream_info(
                 stream, first_index=first_index, priority=priority, **info
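Note: replacing the f-string with printf-style arguments defers formatting to the logging framework, so the message is only rendered when a handler actually emits the record. A runnable sketch of the difference:

    import logging

    logging.basicConfig(level=logging.INFO)  # DEBUG records are filtered out
    logger = logging.getLogger("demo")

    name = "axis:roby"
    logger.debug(f"ADD STREAM {name}")   # f-string is rendered even though the record is dropped
    logger.debug("ADD STREAM %s", name)  # %-style args are never rendered here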
@@ -656,6 +657,7 @@ class DataStreamReader:
         with self._read_task_context():
             keep_reading = True
             synchro_name = self._synchro_stream.name
+            self._logger.debug("READING events starts.")
             while keep_reading:
                 # When not waiting for new events (wait=False)
                 # will stop reading after reading all current
@@ -664,8 +666,9 @@ class DataStreamReader:
                 keep_reading = self._wait
                 # When wait=True: wait indefinitely when no events
-                self._logger.debug("READING ...")
+                self._logger.debug("READING events ...")
                 lst = self._read_active_streams()
+                self._logger.debug("RECEIVED events %d streams", len(lst))
                 read_priority = None
                 for name, events in lst:
                     name = name.decode()
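Note: `_read_active_streams()` returns `(stream_name, events)` pairs with names as raw bytes, hence the `name.decode()`. Assuming it wraps Redis stream reads, the shape of the result matches plain redis-py XREAD (stream keys and IDs below are illustrative):

    import redis

    r = redis.Redis()
    # Block up to 1 second waiting for entries newer than the given IDs
    lst = r.xread({"scan:synchro": "0-0", "scan:data": "0-0"}, block=1000)
    for name, events in lst:
        print(name.decode(), len(events))  # events is a list of (index, raw) pairs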
@@ -676,16 +679,16 @@ class DataStreamReader:
                         # Lower priority streams are never read
                         # while higher priority streams have unread data
                         keep_reading = True
-                        self._logger.debug("SKIP %s: %d events", name, len(events))
+                        self._logger.debug("SKIP %d events from %s", len(events), name)
                         break
-                    self._logger.debug("PROCESS %s: %d events", name, len(events))
+                    self._logger.debug("PROCESS %d events from %s", len(events), name)
                     if name == synchro_name:
                         self._process_synchro_events(events)
                         keep_reading = True
                     else:
                         self._process_consumer_events(sinfo, events)
                     gevent.idle()
-                self._logger.debug("READING DONE.")
+                self._logger.debug("EVENTS processed.")
                 # Keep reading when active streams are modified
                 # by the consumer. This ensures that all streams
@@ -693,6 +696,7 @@ class DataStreamReader:
                 self._wait_no_consuming()
                 if not keep_reading:
                     keep_reading = self.has_new_synchro_events()
+            self._logger.debug("READING events finished.")

     def _wait_no_consuming(self):
         """Wait until the consumer is not processing an event
@@ -715,7 +719,7 @@ class DataStreamReader:
         for index, raw in events:
             if streaming_events.EndEvent.istype(raw):
                 # stop reader loop (does not stop consumer)
-                self._logger.debug("STOP reading event")
+                self._logger.debug("RECEIVED stop event")
                 raise StopIteration
             self._synchro_index = index
         self._update_active_streams()
@@ -723,10 +727,19 @@ class DataStreamReader:
     def _log_events(self, task, stream, events):
         if self._logger.getEffectiveLevel() > logging.DEBUG:
             return
-        content = "\n ".join(
-            [f"{raw[b'__EVENT__']}: {raw.get(b'db_name')}" for idx, raw in events]
-        )
-        self._logger.debug(f"{task} {stream.name}:\n {content}")
+        content = "\n ".join(self._log_events_content(events))
+        self._logger.debug(f"{task} from {stream.name}:\n {content}")
+
+    @staticmethod
+    def _log_events_content(events):
+        for _, raw in events:
+            evttype = raw[b"__EVENT__"].decode()
+            db_name = raw.get(b"db_name", b"").decode()
+            nbytes = sys.getsizeof(raw)
+            if db_name:
+                yield f"{evttype}: {db_name} ({nbytes} bytes)"
+            else:
+                yield f"{evttype}: {nbytes} bytes"

     def _process_consumer_events(self, sinfo, events):
         """Queue stream events and progress the index
@@ -735,7 +748,7 @@ class DataStreamReader:
         :param dict sinfo: stream info
         :param list events: list((index, raw))
         """
-        self._log_events("QUEUE", sinfo["stream"], events)
+        self._log_events("BUFFER events", sinfo["stream"], events)
         self._queue.put((sinfo["stream"], events))
         sinfo["first_index"] = events[-1][0]
@@ -769,14 +782,15 @@ class DataStreamReader:
             if not self._streams and not self._wait:
                 self._queue.put(StopIteration)
-            self._logger.debug("CONSUMING ...")
+            self._logger.debug("CONSUMING events starts.")
             for item in self._queue:
                 if isinstance(item, Exception):
                     raise item
-                self._log_events("QUEUE", item[0], item[1])
+                self._log_events("CONSUME events", item[0], item[1])
                 self._consumer_state = self.ConsumerState.PROCESSING
                 yield item
                 self._consumer_state = self.ConsumerState.WAITING
         finally:
             self._consumer_state = self.ConsumerState.FINISHED
             self._has_consumer = False
+            self._logger.debug("CONSUMING events finished.")