Commit 4a9b06a5 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

[writer] global file pool

parent 5dc69c2b
Pipeline #20359 failed with stages
in 34 minutes
......@@ -32,6 +32,7 @@ from silx.io.dictdump import dicttojson, dicttoini
from .io_utils import mkdir
from ..utils import data_merging
from ..utils.process_utils import file_processes
from ..utils.async_utils import SharedLockPool
DEFAULT_PLOT_NAME = "plotselect"
......@@ -859,28 +860,43 @@ def lockedErrorMessage(filename):
return msg
class FilePool(SharedLockPool):
"""
Allows to acquire locks identified by name recursively.
"""
@contextmanager
def _acquire_open(self, filename):
"""Protect file opening/creation
"""
with super().acquire(None):
with super().acquire(filename):
yield
@contextmanager
def acquire(self, filename):
"""Protect blocks of file IO operations. To be used in case other
operations in the block yield to the async event loop)
"""
lockname = os.path.abspath(filename)
with super().acquire(lockname):
yield
FILEPOOL = FilePool()
class File(h5py.File):
def __init__(
self,
name,
mode="r",
enable_file_locking=None,
swmr=None,
creationlocks=None,
**kwargs
):
def __init__(self, name, mode="r", enable_file_locking=None, swmr=None, **kwargs):
"""
:param str name:
:param str mode:
:param bool enable_file_locking: by default it is disabled for `mode=='r'`
and enabled in all other modes
:param bool swmr: when not specified: try both modes when `mode=='r'`
:param SharedLockPool creationlocks:
:param **kwargs: see `h5py.File.__init__`
"""
self._lockname = os.path.abspath(name)
self._creationlocks = creationlocks
with self.acquire_lock():
with self._acquire_creation_lock(name):
# https://support.hdfgroup.org/HDF5/docNewFeatures/SWMR/Design-HDF5-FileLocking.pdf
if not HASSWMR and swmr:
swmr = False
......@@ -917,29 +933,28 @@ class File(h5py.File):
kwargs["libver"] = libver
super().__init__(name, mode=mode, swmr=swmr, **kwargs)
@contextmanager
def _acquire_creation_lock(self, filename):
with FILEPOOL._acquire_open(filename):
yield
@contextmanager
def acquire_lock(self):
if self._creationlocks is None:
with FILEPOOL.acquire(self.filename):
yield
else:
with self._creationlocks.acquire(self._lockname):
yield
class nxRoot(File):
def __init__(self, name, mode="r", creationlocks=None, **kwargs):
def __init__(self, name, mode="r", **kwargs):
"""
:param str name:
:param str mode:
:param SharedLockPool creationlocks:
:param **kwargs: see `h5py.File.__init__`
"""
self._lockname = os.path.abspath(name)
self._creationlocks = creationlocks
with self.acquire_lock():
with self._acquire_creation_lock(name):
if mode != "r":
mkdir(os.path.dirname(name))
super().__init__(name, creationlocks=creationlocks, mode=mode, **kwargs)
super().__init__(name, mode=mode, **kwargs)
nxRootInit(self)
......
......@@ -155,7 +155,6 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
def __init__(
self,
db_name,
lockpool,
node_type=None,
resource_profiling=False,
parentlogger=None,
......@@ -163,7 +162,6 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
):
"""
:param str db_name:
:param geventsync.SharedLockPool lockpool:
:param str node_type:
:param Logger parentlogger:
:param bool resource_profiling:
......@@ -197,7 +195,6 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
self._devices = {} # str -> dict(subscan.name:dict)
self._nxroot = {} # for recursive calling
self._nxentry = None # for recursive calling
self._nxroot_locks = lockpool
self._configurable = False
def _listen_event_loop(self, **kwargs):
......@@ -474,7 +471,6 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
"mode": "a",
"enable_file_locking": self.saveoptions["enable_file_locking"],
"swmr": self.saveoptions["swmr"],
"creationlocks": self._nxroot_locks,
}
@contextmanager
......
......@@ -116,7 +116,6 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber):
else:
self._scan_writer_class = scan_writer_base.NexusScanWriterBase
self.writers = {}
self._locks = async_utils.SharedLockPool()
self.minimal_purge_delay = 5
self.purge_delay = purge_delay
self._fds = {}
......@@ -224,7 +223,6 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber):
"""
writer = self._scan_writer_class(
node.db_name,
self._locks,
node_type=node.node_type,
parentlogger=self.logger,
resource_profiling=self.resource_profiling,
......@@ -297,13 +295,12 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber):
self.logger.info, self._fds, prefix="{} fds since start"
)
nfds = len(fds)
nlocks = len(self._locks)
ngreenlets = len(process_utils.greenlets())
nthreads = len(process_utils.threads())
mb = int(process_utils.memory() / 1024 ** 2)
self.logger.info(
"{} threads, {} greenlets, {} fds, {}MB MEM, {} locks".format(
nthreads, ngreenlets, nfds, mb, nlocks
"{} threads, {} greenlets, {} fds, {}MB MEM".format(
nthreads, ngreenlets, nfds, mb
)
)
......
......@@ -120,9 +120,9 @@ def greenlet_ident(g=None):
return gevent.get_hub().ident_registry.get_ident(g)
class SharedLockPool(object):
class SharedLockPool:
"""
Allows to acquire locks identified by name recursively.
Allows to acquire locks identified by name (hashable type) recursively.
"""
def __init__(self):
......
......@@ -14,7 +14,6 @@ from louie import dispatcher
from bliss.common import scans
from nexus_writer_service.utils import scan_utils
from nexus_writer_service.io import nexus
from nexus_writer_service.utils import async_utils
def test_nxw_readers(nexus_writer_config):
......@@ -51,13 +50,8 @@ def _test_nxw_readers(
measurement = nexus.nxCollection(nxentry, "measurement")
measurement["a"] = list(range(10))
measurement["b"] = list(range(10))
creationlocks = async_utils.SharedLockPool()
startevent = gevent.event.Event()
readerkwargs = {
"mode": mode,
"enable_file_locking": enable_file_locking,
"creationlocks": creationlocks,
}
readerkwargs = {"mode": mode, "enable_file_locking": enable_file_locking}
readers = [
gevent.spawn(reader, filename, startevent, hold=i == 0, **readerkwargs)
for i in range(4)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment