Implement concurrency for shifting at the single dataset level
This works:
import glob
import multiprocessing as mp
import os
import queue
import shutil
import time
from multiprocessing import shared_memory

import h5py
import hdf5plugin
import numpy as np
import scipy.ndimage as ndi
# prototype on one sub-h5 file of the k-map series (path_out, name_sample defined earlier)
kmaplist = glob.glob(f"{path_out}/{name_sample}*.1.h5")
subh5 = kmaplist[30]
root = subh5.split("_")[-1][:-3]  # scan entry name derived from the file name
h5f = h5py.File(subh5, "r", libver="latest")
data = h5f[f"/{root}/instrument/detector/data"]
# etashift (built earlier) maps the rounded eta value to the 2D shift to apply
eta = str(np.round(h5f[f"{root}/instrument/positioners/eta"][()], 4))
shift = etashift[eta]
map_sh = [h5f[f"{root}/scan/motor_{i}_steps"][()] for i in (0, 1)]  # (x, y)
data_sh = data.shape  # (x*y, detx, dety)
n_chunks = os.cpu_count() - 1
frames_sh = data.shape[1]  # length of the first detector axis (detx)
chunk_size = frames_sh // n_chunks
# contiguous index ranges along detx; the last range absorbs the remainder
c0 = list(range(0, frames_sh, chunk_size))
c1 = c0[1:] + [frames_sh]
idxs = list(zip(c0, c1))
tq = mp.JoinableQueue()
for i in idxs:  # one task per detector block
    tq.put(i)
# work on a copy so the original sub-h5 stays untouched
base = os.path.abspath(subh5)[: -len(".1.h5")]
fname_subh5_shifted = shutil.copy(subh5, f"{base}.1_shifted.h5")
f = h5py.File(fname_subh5_shifted, "a", libver="latest")
det_shift = f[f"{root}/instrument/detector/"]
# swap the detector dataset for an empty, compressed one of the same shape
del det_shift["data"]
data_shift = det_shift.create_dataset(
    "data",
    shape=data_sh,
    chunks=(1, data_sh[1], data_sh[2]),  # one detector frame per chunk
    dtype=data.dtype,
    **hdf5plugin.Bitshuffle(nelems=0, lz4=True),
)
# re-point the measurement link at the new dataset
link = f"{root}/measurement/image/data"
if link in f:
    del f[link]
f[link] = data_shift
# shared buffer that the forked workers fill in place
sh_mem = shared_memory.SharedMemory(create=True, size=data_shift.nbytes)
arr = np.ndarray(data_shift.shape, dtype=data_shift.dtype, buffer=sh_mem.buf)
def process_consumer(task_queue):
    # data, arr, map_sh, data_sh and shift are inherited by the forked workers
    while True:
        try:
            # load a block of detector rows: (x*y, i1-i0, dety)
            i0, i1 = task_queue.get_nowait()
            dmap = data[:, i0:i1, :]
            # view it as one (x, y) map per detector pixel
            dmap = dmap.reshape(*map_sh, (i1 - i0) * data_sh[2])
            # apply the same 2D shift to every per-pixel map
            if not np.allclose(shift, 0, atol=1e-3):
                for i in range(dmap.shape[-1]):
                    dmap[..., i] = ndi.shift(dmap[..., i], shift)
            # restore the (frames, detx, dety) layout and store the block
            dmap = dmap.reshape(data_sh[0], i1 - i0, data_sh[2])
            arr[:, i0:i1, :] = dmap
            del dmap
            task_queue.task_done()
        except queue.Empty:
            print("done")
            return
pool = [
    mp.Process(target=process_consumer, args=(tq,)) for _ in range(os.cpu_count())
]
for p in pool:
    p.start()
for p in pool:
    p.join()
for p in pool:
    p.close()
data_shift[...] = arr  # <-- takes ages
f.close()
h5f.close()
sh_mem.close()
sh_mem.unlink()  # release the shared-memory block
The drawback is that the shared array arr is written back to the HDF5 dataset in a single monolithic assignment at the end, which also means the whole array has to be pulled back into memory first; this is the slow step flagged above and needs further investigation. The current solution instead has each spawned process handle one whole sub-h5 file, so most of the wall time goes into I/O, i.e. the processes sit sleeping.
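Before reworking the whole approach, one thing worth trying is to flush the shared buffer back in the same blocks the workers produced rather than in one giant assignment, so HDF5 compresses and writes one slab at a time. A minimal sketch, reusing idxs, arr and data_shift from the snippet above:
# write back block by block along the chunked detector axis
for i0, i1 in idxs:
    data_shift[:, i0:i1, :] = arr[:, i0:i1, :]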
Useful links:
- shared_memory docs (stdlib)
- benchmarks of different ways to share numpy arrays between processes
- easy way to do it using mp.Array (see the sketch below)
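For reference, a minimal self-contained sketch of the mp.Array route from the last link; the shape, fill values and worker are placeholders, and it assumes the fork start method (Linux default) so the children inherit the shared array:
import multiprocessing as mp
import numpy as np

shape = (100, 64, 64)  # placeholder shape
# lock=False returns a raw ctypes array backed by shared memory
buf = mp.Array("d", int(np.prod(shape)), lock=False)
shared = np.frombuffer(buf, dtype=np.float64).reshape(shape)

def worker(i0, i1):
    # each worker fills a disjoint slice, so no locking is needed
    shared[i0:i1] = i0

if __name__ == "__main__":
    procs = [mp.Process(target=worker, args=(i, i + 50)) for i in (0, 50)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(shared[0, 0, 0], shared[50, 0, 0])  # -> 0.0 50.0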