Commit 3cbcbbeb authored by Wout De Nolf's avatar Wout De Nolf

[writer] threaded architecture

parent d9489d36
Pipeline #33552 failed with stages
in 149 minutes and 41 seconds
......@@ -34,7 +34,7 @@ from silx.io.dictdump import dicttojson, dicttoini
from .io_utils import mkdir
from ..utils import data_merging
from ..utils.process_utils import file_processes
from ..utils.async_utils import SharedLockPool
from ..sync.lock_utils import SharedLockPool
DEFAULT_PLOT_NAME = "plotselect"
......@@ -969,8 +969,8 @@ class File(h5py.File):
are executed.
"""
lockname = os.path.abspath(filename)
with self._LOCKPOOL.acquire(None):
with self._LOCKPOOL.acquire(lockname):
with self._LOCKPOOL.acquire_context(None):
with self._LOCKPOOL.acquire_context(lockname):
yield
@contextmanager
......@@ -978,7 +978,7 @@ class File(h5py.File):
"""Protected section associated to this file.
"""
lockname = os.path.abspath(self.filename)
with self._LOCKPOOL.acquire(lockname):
with self._LOCKPOOL.acquire_context(lockname):
yield
......
......@@ -42,7 +42,7 @@ def default_saveoptions():
}
class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
class NexusScanWriter(scan_writer_base.NexusScanWriter):
"""
Listen to events of a particular scan and write the result in Nexus format.
Extra information in Redis needed (see `devices.redis_info` and `..data.generator`).
......@@ -54,8 +54,8 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
This is not needed for the nxentry because only one writer
will create/update it.
:param args: see `NexusScanWriterBase`
:param kwargs: see `NexusScanWriterBase`
:param args: see `NexusScanWriter`
:param kwargs: see `NexusScanWriter`
"""
for option, default in default_saveoptions().items():
kwargs[option] = kwargs.get(option, default)
......@@ -285,11 +285,10 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
:param str definition_name:
:returns tuple, dict:
"""
start_timestamp = self.get_info("start_timestamp")
if not start_timestamp:
self._h5missing("start_timestamp")
start_time = self.scan_start_time
if not start_time:
self._h5missing("start_time")
return None
start_time = datetime.datetime.fromtimestamp(start_timestamp)
datasets = {"definition": definition_name}
args = (name,)
kwargs = {"start_time": start_time, "datasets": datasets}
......
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2020 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Redis subscriber utilities
"""
import gevent
import gevent.time
import redis
import threading
from bliss.config.conductor import client
from bliss.data.node import get_node as _get_node
from bliss.data.node import _get_node_object
from nexus_writer_service.sync import greenlet_utils
from nexus_writer_service.sync import thread_utils
class SubscriberGreenlet(
greenlet_utils.NamedGreenlet, greenlet_utils.GreenletWithResources
):
pass
class SubscriberThread(thread_utils.Thread):
def __init__(self, name):
"""
:param str name: thread name
"""
super().__init__(SubscriberGreenlet, name=name)
class RedisConnectionGenerator:
def __init__(self):
"""Instantiates the global Bliss connection object
`bliss.config.conductor.client._default_connection`
when it does not exist yet. No other thread can use this object.
"""
self.host, self.port = client.get_cache_address()
self.thread_ident = threading.current_thread().ident
@property
def port(self):
return self._port
@port.setter
def port(self, value):
try:
self._port = int(value)
self._is_socket = False
except ValueError:
self._port = value
self._is_socket = True
def connection(self):
if self.is_instantiating_thread:
return None # Use global connection object
else:
return self.new_connection()
@property
def is_instantiating_thread(self):
return self.thread_ident == threading.current_thread().ident
def new_connection(self):
"""Bliss refers to redis.Redis as a connection although
it is more like a connection pool.
:returns redis.Redis:
"""
if self._is_socket:
return redis.Redis(db=1, unix_socket_path=self.port)
else:
return redis.Redis(db=1, host=self.host, port=self.port)
def get_node(db_name, node_type=None, connection=None):
"""
Get DataNode instance event if the Redis node does not exist yet
:param str db_name:
:param str node_type:
:param connection Redis:
"""
node = _get_node(db_name, connection=connection)
if node is None:
node = _get_node_object(node_type, db_name, None, connection)
return node
class PeriodicTask:
def __init__(self, task, period=0):
self.reset()
self._period = period
self._task = task
def reset(self):
self._tm0 = gevent.time.time()
def execute(self):
tm = gevent.time.time()
if (tm - self._tm0) > self._period:
self._task()
self._tm0 = tm
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2020 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Synchronization primitives that work across threads and greenlets
.. autosummary::
:toctree:
async_utils
collection_utils
event_utils
greenlet_utils
hub_utils
lock_utils
queue_utils
thread_utils
"""
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2020 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import gevent
import gevent.monkey
import gevent.time
import logging
import time
from itertools import count
from ..utils import logging_utils
from . import greenlet_utils
logger = logging.getLogger(__name__)
class CustomLogger(logging.LoggerAdapter):
def process(self, msg, kwargs):
tname, gname = greenlet_utils.greenlet_name(withthread=True)
return (f"{tname}({gname}) [{self.extra}] {msg}", kwargs)
def log_gevent():
"""
Redirect gevent exception stream to error stream
"""
gevent.get_hub().exception_stream = logging_utils.err_stream
def blocking_sleep():
try:
func = gevent.monkey.saved["time"]["sleep"]
except KeyError:
func = time.sleep
return func
def blocking_call(duration=3, logger=None):
"""
Block the gevent's event loop
"""
func = blocking_sleep()
msg = f"Block event loop for {duration} sec ({func})"
if logger:
logger.error(msg)
else:
logging_utils.print_err(msg)
func(duration)
def start_heartbeat(logger, interval=0.5, max_delay=1, name=None):
"""
This spawns a heartbeat greenlet (for testing)
:param logger:
:param num interval:
:param num max_delay:
:param str name:
"""
if not name:
name = "HeartBeat"
if callable(logger):
printmsg = logger
else:
printmsg = logger.warning
class HeartBeat(gevent.Greenlet):
def __init__(self):
self.name = name
super().__init__()
def _run(self):
try:
c = count()
t0 = gevent.time.time()
while True:
t1 = gevent.time.time()
dt = t1 - t0
if dt > max_delay:
err_msg = f"{name} delay of {dt} sec too large (maximum: {max_delay} sec)"
raise RuntimeError(err_msg)
if gevent.config.monitor_thread:
suffix = "monitored"
else:
suffix = "not monitored"
printmsg(f"{name}{next(c)} ({suffix}, delay: {dt} sec)")
gevent.sleep(interval)
t0 = t1
finally:
printmsg("heartbeat exits")
g = HeartBeat()
# signal_utils.kill_on_exit(g.kill, asynch=True)
g.start()
return g
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2020 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import threading
import gevent.lock
from . import lock_utils
_ITER_METHODS = {"__iter__", "__reversed__", "keys", "values", "items"}
def add_safe_method(cls, methodname):
if methodname in _ITER_METHODS:
def method(self, *args, **kwargs):
with self._lock:
_method = getattr(self._collection, methodname)
yield from _method(*args, **kwargs)
else:
def method(self, *args, **kwargs):
with self._lock:
_method = getattr(self._collection, methodname)
return _method(*args, **kwargs)
method.__name__ = methodname
method.__qualname__ = f"{cls.__name__}.{methodname}"
setattr(cls, methodname, method)
class SafeCollection(type):
"""Abstract metaclass that adds methods with lock decorator
"""
LOCK_CLASS = NotImplemented
def __new__(self, name, bases, dct):
cls = super().__new__(self, name, bases, dct)
for name in cls.METHODS:
add_safe_method(cls, name)
return cls
def __call__(cls, *args, **kwargs):
instance = super().__call__()
instance._lock = cls.LOCK_CLASS()
instance._collection = instance.COLLECTION(*args, **kwargs)
return instance
class ThreadGreenletSafeCollection(SafeCollection):
LOCK_CLASS = lock_utils.RLock
class ThreadSafeCollection(SafeCollection):
LOCK_CLASS = threading.RLock
class GreenletSafeCollection(SafeCollection):
LOCK_CLASS = gevent.lock.RLock
CONTAINER_METHODS = {"__contains__", "__str__", "__repr__"}
SIZED_METHODS = {"__len__"}
ITERABLE_METHODS = {"__iter__"}
REVERSIBLE_METHODS = {"__reversed__"}
COLLECTION_METHODS = CONTAINER_METHODS | SIZED_METHODS | ITERABLE_METHODS
SEQUENCE_METHODS = COLLECTION_METHODS | REVERSIBLE_METHODS | {"__getitem__"}
MUTABLESEQUENCE_METHODS = SEQUENCE_METHODS | {
"__setitem__",
"__delitem__",
"__iadd__",
"insert",
"append",
"reverse",
"extend",
"pop",
"remove",
"index",
"count",
}
SET_METHODS = COLLECTION_METHODS | {
"__eq__",
"__ne__",
"__le__",
"__lt__",
"__gt__",
"__ge__",
"__and__",
"__or__",
"__sub__",
"__xor__",
"isdisjoint",
}
MUTABLESET_METHODS = SET_METHODS | {
"__ior__",
"__iand__",
"__ixor__",
"__isub__",
"clear",
"pop",
"remove",
"add",
"discard",
}
MAPPING_METHODS = COLLECTION_METHODS | {
"__getitem__",
"__eq__",
"__ne__",
"get",
"keys",
"values",
"items",
}
MUTABLEMAPPING_METHODS = MAPPING_METHODS | {
"__setitem__",
"__delitem__",
"pop",
"popitem",
"clear",
"update",
"setdefault",
}
class List(metaclass=ThreadGreenletSafeCollection):
COLLECTION = list
METHODS = MUTABLESEQUENCE_METHODS
class Set(metaclass=ThreadGreenletSafeCollection):
COLLECTION = set
METHODS = MUTABLESET_METHODS
class Dict(metaclass=ThreadGreenletSafeCollection):
COLLECTION = dict
METHODS = MUTABLEMAPPING_METHODS
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2020 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import threading
import gevent
import gevent.event
from . import hub_utils
from . import lock_utils
class SimpleEvent:
"""Event that can be set in any thread but only waited
for in the instantiating thread.
"""
def __init__(self, name=None):
if name is None:
name = self.__class__.__name__
self.name = name
self._event = gevent.event.Event()
self._notifier = hub_utils.HubTrigger(
self._event.set, name=f"{name}Trigger", blocking=True
)
self._threadid = threading.get_ident()
def __str__(self):
return self.name
def set(self, raise_on_destroyed=False):
"""
:param bool raise_on_destroyed:
"""
if raise_on_destroyed:
self._notifier.send()
else:
with hub_utils.skip_loop_destroyed(self._notifier._watcher.loop):
self._notifier.send()
def wait(self, timeout=None):
"""Must be called in the instantiating thread
"""
if self._threadid != threading.get_ident():
raise RuntimeError("You can only wait in the instantiating thread")
self._event.wait(timeout=timeout)
def __getattr__(self, attr):
return getattr(self._event, attr)
class Event:
"""Event that can be set and waited for in any thread.
"""
def __init__(self, name=None):
if name is None:
name = self.__class__.__name__
self.name = name
self._flag = False
self._flaglock = lock_utils.Lock(name=f"{self}FlagLock")
self._resources = lock_utils.ThreadResource(self._init_resource)
def _init_resource(self):
event = gevent.event.Event()
notifier = hub_utils.HubTrigger(event.set, name=f"{self}Trigger", blocking=True)
return event, notifier
def __str__(self):
return self.name
@property
def _event(self):
ev = self._resources.get()[0]
with self._flaglock:
if self._flag:
ev.set()
return ev
@property
def _notifiers(self):
for _, notifier in self._resources:
yield notifier
@property
def _events(self):
for event, _ in self._resources:
yield event
def is_set(self):
return self._flag
def isSet(self):
return self._flag
def ready(self):
return self._flag
def set(self, raise_on_destroyed=False):
"""
:param bool raise_on_destroyed:
"""
with self._flaglock:
self._flag = True
for notifier in self._notifiers:
if raise_on_destroyed:
notifier.send()
else:
with hub_utils.skip_loop_destroyed(notifier._watcher.loop):
notifier.send()
def clear(self):
with self._flaglock:
self._flag = False
for event in self._events:
event.clear()
def wait(self, timeout=None):
self._event.wait(timeout=timeout)
# -*- coding: utf-8 -*-
#
# This file is part of the nexus writer service of the BLISS project.
#
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2020 ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import gevent
import gevent.local
import threading
import types
class NamedGreenlet(gevent.Greenlet):
"""Greenlet with a dedicated name (target name by default)
"""
def __init__(self, run=None, name=None, *args, **kwargs):
"""
:param callable run: greenlet target
:param str name: greenlet name
:param args: positional arguments for `run`
:param kwargs: named arguments for `run`
"""
if not callable(run):
raise