From b64fd92c14671a121541efc5508507156d946b74 Mon Sep 17 00:00:00 2001
From: woutdenolf <woutdenolf@users.sf.net>
Date: Thu, 23 Jan 2025 20:13:56 +0100
Subject: [PATCH 1/3] better variable names

---
 src/ewoksdata/data/hdf5/dataset_writer.py | 98 ++++++++++++-----------
 1 file changed, 50 insertions(+), 48 deletions(-)

diff --git a/src/ewoksdata/data/hdf5/dataset_writer.py b/src/ewoksdata/data/hdf5/dataset_writer.py
index 95694ce..a4c601f 100644
--- a/src/ewoksdata/data/hdf5/dataset_writer.py
+++ b/src/ewoksdata/data/hdf5/dataset_writer.py
@@ -92,7 +92,7 @@ class DatasetWriter(_DatasetWriterBase):
         self._buffer: List[ArrayLike] = list()
         self._npoints_added: int = 0
         self._chunked: bool = False
-        self._nchunk: int = 0
+        self._chunk_size: int = 0
         self._nflushed: int = 0
 
     def _create_dataset(self, first_data_point: numpy.ndarray) -> h5py.Dataset:
@@ -116,7 +116,7 @@ class DatasetWriter(_DatasetWriterBase):
             options["maxshape"] = max_shape
         if options["chunks"]:
             self._chunked = True
-            self._nchunk = options["chunks"][0]
+            self._chunk_size = options["chunks"][0]
         dset = self._parent.create_dataset(self._name, **options)
         if self._attrs:
             dset.attrs.update(self._attrs)
@@ -144,28 +144,28 @@ class DatasetWriter(_DatasetWriterBase):
 
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush
-        nbuffer = len(self._buffer)
+        chunk_size = len(self._buffer)
         if self._flush_time_expired():
-            nflush = nbuffer
+            flush_size = chunk_size
         elif align and self._chunked:
-            n = nbuffer + (self._nflushed % self._nchunk)
-            nflush = n // self._nchunk * self._nchunk
-            nflush = min(nflush, nbuffer)
+            n = chunk_size + (self._nflushed % self._chunk_size)
+            flush_size = n // self._chunk_size * self._chunk_size
+            flush_size = min(flush_size, chunk_size)
         else:
-            nflush = nbuffer
-        if nflush == 0:
+            flush_size = chunk_size
+        if flush_size == 0:
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[0]
         istart = self._nflushed
-        nflushed = istart + nflush
+        nflushed = istart + flush_size
         if self._chunked and nflushed > nalloc:
             self._dataset.resize(nflushed, axis=0)
 
         # Move data from memory to HDF5
-        self._dataset[istart : istart + nflush] = self._buffer[:nflush]
-        self._buffer = self._buffer[nflush:]
+        self._dataset[istart : istart + flush_size] = self._buffer[:flush_size]
+        self._buffer = self._buffer[flush_size:]
         self._nflushed = nflushed
 
         self._flush_hdf5()
@@ -213,8 +213,8 @@ class StackDatasetWriter(_DatasetWriterBase):
         self._nstack = nstack
         self._buffers: List[List[ArrayLike]] = list()
         self._chunked: bool = False
-        self._nchunk: ArrayLike = numpy.zeros(2, dtype=int)
-        self._nflushed: ArrayLike = numpy.array(list(), dtype=int)
+        self._chunk_size: ArrayLike = numpy.zeros(2, dtype=int)
+        self._nflushed_dim1: ArrayLike = numpy.array(list(), dtype=int)
 
     def _create_dataset(
         self, first_data_point: numpy.ndarray, stack_index: int
@@ -239,7 +239,7 @@ class StackDatasetWriter(_DatasetWriterBase):
             options["maxshape"] = max_shape
         if options["chunks"]:
             self._chunked = True
-            self._nchunk = numpy.array(options["chunks"][:2], dtype=int)
+            self._chunk_size = numpy.array(options["chunks"][:2], dtype=int)
         dset = self._parent.create_dataset(self._name, **options)
         if self._attrs:
             dset.attrs.update(self._attrs)
@@ -249,7 +249,7 @@ class StackDatasetWriter(_DatasetWriterBase):
         # Add stack buffers when needed
         for _ in range(max(stack_index - len(self._buffers) + 1, 0)):
            self._buffers.append(list())
-            self._nflushed = numpy.append(self._nflushed, 0)
+            self._nflushed_dim1 = numpy.append(self._nflushed_dim1, 0)
         return self._buffers[stack_index]
 
     def add_point(self, data: ArrayLike, stack_index: int) -> bool:
@@ -270,60 +270,62 @@ class StackDatasetWriter(_DatasetWriterBase):
 
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush for each buffer in the stack
-        nbuffer = numpy.array([len(buffer) for buffer in self._buffers])
-        nchunk_dim0, nchunk_dim1 = self._nchunk[:2]
+        chunk_sizes = numpy.array([len(buffer) for buffer in self._buffers])
+        chunk_size_dim0, chunk_size_dim1 = self._chunk_size[:2]
         if self._flush_time_expired():
-            nflush = nbuffer
+            flush_sizes = chunk_sizes
         elif align and self._chunked:
-            n = nbuffer + (self._nflushed % nchunk_dim1)
-            nflush = n // nchunk_dim1 * nchunk_dim1
-            nflush = numpy.minimum(nflush, nbuffer)
-            for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
-                nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0] = min(
-                    nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0]
+            n = chunk_sizes + (self._nflushed_dim1 % chunk_size_dim1)
+            flush_sizes = n // chunk_size_dim1 * chunk_size_dim1
+            flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
+            for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
+                flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0] = min(
+                    flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0]
                 )
         else:
-            nflush = nbuffer
-        if not any(nflush):
+            flush_sizes = chunk_sizes
+        if not any(flush_sizes):
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[:2]
-        istart = self._nflushed
-        nflushed = istart + nflush
-        nalloc_new = numpy.array([len(nbuffer), max(nflushed)])
+        istart_dim1 = self._nflushed_dim1
+        nflushed_dim1 = istart_dim1 + flush_sizes
+        nalloc_new = numpy.array([len(chunk_sizes), max(nflushed_dim1)])
         if self._chunked and any(nalloc_new > nalloc):
             for axis, n in enumerate(nalloc_new):
                 self._dataset.resize(n, axis=axis)
 
         # Move data from memory to HDF5
-        if nchunk_dim0 == 0:
-            nchunk_dim0 = len(nbuffer)
-        for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
-            idx_dim0 = slice(i0_chunk0, i0_chunk0 + nchunk_dim0)
-            nflush_dim1 = nflush[idx_dim0]
-            istart0_dim1 = istart[idx_dim0]
+        if chunk_size_dim0 == 0:
+            chunk_size_dim0 = len(chunk_sizes)
+        for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
+            idx_dim0 = slice(i0_chunk0, i0_chunk0 + chunk_size_dim0)
+            flush_sizes_dim1 = flush_sizes[idx_dim0]
+            istart0_dim1 = istart_dim1[idx_dim0]
             buffers = self._buffers[idx_dim0]
-            if all(nflush_dim1 == nflush_dim1[0]) and all(
-                istart0_dim1 == istart0_dim1[0]
+            if all(istart0_dim1 == istart0_dim1[0]) and all(
+                flush_sizes_dim1 == flush_sizes_dim1[0]
             ):
-                data = [buffer[: nflush_dim1[0]] for buffer in buffers]
-                idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + nflush_dim1[0])
+                data = [buffer[: flush_sizes_dim1[0]] for buffer in buffers]
+                idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + flush_sizes_dim1[0])
                 self._dataset[idx_dim0, idx_dim1] = data
             else:
-                for buffer, i_dim0, istart_dim1, n_dim1 in zip(
+                for buffer, i_dim0, istart_dim1, i_flush_size_dim1 in zip(
                     buffers,
-                    range(i0_chunk0, i0_chunk0 + nchunk_dim0),
+                    range(i0_chunk0, i0_chunk0 + chunk_size_dim0),
                     istart0_dim1,
-                    nflush_dim1,
+                    flush_sizes_dim1,
                 ):
-                    self._dataset[i_dim0, istart_dim1 : istart_dim1 + n_dim1, ...] = (
-                        buffer[:n_dim1]
-                    )
+                    self._dataset[
+                        i_dim0, istart_dim1 : istart_dim1 + i_flush_size_dim1, ...
+                    ] = buffer[:i_flush_size_dim1]
+
         self._buffers = [
-            buffer[n_dim1:] for buffer, n_dim1 in zip(self._buffers, nflush)
+            buffer[i_flush_size_dim1:]
+            for buffer, i_flush_size_dim1 in zip(self._buffers, flush_sizes)
         ]
-        self._nflushed = nflushed
+        self._nflushed_dim1 = nflushed_dim1
 
         self._flush_hdf5()
         self._last_flush = time.time()
-- 
GitLab
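
The quantity these renames make explicit is the chunk-aligned flush size: when
flush_buffer(align=True) is called on a chunked dataset, only as many buffered
points are written as keep the total flushed count on an HDF5 chunk boundary.
A standalone sketch of that arithmetic (illustrative only; the helper name is
hypothetical and nothing below is part of the patch):

    def aligned_flush_size(buffered: int, flushed: int, chunk_size: int) -> int:
        # Points available, counted from the last chunk boundary.
        n = buffered + (flushed % chunk_size)
        # Round down to whole chunks, but never flush more than is buffered.
        flush_size = n // chunk_size * chunk_size
        return min(flush_size, buffered)

    assert aligned_flush_size(buffered=7, flushed=0, chunk_size=10) == 0
    assert aligned_flush_size(buffered=13, flushed=0, chunk_size=10) == 10
    assert aligned_flush_size(buffered=13, flushed=7, chunk_size=10) == 13

In the last case points 7..19 complete two whole chunks, so all 13 buffered
points can go out at once. When the flush period has expired, alignment is
skipped and everything buffered is flushed regardless.
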
From 4ad43859de619822be750a5b3181b36da4b2e98e Mon Sep 17 00:00:00 2001
From: woutdenolf <woutdenolf@users.sf.net>
Date: Thu, 23 Jan 2025 21:19:35 +0100
Subject: [PATCH 2/3] StackDatasetWriter dim0 chunking bug

---
 src/ewoksdata/data/hdf5/dataset_writer.py | 57 ++++++++++++++---------
 1 file changed, 36 insertions(+), 21 deletions(-)

diff --git a/src/ewoksdata/data/hdf5/dataset_writer.py b/src/ewoksdata/data/hdf5/dataset_writer.py
index a4c601f..bd50b1a 100644
--- a/src/ewoksdata/data/hdf5/dataset_writer.py
+++ b/src/ewoksdata/data/hdf5/dataset_writer.py
@@ -271,42 +271,59 @@ class StackDatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush for each buffer in the stack
         chunk_sizes = numpy.array([len(buffer) for buffer in self._buffers])
+        nflushed_dim1 = self._nflushed_dim1
+        size_dim0 = len(chunk_sizes)
+        assert size_dim0 == len(
+            nflushed_dim1
+        ), "Number of buffers and number of flushed dim1 points must be the same"
+
         chunk_size_dim0, chunk_size_dim1 = self._chunk_size[:2]
+        if chunk_size_dim0 == 0:
+            chunk_size_dim0 = size_dim0
+
         if self._flush_time_expired():
             flush_sizes = chunk_sizes
         elif align and self._chunked:
-            n = chunk_sizes + (self._nflushed_dim1 % chunk_size_dim1)
-            flush_sizes = n // chunk_size_dim1 * chunk_size_dim1
-            flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
-            for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
-                flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0] = min(
-                    flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0]
-                )
+            size_dim0 = size_dim0 // chunk_size_dim0 * chunk_size_dim0
+            chunk_sizes = chunk_sizes[:size_dim0]
+            nflushed_dim1 = nflushed_dim1[:size_dim0]
+            if size_dim0:
+                n1 = chunk_sizes + (nflushed_dim1 % chunk_size_dim1)
+                flush_sizes = n1 // chunk_size_dim1 * chunk_size_dim1
+                flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
+                for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
+                    flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0] = min(
+                        flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0]
+                    )
+            else:
+                flush_sizes = list()
         else:
             flush_sizes = chunk_sizes
+
         if not any(flush_sizes):
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[:2]
-        istart_dim1 = self._nflushed_dim1
+        istart_dim1 = nflushed_dim1
         nflushed_dim1 = istart_dim1 + flush_sizes
-        nalloc_new = numpy.array([len(chunk_sizes), max(nflushed_dim1)])
+        nalloc_new = numpy.array([size_dim0, max(nflushed_dim1)])
         if self._chunked and any(nalloc_new > nalloc):
             for axis, n in enumerate(nalloc_new):
                 self._dataset.resize(n, axis=axis)
 
         # Move data from memory to HDF5
-        if chunk_size_dim0 == 0:
-            chunk_size_dim0 = len(chunk_sizes)
-        for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
+        for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
             idx_dim0 = slice(i0_chunk0, i0_chunk0 + chunk_size_dim0)
+            buffers = self._buffers[idx_dim0]
+
             flush_sizes_dim1 = flush_sizes[idx_dim0]
+            non_ragged_buffers = len(set(flush_sizes_dim1)) == 1
+
             istart0_dim1 = istart_dim1[idx_dim0]
-            buffers = self._buffers[idx_dim0]
-            if all(istart0_dim1 == istart0_dim1[0]) and all(
-                flush_sizes_dim1 == flush_sizes_dim1[0]
-            ):
+            non_ragged_destination = len(set(istart0_dim1)) == 1
+
+            if non_ragged_destination and non_ragged_buffers:
                 data = [buffer[: flush_sizes_dim1[0]] for buffer in buffers]
                 idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + flush_sizes_dim1[0])
                 self._dataset[idx_dim0, idx_dim1] = data
@@ -321,11 +338,9 @@ class StackDatasetWriter(_DatasetWriterBase):
                     i_dim0, istart_dim1 : istart_dim1 + i_flush_size_dim1, ...
                 ] = buffer[:i_flush_size_dim1]
 
-        self._buffers = [
-            buffer[i_flush_size_dim1:]
-            for buffer, i_flush_size_dim1 in zip(self._buffers, flush_sizes)
-        ]
-        self._nflushed_dim1 = nflushed_dim1
+        for i0 in range(size_dim0):
+            self._buffers[i0] = self._buffers[i0][flush_sizes[i0] :]
+            self._nflushed_dim1[i0] = nflushed_dim1[i0]
 
         self._flush_hdf5()
         self._last_flush = time.time()
-- 
GitLab
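
What this patch changes, in isolation: with a 2D chunk shape, an aligned flush
now also rounds the stack dimension (dim0) down to whole chunks, so buffers
belonging to a trailing, incomplete group of stack entries stay in memory
instead of producing partially filled dim0 chunks; a chunk size of 0
(contiguous dataset) is normalized up front instead of after the alignment
loop; and only the buffers that were actually flushed are consumed, where the
old list comprehension over zip(self._buffers, flush_sizes) silently dropped
trailing buffers once flush_sizes became shorter than the buffer list. A
standalone sketch of the dim0 rounding (illustrative only; the helper name is
hypothetical):

    import numpy

    def whole_dim0_chunks(buffer_lengths, chunk_size_dim0):
        size_dim0 = len(buffer_lengths)
        if chunk_size_dim0 == 0:
            # Contiguous dataset: treat the whole stack as one dim0 chunk.
            chunk_size_dim0 = size_dim0
        # Keep only the stack entries that fill complete dim0 chunks.
        size_dim0 = size_dim0 // chunk_size_dim0 * chunk_size_dim0
        return buffer_lengths[:size_dim0]

    lengths = numpy.array([4, 4, 4, 4, 4])  # 5 stack buffers, 4 points each
    print(whole_dim0_chunks(lengths, chunk_size_dim0=2))  # [4 4 4 4]: the 5th waits

The assert added at the top of flush_buffer guards the invariant the rest of
the method relies on: one flushed-point counter per stack buffer.
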
From 41961eb5ac94fa4efd86326e088260605a8e2b59 Mon Sep 17 00:00:00 2001
From: woutdenolf <woutdenolf@users.sf.net>
Date: Thu, 23 Jan 2025 21:40:23 +0100
Subject: [PATCH 3/3] more renaming for clarification

---
 src/ewoksdata/data/hdf5/dataset_writer.py | 51 +++++++++++++----------
 1 file changed, 29 insertions(+), 22 deletions(-)

diff --git a/src/ewoksdata/data/hdf5/dataset_writer.py b/src/ewoksdata/data/hdf5/dataset_writer.py
index bd50b1a..eea663f 100644
--- a/src/ewoksdata/data/hdf5/dataset_writer.py
+++ b/src/ewoksdata/data/hdf5/dataset_writer.py
@@ -89,11 +89,12 @@ class DatasetWriter(_DatasetWriterBase):
     ) -> None:
         super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
         self._npoints = npoints
-        self._buffer: List[ArrayLike] = list()
-        self._npoints_added: int = 0
         self._chunked: bool = False
+        self._npoints_added: int = 0
+
+        self._buffer: List[ArrayLike] = list()
         self._chunk_size: int = 0
-        self._nflushed: int = 0
+        self._flushed_size: int = 0
 
     def _create_dataset(self, first_data_point: numpy.ndarray) -> h5py.Dataset:
         scan_shape = (self._npoints,)
@@ -145,28 +146,32 @@ class DatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush
         chunk_size = len(self._buffer)
+
         if self._flush_time_expired():
             flush_size = chunk_size
         elif align and self._chunked:
-            n = chunk_size + (self._nflushed % self._chunk_size)
+            n = chunk_size + (self._flushed_size % self._chunk_size)
             flush_size = n // self._chunk_size * self._chunk_size
             flush_size = min(flush_size, chunk_size)
         else:
             flush_size = chunk_size
+
         if flush_size == 0:
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[0]
-        istart = self._nflushed
-        nflushed = istart + flush_size
-        if self._chunked and nflushed > nalloc:
-            self._dataset.resize(nflushed, axis=0)
+        istart = self._flushed_size
+        flushed_size = istart + flush_size
+        if self._chunked and flushed_size > nalloc:
+            self._dataset.resize(flushed_size, axis=0)
 
-        # Move data from memory to HDF5
+        # Copy data from buffer to HDF5
         self._dataset[istart : istart + flush_size] = self._buffer[:flush_size]
+
+        # Remove copied data from buffer
         self._buffer = self._buffer[flush_size:]
-        self._nflushed = nflushed
+        self._flushed_size = flushed_size
 
         self._flush_hdf5()
         self._last_flush = time.time()
@@ -210,11 +215,12 @@ class StackDatasetWriter(_DatasetWriterBase):
     ) -> None:
         super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
         self._npoints = npoints
+        self._chunked: bool = False
         self._nstack = nstack
+
         self._buffers: List[List[ArrayLike]] = list()
-        self._chunked: bool = False
         self._chunk_size: ArrayLike = numpy.zeros(2, dtype=int)
-        self._nflushed_dim1: ArrayLike = numpy.array(list(), dtype=int)
+        self._flushed_size_dim1: ArrayLike = numpy.array(list(), dtype=int)
 
     def _create_dataset(
         self, first_data_point: numpy.ndarray, stack_index: int
@@ -249,7 +255,7 @@ class StackDatasetWriter(_DatasetWriterBase):
         # Add stack buffers when needed
         for _ in range(max(stack_index - len(self._buffers) + 1, 0)):
             self._buffers.append(list())
-            self._nflushed_dim1 = numpy.append(self._nflushed_dim1, 0)
+            self._flushed_size_dim1 = numpy.append(self._flushed_size_dim1, 0)
         return self._buffers[stack_index]
 
     def add_point(self, data: ArrayLike, stack_index: int) -> bool:
@@ -271,10 +277,10 @@ class StackDatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush for each buffer in the stack
         chunk_sizes = numpy.array([len(buffer) for buffer in self._buffers])
-        nflushed_dim1 = self._nflushed_dim1
+        flushed_size_dim1 = self._flushed_size_dim1
         size_dim0 = len(chunk_sizes)
         assert size_dim0 == len(
-            nflushed_dim1
+            flushed_size_dim1
         ), "Number of buffers and number of flushed dim1 points must be the same"
 
         chunk_size_dim0, chunk_size_dim1 = self._chunk_size[:2]
@@ -286,9 +292,9 @@ class StackDatasetWriter(_DatasetWriterBase):
         elif align and self._chunked:
             size_dim0 = size_dim0 // chunk_size_dim0 * chunk_size_dim0
             chunk_sizes = chunk_sizes[:size_dim0]
-            nflushed_dim1 = nflushed_dim1[:size_dim0]
+            flushed_size_dim1 = flushed_size_dim1[:size_dim0]
             if size_dim0:
-                n1 = chunk_sizes + (nflushed_dim1 % chunk_size_dim1)
+                n1 = chunk_sizes + (flushed_size_dim1 % chunk_size_dim1)
                 flush_sizes = n1 // chunk_size_dim1 * chunk_size_dim1
                 flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
                 for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
@@ -305,14 +311,14 @@ class StackDatasetWriter(_DatasetWriterBase):
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[:2]
-        istart_dim1 = nflushed_dim1
-        nflushed_dim1 = istart_dim1 + flush_sizes
-        nalloc_new = numpy.array([size_dim0, max(nflushed_dim1)])
+        istart_dim1 = flushed_size_dim1
+        flushed_size_dim1 = istart_dim1 + flush_sizes
+        nalloc_new = numpy.array([size_dim0, max(flushed_size_dim1)])
         if self._chunked and any(nalloc_new > nalloc):
             for axis, n in enumerate(nalloc_new):
                 self._dataset.resize(n, axis=axis)
 
-        # Move data from memory to HDF5
+        # Copy data from buffer to HDF5
         for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
             idx_dim0 = slice(i0_chunk0, i0_chunk0 + chunk_size_dim0)
             buffers = self._buffers[idx_dim0]
@@ -338,9 +344,10 @@ class StackDatasetWriter(_DatasetWriterBase):
                     i_dim0, istart_dim1 : istart_dim1 + i_flush_size_dim1, ...
                 ] = buffer[:i_flush_size_dim1]
 
+        # Remove copied data from buffer
         for i0 in range(size_dim0):
            self._buffers[i0] = self._buffers[i0][flush_sizes[i0] :]
-            self._nflushed_dim1[i0] = nflushed_dim1[i0]
+            self._flushed_size_dim1[i0] = flushed_size_dim1[i0]
 
         self._flush_hdf5()
         self._last_flush = time.time()
-- 
GitLab
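
For context, a minimal usage sketch of the writer this series touches,
assuming only the signatures visible in the diffs above (the keyword layout of
the constructor and a single-argument add_point on DatasetWriter are
assumptions, not taken from the patches):

    import h5py
    import numpy
    from ewoksdata.data.hdf5.dataset_writer import DatasetWriter

    with h5py.File("scan.h5", "w") as h5file:
        writer = DatasetWriter(h5file, "data", npoints=100)
        for _ in range(100):
            # Each point becomes one row along the first (resizable) axis.
            writer.add_point(numpy.random.random(128))
        # Write whatever is still buffered, without chunk alignment.
        writer.flush_buffer()

The final flush_buffer() call matters because, per the diffs, points are
buffered in memory and only written out when a chunk boundary is reached or
the flush period expires; add_point returns a bool per its signature,
presumably indicating whether the call triggered a flush.
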