
Resolve "Dataset stack writer: flush size issues"

@@ -89,11 +89,12 @@ class DatasetWriter(_DatasetWriterBase):
     ) -> None:
         super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
         self._npoints = npoints
-        self._buffer: List[ArrayLike] = list()
-        self._npoints_added: int = 0
         self._chunked: bool = False
-        self._nchunk: int = 0
-        self._nflushed: int = 0
+        self._npoints_added: int = 0
+        self._buffer: List[ArrayLike] = list()
+        self._chunk_size: int = 0
+        self._flushed_size: int = 0

     def _create_dataset(self, first_data_point: numpy.ndarray) -> h5py.Dataset:
         scan_shape = (self._npoints,)
@@ -116,7 +117,7 @@ class DatasetWriter(_DatasetWriterBase):
             options["maxshape"] = max_shape
         if options["chunks"]:
             self._chunked = True
-            self._nchunk = options["chunks"][0]
+            self._chunk_size = options["chunks"][0]
         dset = self._parent.create_dataset(self._name, **options)
         if self._attrs:
             dset.attrs.update(self._attrs)
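Note (illustration, not part of the diff): the renamed `self._chunk_size` holds the first axis of the HDF5 chunk shape that `options["chunks"]` carries at this point. As a minimal sketch of where such a chunk shape comes from, h5py picks one automatically when a dataset is resizable; the names and sizes below are assumptions:

```python
import h5py

# In-memory file so the sketch leaves nothing on disk
with h5py.File("demo.h5", "w", driver="core", backing_store=False) as f:
    # maxshape=(None, 16) makes axis 0 resizable, which requires chunking;
    # chunks=True asks h5py to guess a chunk shape
    dset = f.create_dataset(
        "data", shape=(0, 16), maxshape=(None, 16), dtype="f8", chunks=True
    )
    print(dset.chunks)  # e.g. (128, 16): 128 points per chunk along axis 0
```

Flushing in multiples of `dset.chunks[0]` means each write fills whole chunks, which avoids re-reading and re-compressing partially written chunks.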
@@ -144,29 +145,33 @@ class DatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush
-        nbuffer = len(self._buffer)
+        chunk_size = len(self._buffer)
         if self._flush_time_expired():
-            nflush = nbuffer
+            flush_size = chunk_size
         elif align and self._chunked:
-            n = nbuffer + (self._nflushed % self._nchunk)
-            nflush = n // self._nchunk * self._nchunk
-            nflush = min(nflush, nbuffer)
+            n = chunk_size + (self._flushed_size % self._chunk_size)
+            flush_size = n // self._chunk_size * self._chunk_size
+            flush_size = min(flush_size, chunk_size)
         else:
-            nflush = nbuffer
-        if nflush == 0:
+            flush_size = chunk_size
+        if flush_size == 0:
             return False

         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[0]
-        istart = self._nflushed
-        nflushed = istart + nflush
-        if self._chunked and nflushed > nalloc:
-            self._dataset.resize(nflushed, axis=0)
+        istart = self._flushed_size
+        flushed_size = istart + flush_size
+        if self._chunked and flushed_size > nalloc:
+            self._dataset.resize(flushed_size, axis=0)

-        # Move data from memory to HDF5
-        self._dataset[istart : istart + nflush] = self._buffer[:nflush]
-        self._buffer = self._buffer[nflush:]
-        self._nflushed = nflushed
+        # Copy data from buffer to HDF5
+        self._dataset[istart : istart + flush_size] = self._buffer[:flush_size]
+
+        # Remove copied data from buffer
+        self._buffer = self._buffer[flush_size:]
+        self._flushed_size = flushed_size

         self._flush_hdf5()
         self._last_flush = time.time()
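Note (worked example with assumed numbers, not from the MR): to make the alignment arithmetic concrete, `chunk` stands for `self._chunk_size`, `flushed` for `self._flushed_size`, and `buffered` for `len(self._buffer)`:

```python
# Assumed numbers: HDF5 chunks of 10 points along axis 0,
# 20 points flushed so far, 13 points waiting in the buffer.
chunk = 10      # self._chunk_size
flushed = 20    # self._flushed_size
buffered = 13   # len(self._buffer)

n = buffered + (flushed % chunk)  # 13 + 0 = 13
flush = n // chunk * chunk        # 13 // 10 * 10 = 10
flush = min(flush, buffered)      # 10

# Only 10 of the 13 buffered points are written: the flush ends at
# point 30, a chunk boundary, and 3 points stay buffered for later.
assert flush == 10
```

A time-expired flush skips the alignment and writes everything, which is why `self._flushed_size % self._chunk_size` can be non-zero on the next aligned flush.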
@@ -210,11 +215,12 @@ class StackDatasetWriter(_DatasetWriterBase):
     ) -> None:
         super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
         self._npoints = npoints
+        self._chunked: bool = False
         self._nstack = nstack
         self._buffers: List[List[ArrayLike]] = list()
-        self._chunked: bool = False
-        self._nchunk: ArrayLike = numpy.zeros(2, dtype=int)
-        self._nflushed: ArrayLike = numpy.array(list(), dtype=int)
+        self._chunk_size: ArrayLike = numpy.zeros(2, dtype=int)
+        self._flushed_size_dim1: ArrayLike = numpy.array(list(), dtype=int)

     def _create_dataset(
         self, first_data_point: numpy.ndarray, stack_index: int
@@ -239,7 +245,7 @@ class StackDatasetWriter(_DatasetWriterBase):
             options["maxshape"] = max_shape
         if options["chunks"]:
             self._chunked = True
-            self._nchunk = numpy.array(options["chunks"][:2], dtype=int)
+            self._chunk_size = numpy.array(options["chunks"][:2], dtype=int)
         dset = self._parent.create_dataset(self._name, **options)
         if self._attrs:
             dset.attrs.update(self._attrs)
@@ -249,7 +255,7 @@ class StackDatasetWriter(_DatasetWriterBase):
         # Add stack buffers when needed
         for _ in range(max(stack_index - len(self._buffers) + 1, 0)):
             self._buffers.append(list())
-            self._nflushed = numpy.append(self._nflushed, 0)
+            self._flushed_size_dim1 = numpy.append(self._flushed_size_dim1, 0)
         return self._buffers[stack_index]

     def add_point(self, data: ArrayLike, stack_index: int) -> bool:
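Note (illustration, not part of the diff): the loop above grows the stack lazily, so a point can arrive for any `stack_index` before lower indices have been seen. A minimal standalone sketch of the same pattern, with hypothetical names and plain lists instead of the writer's state:

```python
buffers = []        # stands in for self._buffers
flushed_dim1 = []   # stands in for self._flushed_size_dim1

def get_buffer(stack_index):
    # Append empty buffers (and zero flush counters) up to stack_index
    for _ in range(max(stack_index - len(buffers) + 1, 0)):
        buffers.append([])
        flushed_dim1.append(0)
    return buffers[stack_index]

get_buffer(3).append("first point")  # creates buffers 0..3 in one call
assert len(buffers) == 4 and flushed_dim1 == [0, 0, 0, 0]
```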
@@ -270,60 +276,78 @@ class StackDatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush for each buffer in the stack
-        nbuffer = numpy.array([len(buffer) for buffer in self._buffers])
-        nchunk_dim0, nchunk_dim1 = self._nchunk[:2]
+        chunk_sizes = numpy.array([len(buffer) for buffer in self._buffers])
+        flushed_size_dim1 = self._flushed_size_dim1
+        size_dim0 = len(chunk_sizes)
+        assert size_dim0 == len(
+            flushed_size_dim1
+        ), "Number of buffers and number of flushed dim1 points must be the same"
+        chunk_size_dim0, chunk_size_dim1 = self._chunk_size[:2]
+        if chunk_size_dim0 == 0:
+            chunk_size_dim0 = size_dim0
         if self._flush_time_expired():
-            nflush = nbuffer
+            flush_sizes = chunk_sizes
         elif align and self._chunked:
-            n = nbuffer + (self._nflushed % nchunk_dim1)
-            nflush = n // nchunk_dim1 * nchunk_dim1
-            nflush = numpy.minimum(nflush, nbuffer)
-            for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
-                nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0] = min(
-                    nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0]
-                )
+            size_dim0 = size_dim0 // chunk_size_dim0 * chunk_size_dim0
+            chunk_sizes = chunk_sizes[:size_dim0]
+            flushed_size_dim1 = flushed_size_dim1[:size_dim0]
+            if size_dim0:
+                n1 = chunk_sizes + (flushed_size_dim1 % chunk_size_dim1)
+                flush_sizes = n1 // chunk_size_dim1 * chunk_size_dim1
+                flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
+                for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
+                    flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0] = min(
+                        flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0]
+                    )
+            else:
+                flush_sizes = list()
         else:
-            nflush = nbuffer
-        if not any(nflush):
+            flush_sizes = chunk_sizes
+        if not any(flush_sizes):
            return False

         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[:2]
-        istart = self._nflushed
-        nflushed = istart + nflush
-        nalloc_new = numpy.array([len(nbuffer), max(nflushed)])
+        istart_dim1 = flushed_size_dim1
+        flushed_size_dim1 = istart_dim1 + flush_sizes
+        nalloc_new = numpy.array([size_dim0, max(flushed_size_dim1)])
         if self._chunked and any(nalloc_new > nalloc):
             for axis, n in enumerate(nalloc_new):
                 self._dataset.resize(n, axis=axis)

-        # Move data from memory to HDF5
-        if nchunk_dim0 == 0:
-            nchunk_dim0 = len(nbuffer)
-        for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
-            idx_dim0 = slice(i0_chunk0, i0_chunk0 + nchunk_dim0)
-            nflush_dim1 = nflush[idx_dim0]
-            istart0_dim1 = istart[idx_dim0]
+        # Copy data from buffer to HDF5
+        for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
+            idx_dim0 = slice(i0_chunk0, i0_chunk0 + chunk_size_dim0)
             buffers = self._buffers[idx_dim0]
-            if all(nflush_dim1 == nflush_dim1[0]) and all(
-                istart0_dim1 == istart0_dim1[0]
-            ):
-                data = [buffer[: nflush_dim1[0]] for buffer in buffers]
-                idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + nflush_dim1[0])
+            flush_sizes_dim1 = flush_sizes[idx_dim0]
+            non_ragged_buffers = len(set(flush_sizes_dim1)) == 1
+            istart0_dim1 = istart_dim1[idx_dim0]
+            non_ragged_destination = len(set(istart0_dim1)) == 1
+            if non_ragged_destination and non_ragged_buffers:
+                data = [buffer[: flush_sizes_dim1[0]] for buffer in buffers]
+                idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + flush_sizes_dim1[0])
                 self._dataset[idx_dim0, idx_dim1] = data
             else:
-                for buffer, i_dim0, istart_dim1, n_dim1 in zip(
+                for buffer, i_dim0, istart_dim1, i_flush_size_dim1 in zip(
                     buffers,
-                    range(i0_chunk0, i0_chunk0 + nchunk_dim0),
+                    range(i0_chunk0, i0_chunk0 + chunk_size_dim0),
                     istart0_dim1,
-                    nflush_dim1,
+                    flush_sizes_dim1,
                 ):
-                    self._dataset[i_dim0, istart_dim1 : istart_dim1 + n_dim1, ...] = (
-                        buffer[:n_dim1]
-                    )
-        self._buffers = [
-            buffer[n_dim1:] for buffer, n_dim1 in zip(self._buffers, nflush)
-        ]
-        self._nflushed = nflushed
+                    self._dataset[
+                        i_dim0, istart_dim1 : istart_dim1 + i_flush_size_dim1, ...
+                    ] = buffer[:i_flush_size_dim1]
+
+        # Remove copied data from buffer
+        for i0 in range(size_dim0):
+            self._buffers[i0] = self._buffers[i0][flush_sizes[i0] :]
+            self._flushed_size_dim1[i0] = flushed_size_dim1[i0]

         self._flush_hdf5()
         self._last_flush = time.time()
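Note (worked example with assumed numbers, not from the MR): for the stack case the same chunk rounding happens per buffer along dim1, and then each group of `chunk_size_dim0` buffers is clamped to a common flush size so whole 2-D chunks are written together:

```python
import numpy

# Assumed numbers: 4 stack entries, HDF5 chunks of 2 entries (dim0)
# by 10 points (dim1), nothing flushed yet.
chunk_size_dim0, chunk_size_dim1 = 2, 10
chunk_sizes = numpy.array([13, 11, 25, 9])     # buffered points per entry
flushed_size_dim1 = numpy.zeros(4, dtype=int)  # self._flushed_size_dim1

# Per-buffer chunk-aligned flush size along dim1
n1 = chunk_sizes + (flushed_size_dim1 % chunk_size_dim1)
flush_sizes = n1 // chunk_size_dim1 * chunk_size_dim1  # [10, 10, 20, 0]
flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)  # [10, 10, 20, 0]

# Clamp each dim0 chunk to its slowest buffer so full 2x10 chunks are written
for i0 in range(0, len(chunk_sizes), chunk_size_dim0):
    flush_sizes[i0 : i0 + chunk_size_dim0] = min(
        flush_sizes[i0 : i0 + chunk_size_dim0]
    )

# Entry 3 has only 9 points (no full chunk), so it holds back entry 2 as well
assert flush_sizes.tolist() == [10, 10, 0, 0]
```

After this clamping, all buffers in a dim0 chunk usually share the same flush size and start offset, which is what lets the `non_ragged_*` fast path write the whole block with a single slice assignment instead of one write per stack entry.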