Interleaved reading/processing
Current benchmarks indicate that I/O accounts for roughly 50% of the total execution time. We might therefore consider interleaving I/O and processing.
Model (0): Ideal setting
In the ideal setting, reading data takes 50% of the time, and processing takes the other 50%. In this case, the sequence
[Read][Process][Read][Process][Read][Process][Read][Process]
could become
[Read][Process][Process][Process][Process]
[Read]...[Read]...[Read]............
It's important to note that the reader and the processor each have their own memory bank:
- The Reader reads data from disk and fills a CPU memory bank
- The Processor acts on a GPU memory bank, but it might also need a full CPU memory bank (for example if building sinograms for half-tomography must be done on the CPU). To avoid bad surprises in the future, it's best to assume that the Processor also has its own CPU memory bank - which is reasonable because CPU memory is usually much more than twice the size of GPU memory.
The overall speed-up would be 2*n/(n+1) for n chunks to reconstruct, so approximately a factor of two if the volume has many chunks.
This speed-up is close to the maximum achievable speed, as reading data is an "incompressible" step.
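As a sanity check, the expected speed-up can be evaluated for a few chunk counts (throwaway snippet, not nabu code):

def ideal_speedup(n_chunks):
    # 2*n sequential steps ([Read] + [Process] per chunk) become n+1 steps
    # when each [Read] overlaps with the previous chunk's [Process]
    return 2 * n_chunks / (n_chunks + 1)

print([round(ideal_speedup(n), 2) for n in (1, 2, 4, 8, 16)])
# [1.0, 1.33, 1.6, 1.78, 1.88]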
Model (1): Accounting for transfer to GPU
In reality, the data that was just read has to be transferred to the GPU. Let's denote this transfer step [T]. The sequence
[Read][T][Process][Read][T][Process][Read][T][Process][Read][T][Process]
could become
[Read][T][Process][T][Process][T][Process][T][Process]
[Read]......[Read]......[Read]...............
Importantly, the [T] step is part of the same thread doing the processing, because the thread doing [Read] will fill up memory. In other words, we can't do [T] and [Read] in parallel, as they involve the same memory bank. The reader has to wait for the transfer to be finished before reading the next chunk.
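A minimal sketch of this ordering is given below. The names (transfer_to_gpu(), cpu_bank, ...) are purely illustrative, not existing nabu API; the point is only that [T] belongs to the processing thread and must complete before the next [Read] starts.

from threading import Thread

def run_one_chunk(reader, processor, next_sub_region):
    # [T]: done by the processing thread, empties the reader's CPU bank
    gpu_data = processor.transfer_to_gpu(reader.cpu_bank)
    # The CPU bank is now free: the next [Read] may start in a separate thread...
    reader_thread = Thread(target=reader.read, args=next_sub_region)
    reader_thread.start()
    # ...while [Process] runs on the GPU data
    processor.process(gpu_data)
    reader_thread.join()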
Model (2): Accounting for data saving
In our case, I/O stands for both reading data and writing data. Although writing data is usually significantly faster (because the access pattern is ideal, while for reading it's sub-optimal), "reading data" alone does not account for the full 50%.
- [Read] fills the memory bank of Reader with data read from disk
- [T] empties the memory bank of Reader and fills the (GPU) memory bank of Processor. This step only holds when using a GPU (otherwise we could just "swap pointers" between the Reader/Processor memory banks).
- [Save] empties the memory bank of Processor
Unfortunately, [Save] cannot be done in parallel with [Process], because both act on the same memory bank.
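To make the allowed overlaps explicit, here is a throwaway sketch (bank names are purely illustrative) mapping each step to the memory bank(s) it touches; two steps may only run in parallel if their banks are disjoint:

STEPS = {
    "Read":    {"reader_cpu_bank"},
    "T":       {"reader_cpu_bank", "processor_gpu_bank"},
    "Process": {"processor_gpu_bank"},
    "Save":    {"processor_gpu_bank"},
}

def can_overlap(step_a, step_b):
    # Two steps may run concurrently only if they touch disjoint memory banks
    return STEPS[step_a].isdisjoint(STEPS[step_b])

print(can_overlap("Read", "Process"))  # True: different banks
print(can_overlap("Read", "T"))        # False: both touch the reader's CPU bank
print(can_overlap("Save", "Process"))  # False: both touch the processor's bank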
A simple prototype
The following code was tested for both "processing slower than reading" and "processing faster than reading", by tuning the size= kwarg of the Processor class. It seems to work well - the HDF5File from tomoscan/h5py releases the GIL, enabling parallel load/process.
from time import time
from threading import Thread

import numpy as np
from tomoscan.io import HDF5File


class Processor:
    # Dummy processor: a large matrix product stands for the actual processing
    def __init__(self, size=8000):
        self.A = np.random.rand(size, size).astype("f")
        self._cnt = 0

    def process(self):
        print("Processing %d..." % self._cnt)
        t0 = time()
        _ = self.A.T.dot(self.A)
        el = time() - t0
        print("process OK in %.2f s" % el)
        self._cnt += 1


class Reader:
    def __init__(self, fname, h5path, start_z=None, end_z=None):
        self.fname = fname
        self.h5path = h5path
        self.set_subregion(start_z, end_z)

    def set_subregion(self, start_z, end_z):
        self.start_z = start_z
        self.end_z = end_z

    def read(self):
        print("Reading (%d, %d)..." % (self.start_z, self.end_z))
        t0 = time()
        with HDF5File(self.fname, "r") as f:
            d = f[self.h5path][:, self.start_z:self.end_z, :]
        el = time() - t0
        print("Read (%d, %d) in %.2f s" % (self.start_z, self.end_z, el))
        return d


def interleaved_read_proc(fname, h5path, n_steps, chunk_size=100):
    R = Reader(fname, h5path, start_z=0, end_z=chunk_size)
    P = Processor()

    def read_in_thread(start_z, end_z):
        # Launch the read of the next chunk in a separate thread
        R.set_subregion(start_z, end_z)
        reader_thread = Thread(target=R.read)
        reader_thread.start()
        return reader_thread

    def wait_for_available_data(reader_thread):
        print("[Processor] waiting for available data...")
        reader_thread.join()
        print("[Processor] OK!")

    t0 = time()
    reader_thread = read_in_thread(0, chunk_size)
    for i in range(1, n_steps + 1):
        wait_for_available_data(reader_thread)
        # Read the next chunk while processing the current one
        reader_thread = read_in_thread(i * chunk_size, (i + 1) * chunk_size)
        P.process()
    el = time() - t0
    print("Total time: %.3fs" % el)
Things to change in the nabu pipelines
In nabu, a Reconstructor class instantiates one (or more) Pipeline object(s), tied to a certain chunk size. Something like:
# in the Reconstructor class
def reconstruct(self):
    self.pipeline = Pipeline(..., chunk_size=self.chunk_size)
    for task in tasks:
        self.pipeline.set_subregion(task["sub_region"])
        self.pipeline.process_chunk()  # read, transfer, process and save
The data reading has to be decoupled from the processing in the Pipeline class. The above code would become:
# in the Reconstructor class
def reconstruct(self):
    self.pipeline = Pipeline(..., chunk_size=self.chunk_size, create_reader=False)
    self.reader = Reader(..., chunk_size=self.chunk_size)
    reader_thread = read_in_thread(*tasks[0]["sub_region"])  # fill self.reader.data
    for task in tasks[1:]:
        wait_for_available_data(reader_thread)  # wait for self.reader.data
        self.pipeline.set_subregion(task["sub_region"])
        self.pipeline.set_data(self.reader.data)  # transfer
        reader_thread = read_in_thread(*task["sub_region"])  # read the next chunk in another thread
        self.pipeline.process_chunk()  # process and save
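The read_in_thread() and wait_for_available_data() helpers used above would still have to be added to Reconstructor. A minimal sketch, mirroring the prototype (these methods do not exist yet in nabu; names and placement are illustrative):

# Sketch of the helpers, as Reconstructor methods (illustrative, not existing nabu code)
def read_in_thread(self, start_z, end_z):
    self.reader.set_subregion(start_z, end_z)

    def _read():
        self.reader.data = self.reader.read()  # store the chunk for the pipeline

    reader_thread = Thread(target=_read)
    reader_thread.start()
    return reader_thread

def wait_for_available_data(self, reader_thread):
    reader_thread.join()  # block until the current chunk is fully read into self.reader.data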