Commit fb904a7c authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'helper-for-scans-watcher' into 'master'

Added helper for scans watcher

See merge request !3772
parents 7ee3ca98 ef2235bc
Pipeline #48565 failed with stages
in 105 minutes and 35 seconds
......@@ -11,6 +11,7 @@ import numpy
import gevent
import typing
import warnings
import contextlib
from typing import Dict
......@@ -232,9 +233,17 @@ class ScansWatcher:
self._ready_event = gevent.event.Event()
"""Handle the ready event"""
self._terminated_event = gevent.event.Event()
"""Handle the end event"""
self._no_scans_event = gevent.event.Event()
"""Handle the amount of listened scans"""
self._stop_handler = DataStreamReaderStopHandler()
"""Handler to be able to stop the event loop"""
self._no_scans_event.set()
def wait_ready(self, timeout: float = None):
"""Wait until the scan watcher is ready to receive new event.
......@@ -248,6 +257,24 @@ class ScansWatcher:
"""
self._ready_event.wait(timeout=timeout)
def wait_no_more_running_scans(self, timeout: float = None):
"""Wait until there is no more running scans in this watcher.
"""
self._no_scans_event.wait(timeout=timeout)
def wait_terminated(self, timeout: float = None):
"""Wait until the scan watcher is terminated.
The steps between `started` and `ready` can takes few seconds depending
on the amount of data and the load of Redis.
Arguments:
timeout: If not `None`, it should be a floating point number
specifying a timeout for the operation in seconds
(or fractions thereof).
"""
self._terminated_event.wait(timeout=timeout)
def running_scan_names(self) -> typing.Sequence[str]:
"""
Returns the known running scans with there db names.
......@@ -306,6 +333,31 @@ class ScansWatcher:
return key
return None
@contextlib.contextmanager
def watch(self):
"""Context manager to start and stop the watcher.
It uses a gevent spawn.
Yield:
The spawned greenlet
"""
try:
gwatcher = gevent.spawn(self.run)
gwatcher.name = "bliss_scans_watcher"
self.wait_ready(timeout=3)
yield gwatcher
finally:
self.wait_no_more_running_scans(timeout=2)
self.stop()
try:
self.wait_terminated(timeout=1)
finally:
try:
gwatcher.join(timeout=1)
except Exception:
gwatcher.kill()
def run(self):
"""
Run watching scan events.
......@@ -316,6 +368,7 @@ class ScansWatcher:
when `exclude_existing_scans` is True.
"""
assert not self._running
self._terminated_event.clear()
self._running = True
try:
session_node = get_or_create_node(self._session_name, node_type="session")
......@@ -349,12 +402,14 @@ class ScansWatcher:
# New scan was created
scan_info = node.info.get_all()
self._running_scans.add(db_name)
self._no_scans_event.clear()
observer.on_scan_created(db_name, scan_info)
elif node_type == "scan_group":
if self._watch_scan_group:
# New scan was created
scan_info = node.info.get_all()
self._running_scans.add(db_name)
self._no_scans_event.clear()
observer.on_scan_created(db_name, scan_info)
else:
scan_db_name = self._get_scan_db_name_from_child(db_name)
......@@ -439,10 +494,12 @@ class ScansWatcher:
sys.excepthook(*sys.exc_info())
finally:
self._running_scans.discard(db_name)
if len(self._running_scans) == 0:
self._no_scans_event.set()
gevent.idle()
finally:
self._running = False
self._terminated_event.set()
def stop(self):
"""Call it to stop the event loop."""
......
......@@ -71,13 +71,8 @@ def watching(session, observer, exclude_groups=False):
if not exclude_groups:
watcher.set_watch_scan_group(True)
watcher.set_observer(observer)
session_watcher = gevent.spawn(watcher.run)
watcher.wait_ready(timeout=3)
try:
with watcher.watch():
yield watcher
finally:
gevent.sleep(0.5)
session_watcher.kill()
def test_simple_continuous_scan_with_session_watcher(session, test_observer):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment