From b64fd92c14671a121541efc5508507156d946b74 Mon Sep 17 00:00:00 2001
From: woutdenolf <woutdenolf@users.sf.net>
Date: Thu, 23 Jan 2025 20:13:56 +0100
Subject: [PATCH 1/3] Use clearer variable names
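
Rename _nchunk to _chunk_size and nbuffer/nflush to chunk_size/
flush_size in DatasetWriter, with the matching renames in
StackDatasetWriter, so the chunk-alignment arithmetic in
flush_buffer() is easier to follow.

For reference, a minimal standalone sketch of that arithmetic (the
helper name and signature are illustrative, not part of this patch):

    def aligned_flush_size(buffer_len: int, flushed_size: int, chunk_size: int) -> int:
        # Points available, counted from the start of the partially
        # filled HDF5 chunk that the next flush would continue.
        n = buffer_len + (flushed_size % chunk_size)
        # Round down to a whole number of chunks, but never flush more
        # points than are actually buffered.
        return min(n // chunk_size * chunk_size, buffer_len)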

---
 src/ewoksdata/data/hdf5/dataset_writer.py | 98 ++++++++++++-----------
 1 file changed, 50 insertions(+), 48 deletions(-)

diff --git a/src/ewoksdata/data/hdf5/dataset_writer.py b/src/ewoksdata/data/hdf5/dataset_writer.py
index 95694ce..a4c601f 100644
--- a/src/ewoksdata/data/hdf5/dataset_writer.py
+++ b/src/ewoksdata/data/hdf5/dataset_writer.py
@@ -92,7 +92,7 @@ class DatasetWriter(_DatasetWriterBase):
         self._buffer: List[ArrayLike] = list()
         self._npoints_added: int = 0
         self._chunked: bool = False
-        self._nchunk: int = 0
+        self._chunk_size: int = 0
         self._nflushed: int = 0
 
     def _create_dataset(self, first_data_point: numpy.ndarray) -> h5py.Dataset:
@@ -116,7 +116,7 @@ class DatasetWriter(_DatasetWriterBase):
             options["maxshape"] = max_shape
         if options["chunks"]:
             self._chunked = True
-            self._nchunk = options["chunks"][0]
+            self._chunk_size = options["chunks"][0]
         dset = self._parent.create_dataset(self._name, **options)
         if self._attrs:
             dset.attrs.update(self._attrs)
@@ -144,28 +144,28 @@ class DatasetWriter(_DatasetWriterBase):
 
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush
-        nbuffer = len(self._buffer)
+        chunk_size = len(self._buffer)
         if self._flush_time_expired():
-            nflush = nbuffer
+            flush_size = chunk_size
         elif align and self._chunked:
-            n = nbuffer + (self._nflushed % self._nchunk)
-            nflush = n // self._nchunk * self._nchunk
-            nflush = min(nflush, nbuffer)
+            n = chunk_size + (self._nflushed % self._chunk_size)
+            flush_size = n // self._chunk_size * self._chunk_size
+            flush_size = min(flush_size, chunk_size)
         else:
-            nflush = nbuffer
-        if nflush == 0:
+            flush_size = chunk_size
+        if flush_size == 0:
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[0]
         istart = self._nflushed
-        nflushed = istart + nflush
+        nflushed = istart + flush_size
         if self._chunked and nflushed > nalloc:
             self._dataset.resize(nflushed, axis=0)
 
         # Move data from memory to HDF5
-        self._dataset[istart : istart + nflush] = self._buffer[:nflush]
-        self._buffer = self._buffer[nflush:]
+        self._dataset[istart : istart + flush_size] = self._buffer[:flush_size]
+        self._buffer = self._buffer[flush_size:]
         self._nflushed = nflushed
 
         self._flush_hdf5()
@@ -213,8 +213,8 @@ class StackDatasetWriter(_DatasetWriterBase):
         self._nstack = nstack
         self._buffers: List[List[ArrayLike]] = list()
         self._chunked: bool = False
-        self._nchunk: ArrayLike = numpy.zeros(2, dtype=int)
-        self._nflushed: ArrayLike = numpy.array(list(), dtype=int)
+        self._chunk_size: ArrayLike = numpy.zeros(2, dtype=int)
+        self._nflushed_dim1: ArrayLike = numpy.array(list(), dtype=int)
 
     def _create_dataset(
         self, first_data_point: numpy.ndarray, stack_index: int
@@ -239,7 +239,7 @@ class StackDatasetWriter(_DatasetWriterBase):
             options["maxshape"] = max_shape
         if options["chunks"]:
             self._chunked = True
-            self._nchunk = numpy.array(options["chunks"][:2], dtype=int)
+            self._chunk_size = numpy.array(options["chunks"][:2], dtype=int)
         dset = self._parent.create_dataset(self._name, **options)
         if self._attrs:
             dset.attrs.update(self._attrs)
@@ -249,7 +249,7 @@ class StackDatasetWriter(_DatasetWriterBase):
         # Add stack buffers when needed
         for _ in range(max(stack_index - len(self._buffers) + 1, 0)):
             self._buffers.append(list())
-            self._nflushed = numpy.append(self._nflushed, 0)
+            self._nflushed_dim1 = numpy.append(self._nflushed_dim1, 0)
         return self._buffers[stack_index]
 
     def add_point(self, data: ArrayLike, stack_index: int) -> bool:
@@ -270,60 +270,62 @@ class StackDatasetWriter(_DatasetWriterBase):
 
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush for each buffer in the stack
-        nbuffer = numpy.array([len(buffer) for buffer in self._buffers])
-        nchunk_dim0, nchunk_dim1 = self._nchunk[:2]
+        chunk_sizes = numpy.array([len(buffer) for buffer in self._buffers])
+        chunk_size_dim0, chunk_size_dim1 = self._chunk_size[:2]
         if self._flush_time_expired():
-            nflush = nbuffer
+            flush_sizes = chunk_sizes
         elif align and self._chunked:
-            n = nbuffer + (self._nflushed % nchunk_dim1)
-            nflush = n // nchunk_dim1 * nchunk_dim1
-            nflush = numpy.minimum(nflush, nbuffer)
-            for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
-                nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0] = min(
-                    nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0]
+            n = chunk_sizes + (self._nflushed_dim1 % chunk_size_dim1)
+            flush_sizes = n // chunk_size_dim1 * chunk_size_dim1
+            flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
+            for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
+                flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0] = min(
+                    flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0]
                 )
         else:
-            nflush = nbuffer
-        if not any(nflush):
+            flush_sizes = chunk_sizes
+        if not any(flush_sizes):
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[:2]
-        istart = self._nflushed
-        nflushed = istart + nflush
-        nalloc_new = numpy.array([len(nbuffer), max(nflushed)])
+        istart_dim1 = self._nflushed_dim1
+        nflushed_dim1 = istart_dim1 + flush_sizes
+        nalloc_new = numpy.array([len(chunk_sizes), max(nflushed_dim1)])
         if self._chunked and any(nalloc_new > nalloc):
             for axis, n in enumerate(nalloc_new):
                 self._dataset.resize(n, axis=axis)
 
         # Move data from memory to HDF5
-        if nchunk_dim0 == 0:
-            nchunk_dim0 = len(nbuffer)
-        for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
-            idx_dim0 = slice(i0_chunk0, i0_chunk0 + nchunk_dim0)
-            nflush_dim1 = nflush[idx_dim0]
-            istart0_dim1 = istart[idx_dim0]
+        if chunk_size_dim0 == 0:
+            chunk_size_dim0 = len(chunk_sizes)
+        for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
+            idx_dim0 = slice(i0_chunk0, i0_chunk0 + chunk_size_dim0)
+            flush_sizes_dim1 = flush_sizes[idx_dim0]
+            istart0_dim1 = istart_dim1[idx_dim0]
             buffers = self._buffers[idx_dim0]
-            if all(nflush_dim1 == nflush_dim1[0]) and all(
-                istart0_dim1 == istart0_dim1[0]
+            if all(istart0_dim1 == istart0_dim1[0]) and all(
+                flush_sizes_dim1 == flush_sizes_dim1[0]
             ):
-                data = [buffer[: nflush_dim1[0]] for buffer in buffers]
-                idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + nflush_dim1[0])
+                data = [buffer[: flush_sizes_dim1[0]] for buffer in buffers]
+                idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + flush_sizes_dim1[0])
                 self._dataset[idx_dim0, idx_dim1] = data
             else:
-                for buffer, i_dim0, istart_dim1, n_dim1 in zip(
+                for buffer, i_dim0, istart_dim1, i_flush_size_dim1 in zip(
                     buffers,
-                    range(i0_chunk0, i0_chunk0 + nchunk_dim0),
+                    range(i0_chunk0, i0_chunk0 + chunk_size_dim0),
                     istart0_dim1,
-                    nflush_dim1,
+                    flush_sizes_dim1,
                 ):
-                    self._dataset[i_dim0, istart_dim1 : istart_dim1 + n_dim1, ...] = (
-                        buffer[:n_dim1]
-                    )
+                    self._dataset[
+                        i_dim0, istart_dim1 : istart_dim1 + i_flush_size_dim1, ...
+                    ] = buffer[:i_flush_size_dim1]
+
         self._buffers = [
-            buffer[n_dim1:] for buffer, n_dim1 in zip(self._buffers, nflush)
+            buffer[i_flush_size_dim1:]
+            for buffer, i_flush_size_dim1 in zip(self._buffers, flush_sizes)
         ]
-        self._nflushed = nflushed
+        self._nflushed_dim1 = nflushed_dim1
 
         self._flush_hdf5()
         self._last_flush = time.time()
-- 
GitLab


From 4ad43859de619822be750a5b3181b36da4b2e98e Mon Sep 17 00:00:00 2001
From: woutdenolf <woutdenolf@users.sf.net>
Date: Thu, 23 Jan 2025 21:19:35 +0100
Subject: [PATCH 2/3] Fix StackDatasetWriter dim0 chunking bug
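
With aligned flushing, the trailing group of stack buffers could be
flushed even when it did not fill a whole chunk along dim0. Truncate
the number of stack buffers considered for an aligned flush to a
multiple of chunk_size_dim0; buffers in an incomplete trailing dim0
chunk are left for a later flush.

A hypothetical walk-through of the dim0 truncation (values made up):

    # 5 stack buffers, chunks span 2 stack entries along dim0
    size_dim0 = 5
    chunk_size_dim0 = 2
    # Only the first 4 buffers (two whole dim0 chunks) are eligible
    # for an aligned flush; the fifth waits for a full chunk.
    size_dim0 = size_dim0 // chunk_size_dim0 * chunk_size_dim0  # -> 4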

---
 src/ewoksdata/data/hdf5/dataset_writer.py | 57 ++++++++++++++---------
 1 file changed, 36 insertions(+), 21 deletions(-)

diff --git a/src/ewoksdata/data/hdf5/dataset_writer.py b/src/ewoksdata/data/hdf5/dataset_writer.py
index a4c601f..bd50b1a 100644
--- a/src/ewoksdata/data/hdf5/dataset_writer.py
+++ b/src/ewoksdata/data/hdf5/dataset_writer.py
@@ -271,42 +271,59 @@ class StackDatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush for each buffer in the stack
         chunk_sizes = numpy.array([len(buffer) for buffer in self._buffers])
+        nflushed_dim1 = self._nflushed_dim1
+        size_dim0 = len(chunk_sizes)
+        assert size_dim0 == len(
+            nflushed_dim1
+        ), "Number of buffers and number of flushed dim1 points must be the same"
+
         chunk_size_dim0, chunk_size_dim1 = self._chunk_size[:2]
+        if chunk_size_dim0 == 0:
+            chunk_size_dim0 = size_dim0
+
         if self._flush_time_expired():
             flush_sizes = chunk_sizes
         elif align and self._chunked:
-            n = chunk_sizes + (self._nflushed_dim1 % chunk_size_dim1)
-            flush_sizes = n // chunk_size_dim1 * chunk_size_dim1
-            flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
-            for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
-                flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0] = min(
-                    flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0]
-                )
+            size_dim0 = size_dim0 // chunk_size_dim0 * chunk_size_dim0
+            chunk_sizes = chunk_sizes[:size_dim0]
+            nflushed_dim1 = nflushed_dim1[:size_dim0]
+            if size_dim0:
+                n1 = chunk_sizes + (nflushed_dim1 % chunk_size_dim1)
+                flush_sizes = n1 // chunk_size_dim1 * chunk_size_dim1
+                flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
+                for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
+                    flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0] = min(
+                        flush_sizes[i0_chunk0 : i0_chunk0 + chunk_size_dim0]
+                    )
+            else:
+                flush_sizes = list()
         else:
             flush_sizes = chunk_sizes
+
         if not any(flush_sizes):
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[:2]
-        istart_dim1 = self._nflushed_dim1
+        istart_dim1 = nflushed_dim1
         nflushed_dim1 = istart_dim1 + flush_sizes
-        nalloc_new = numpy.array([len(chunk_sizes), max(nflushed_dim1)])
+        nalloc_new = numpy.array([size_dim0, max(nflushed_dim1)])
         if self._chunked and any(nalloc_new > nalloc):
             for axis, n in enumerate(nalloc_new):
                 self._dataset.resize(n, axis=axis)
 
         # Move data from memory to HDF5
-        if chunk_size_dim0 == 0:
-            chunk_size_dim0 = len(chunk_sizes)
-        for i0_chunk0 in range(0, len(chunk_sizes), chunk_size_dim0):
+        for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
             idx_dim0 = slice(i0_chunk0, i0_chunk0 + chunk_size_dim0)
+            buffers = self._buffers[idx_dim0]
+
             flush_sizes_dim1 = flush_sizes[idx_dim0]
+            non_ragged_buffers = len(set(flush_sizes_dim1)) == 1
+
             istart0_dim1 = istart_dim1[idx_dim0]
-            buffers = self._buffers[idx_dim0]
-            if all(istart0_dim1 == istart0_dim1[0]) and all(
-                flush_sizes_dim1 == flush_sizes_dim1[0]
-            ):
+            non_ragged_destination = len(set(istart0_dim1)) == 1
+
+            if non_ragged_destination and non_ragged_buffers:
                 data = [buffer[: flush_sizes_dim1[0]] for buffer in buffers]
                 idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + flush_sizes_dim1[0])
                 self._dataset[idx_dim0, idx_dim1] = data
@@ -321,11 +338,9 @@ class StackDatasetWriter(_DatasetWriterBase):
                         i_dim0, istart_dim1 : istart_dim1 + i_flush_size_dim1, ...
                     ] = buffer[:i_flush_size_dim1]
 
-        self._buffers = [
-            buffer[i_flush_size_dim1:]
-            for buffer, i_flush_size_dim1 in zip(self._buffers, flush_sizes)
-        ]
-        self._nflushed_dim1 = nflushed_dim1
+        for i0 in range(size_dim0):
+            self._buffers[i0] = self._buffers[i0][flush_sizes[i0] :]
+            self._nflushed_dim1[i0] = nflushed_dim1[i0]
 
         self._flush_hdf5()
         self._last_flush = time.time()
-- 
GitLab


From 41961eb5ac94fa4efd86326e088260605a8e2b59 Mon Sep 17 00:00:00 2001
From: woutdenolf <woutdenolf@users.sf.net>
Date: Thu, 23 Jan 2025 21:40:23 +0100
Subject: [PATCH 3/3] Rename more variables for clarity
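
Continue the renaming from the first commit: _nflushed becomes
_flushed_size and _nflushed_dim1 becomes _flushed_size_dim1. Also
split the "Move data from memory to HDF5" comment into separate
"Copy data from buffer to HDF5" and "Remove copied data from buffer"
steps.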

---
 src/ewoksdata/data/hdf5/dataset_writer.py | 51 +++++++++++++----------
 1 file changed, 29 insertions(+), 22 deletions(-)

diff --git a/src/ewoksdata/data/hdf5/dataset_writer.py b/src/ewoksdata/data/hdf5/dataset_writer.py
index bd50b1a..eea663f 100644
--- a/src/ewoksdata/data/hdf5/dataset_writer.py
+++ b/src/ewoksdata/data/hdf5/dataset_writer.py
@@ -89,11 +89,12 @@ class DatasetWriter(_DatasetWriterBase):
     ) -> None:
         super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
         self._npoints = npoints
-        self._buffer: List[ArrayLike] = list()
-        self._npoints_added: int = 0
         self._chunked: bool = False
+        self._npoints_added: int = 0
+
+        self._buffer: List[ArrayLike] = list()
         self._chunk_size: int = 0
-        self._nflushed: int = 0
+        self._flushed_size: int = 0
 
     def _create_dataset(self, first_data_point: numpy.ndarray) -> h5py.Dataset:
         scan_shape = (self._npoints,)
@@ -145,28 +146,32 @@ class DatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush
         chunk_size = len(self._buffer)
+
         if self._flush_time_expired():
             flush_size = chunk_size
         elif align and self._chunked:
-            n = chunk_size + (self._nflushed % self._chunk_size)
+            n = chunk_size + (self._flushed_size % self._chunk_size)
             flush_size = n // self._chunk_size * self._chunk_size
             flush_size = min(flush_size, chunk_size)
         else:
             flush_size = chunk_size
+
         if flush_size == 0:
             return False
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[0]
-        istart = self._nflushed
-        nflushed = istart + flush_size
-        if self._chunked and nflushed > nalloc:
-            self._dataset.resize(nflushed, axis=0)
+        istart = self._flushed_size
+        flushed_size = istart + flush_size
+        if self._chunked and flushed_size > nalloc:
+            self._dataset.resize(flushed_size, axis=0)
 
-        # Move data from memory to HDF5
+        # Copy data from buffer to HDF5
         self._dataset[istart : istart + flush_size] = self._buffer[:flush_size]
+
+        # Remove copied data from buffer
         self._buffer = self._buffer[flush_size:]
-        self._nflushed = nflushed
+        self._flushed_size = flushed_size
 
         self._flush_hdf5()
         self._last_flush = time.time()
@@ -210,11 +215,12 @@ class StackDatasetWriter(_DatasetWriterBase):
     ) -> None:
         super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
         self._npoints = npoints
+        self._chunked: bool = False
         self._nstack = nstack
+
         self._buffers: List[List[ArrayLike]] = list()
-        self._chunked: bool = False
         self._chunk_size: ArrayLike = numpy.zeros(2, dtype=int)
-        self._nflushed_dim1: ArrayLike = numpy.array(list(), dtype=int)
+        self._flushed_size_dim1: ArrayLike = numpy.array(list(), dtype=int)
 
     def _create_dataset(
         self, first_data_point: numpy.ndarray, stack_index: int
@@ -249,7 +255,7 @@ class StackDatasetWriter(_DatasetWriterBase):
         # Add stack buffers when needed
         for _ in range(max(stack_index - len(self._buffers) + 1, 0)):
             self._buffers.append(list())
-            self._nflushed_dim1 = numpy.append(self._nflushed_dim1, 0)
+            self._flushed_size_dim1 = numpy.append(self._flushed_size_dim1, 0)
         return self._buffers[stack_index]
 
     def add_point(self, data: ArrayLike, stack_index: int) -> bool:
@@ -271,10 +277,10 @@ class StackDatasetWriter(_DatasetWriterBase):
     def flush_buffer(self, align: bool = False) -> bool:
         # Determine how many points to flush for each buffer in the stack
         chunk_sizes = numpy.array([len(buffer) for buffer in self._buffers])
-        nflushed_dim1 = self._nflushed_dim1
+        flushed_size_dim1 = self._flushed_size_dim1
         size_dim0 = len(chunk_sizes)
         assert size_dim0 == len(
-            nflushed_dim1
+            flushed_size_dim1
         ), "Number of buffers and number of flushed dim1 points must be the same"
 
         chunk_size_dim0, chunk_size_dim1 = self._chunk_size[:2]
@@ -286,9 +292,9 @@ class StackDatasetWriter(_DatasetWriterBase):
         elif align and self._chunked:
             size_dim0 = size_dim0 // chunk_size_dim0 * chunk_size_dim0
             chunk_sizes = chunk_sizes[:size_dim0]
-            nflushed_dim1 = nflushed_dim1[:size_dim0]
+            flushed_size_dim1 = flushed_size_dim1[:size_dim0]
             if size_dim0:
-                n1 = chunk_sizes + (nflushed_dim1 % chunk_size_dim1)
+                n1 = chunk_sizes + (flushed_size_dim1 % chunk_size_dim1)
                 flush_sizes = n1 // chunk_size_dim1 * chunk_size_dim1
                 flush_sizes = numpy.minimum(flush_sizes, chunk_sizes)
                 for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
@@ -305,14 +311,14 @@ class StackDatasetWriter(_DatasetWriterBase):
 
         # Enlarge the dataset when needed
         nalloc = self._dataset.shape[:2]
-        istart_dim1 = nflushed_dim1
-        nflushed_dim1 = istart_dim1 + flush_sizes
-        nalloc_new = numpy.array([size_dim0, max(nflushed_dim1)])
+        istart_dim1 = flushed_size_dim1
+        flushed_size_dim1 = istart_dim1 + flush_sizes
+        nalloc_new = numpy.array([size_dim0, max(flushed_size_dim1)])
         if self._chunked and any(nalloc_new > nalloc):
             for axis, n in enumerate(nalloc_new):
                 self._dataset.resize(n, axis=axis)
 
-        # Move data from memory to HDF5
+        # Copy data from buffer to HDF5
         for i0_chunk0 in range(0, size_dim0, chunk_size_dim0):
             idx_dim0 = slice(i0_chunk0, i0_chunk0 + chunk_size_dim0)
             buffers = self._buffers[idx_dim0]
@@ -338,9 +344,10 @@ class StackDatasetWriter(_DatasetWriterBase):
                         i_dim0, istart_dim1 : istart_dim1 + i_flush_size_dim1, ...
                     ] = buffer[:i_flush_size_dim1]
 
+        # Remove copied data from buffer
         for i0 in range(size_dim0):
             self._buffers[i0] = self._buffers[i0][flush_sizes[i0] :]
-            self._nflushed_dim1[i0] = nflushed_dim1[i0]
+            self._flushed_size_dim1[i0] = flushed_size_dim1[i0]
 
         self._flush_hdf5()
         self._last_flush = time.time()
-- 
GitLab