Commit a020ccc3 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by Generic Bliss account for Control Software
Browse files

[SMX] Move FAI-related processing to fai_impl helper class

parent 9d2a26a6
......@@ -40,27 +40,22 @@ namespace processing::pipelines
{
namespace smx
{
class pipeline::impl
using fifo_t = boost::lockfree::spsc_queue<pipeline::input_t>;
using fifo_ptr_t = std::shared_ptr<fifo_t>;
using counter_ptr_t = std::shared_ptr<std::atomic_int>;
using source_node_t = source_node<fifo_ptr_t, pipeline::acq_info_t, counter_ptr_t>;
using source_node_ptr_t = std::shared_ptr<source_node_t>;
using graph_ptr_t = std::shared_ptr<tbb::flow::graph>;
using frame_t = pipeline::frame_t;
using proc_params_t = proc_params;
struct cl_device
{
using fifo_t = boost::lockfree::spsc_queue<input_t>;
using fifo_ptr_t = std::shared_ptr<fifo_t>;
using counter_ptr_t = std::shared_ptr<std::atomic_int>;
using source_node_t = source_node<fifo_ptr_t, acq_info_t, counter_ptr_t>;
public:
using proc_params_t = proc_params;
bcl::device device;
bcl::context context;
bcl::command_queue queue;
using accum_split_node = tbb::flow::split_node<std::tuple<frame_t, frame_t>>;
impl(int rank, acq_info_t const& acq_info, proc_params_t const& proc_params) :
m_acq_info(acq_info),
m_fifo_ptr(std::make_shared<fifo_t>(proc_params.fifo.nb_fifo_frames)),
m_nb_frames_source_ptr(std::make_shared<std::atomic_int>(0)),
m_nb_frames_reconstructed_ptr(std::make_shared<std::atomic_int>(0)),
m_graph(m_task_group_context),
m_roi_counters_buffer(proc_params.buffers.nb_roi_counters_buffer),
m_peak_counters_buffer(proc_params.buffers.nb_peak_counters_buffer),
m_frames_buffer(proc_params.buffers.nb_frames_buffer)
static bcl::device get_device(std::size_t device_idx)
{
// Filter for a 1.2 platform and set it as the default
auto platforms = bcl::system::platforms();
......@@ -83,28 +78,44 @@ namespace processing::pipelines
auto devices = plat->devices(CL_DEVICE_TYPE_GPU);
if (devices.empty()) {
LIMA_THROW_EXCEPTION(lima::runtime_error("No device found. Check OpenCL installation!"));
} else if (device_idx >= devices.size()) {
std::ostringstream os;
os << "Invalid GPU device " << std::to_string(device_idx) << ". "
<< "Only " << devices.size() << " devices found";
LIMA_THROW_EXCEPTION(lima::runtime_error(os.str()));
}
bcl::device device = devices[proc_params.gpu.device_idx];
// Getting information about used queue and device
const size_t max_compute_units = device.get_info<CL_DEVICE_MAX_COMPUTE_UNITS>();
const size_t max_work_group_size = device.get_info<CL_DEVICE_MAX_WORK_GROUP_SIZE>();
bcl::device device = devices[device_idx];
LIMA_LOG(trace, proc) << "Dev: " << device.name();
return device;
}
// Create OCL context
bcl::context context(device);
// Create OCL queue
bcl::command_queue queue(context, context.get_device());
/// Build the OpenCL execution environment for the GPU at the given index:
/// select the device, create a context on it, then a command queue on that context
cl_device(std::size_t device_idx) :
device(get_device(device_idx)), context(device), queue(context, context.get_device())
{
}
auto&& acq_params = m_acq_info.params;
// Getting information about used queue and device
// (each call queries the OpenCL runtime via get_info; values are fixed device capabilities)
size_t max_compute_units() { return device.get_info<CL_DEVICE_MAX_COMPUTE_UNITS>(); }
size_t max_work_group_size() { return device.get_info<CL_DEVICE_MAX_WORK_GROUP_SIZE>(); }
};
// Limit the number of log traces given the order of magnitude of m_nb_frames
const unsigned int log_frame_limiter = order_of_magnitude_base10(acq_params.xfer.time_slice.count, 1);
class fai_impl
{
public:
using accum_split_node = tbb::flow::split_node<std::tuple<frame_t, frame_t>>;
src.emplace(m_graph, m_fifo_ptr, m_acq_info, log_frame_limiter, m_nb_frames_source_ptr,
m_nb_frames_reconstructed_ptr);
fai_impl(int rank, lima::frame_info frame_info, std::size_t xfer_frames, graph_ptr_t graph,
source_node_ptr_t src, proc_params_t const& proc_params) :
m_graph(graph),
m_src(src),
m_roi_counters_buffer(proc_params.buffers.nb_roi_counters_buffer),
m_peak_counters_buffer(proc_params.buffers.nb_peak_counters_buffer),
m_frames_buffer(proc_params.buffers.nb_frames_buffer)
{
cl_device device(proc_params.gpu.device_idx);
auto& context = device.context;
auto& queue = device.queue;
// Peak finder node
fai::kernel_params params = fai::read_kernel_params(
......@@ -120,23 +131,21 @@ namespace processing::pipelines
LIMA_LOG(trace, proc) << "Peak finder kernel parameters parsed";
auto&& frame_info = m_acq_info.frame;
auto&& [dense_x, dense_y] = frame_info.dimensions();
peak_finder.emplace(m_graph, dense_x, dense_y, frame_info.pixel_type(), params,
peak_finder.emplace(*m_graph, dense_x, dense_y, frame_info.pixel_type(), params,
proc_params.gpu.cl_source_path, context, queue);
checkpoint_peak_finder.emplace(m_graph, tbb::flow::unlimited, m_nb_frames_sparsified);
peak_counter.emplace(m_graph, tbb::flow::unlimited, [](auto ptr) { return ptr->peak_indices.size(); });
peak_counters_buffer.emplace(m_graph, tbb::flow::unlimited, m_peak_counters_buffer,
checkpoint_peak_finder.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_sparsified);
peak_counter.emplace(*m_graph, tbb::flow::unlimited, [](auto ptr) { return ptr->peak_indices.size(); });
peak_counters_buffer.emplace(*m_graph, tbb::flow::unlimited, m_peak_counters_buffer,
m_peak_counters_buffer_mutex);
int nb_bins = params.csr_indptr.size() - 1;
// Dense HDF5 node
if (proc_params.saving_dense.enabled) {
io_hdf5_dense.emplace(m_graph, proc_params.saving_dense, rank, acq_params.xfer.time_slice.count,
io_hdf5_dense.emplace(*m_graph, proc_params.saving_dense, rank, xfer_frames,
frame_info.dimensions(), frame_info.pixel_type());
checkpoint_hdf5_dense.emplace(m_graph, tbb::flow::unlimited, m_nb_frames_dense_saved);
checkpoint_hdf5_dense.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_dense_saved);
} else
m_nb_frames_dense_saved = -1;
......@@ -145,38 +154,38 @@ namespace processing::pipelines
// Sparse HDF5 node
if (proc_params.saving_sparse.enabled) {
io_hdf5_sparse.emplace(m_graph, proc_params.saving_sparse, rank, acq_params.xfer.time_slice.count,
nb_bins, frame_info.dimensions(), radius, mask);
checkpoint_hdf5_sparse.emplace(m_graph, tbb::flow::unlimited, m_nb_frames_sparse_saved);
io_hdf5_sparse.emplace(*m_graph, proc_params.saving_sparse, rank, xfer_frames, nb_bins,
frame_info.dimensions(), radius, mask);
checkpoint_hdf5_sparse.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_sparse_saved);
} else
m_nb_frames_sparse_saved = -1;
// Accumulation HDF5 node
auto do_acc = (proc_params.fai.acc_update_freq > 0);
if (do_acc) {
accum_split.emplace(m_graph);
auto acc_frames = (acq_params.xfer.time_slice.count - 1) / proc_params.fai.acc_update_freq + 1;
accum_split.emplace(*m_graph);
auto acc_frames = (xfer_frames - 1) / proc_params.fai.acc_update_freq + 1;
if (proc_params.saving_accumulation_corrected.enabled) {
acc_saving_corrected.emplace(m_graph, proc_params.saving_accumulation_corrected, rank,
acc_saving_corrected.emplace(*m_graph, proc_params.saving_accumulation_corrected, rank,
acc_frames, frame_info.dimensions(), pixel_enum::gray32f);
checkpoint_hdf5_acc_corrected.emplace(m_graph, tbb::flow::unlimited,
checkpoint_hdf5_acc_corrected.emplace(*m_graph, tbb::flow::unlimited,
m_nb_frames_acc_corrected_saved);
} else
m_nb_frames_acc_corrected_saved = -1;
if (proc_params.saving_accumulation_peak.enabled) {
acc_saving_peak.emplace(m_graph, proc_params.saving_accumulation_peak, rank, acc_frames,
acc_saving_peak.emplace(*m_graph, proc_params.saving_accumulation_peak, rank, acc_frames,
frame_info.dimensions(), pixel_enum::gray32f);
checkpoint_hdf5_acc_peak.emplace(m_graph, tbb::flow::unlimited, m_nb_frames_acc_peak_saved);
checkpoint_hdf5_acc_peak.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_acc_peak_saved);
} else
m_nb_frames_acc_peak_saved = -1;
}
// ROI counters node (optional)
if (!proc_params.counters.rois.empty()) {
roi_counters.emplace(m_graph, tbb::flow::unlimited, proc_params.counters.rois);
roi_counters_buffer.emplace(m_graph, tbb::flow::unlimited, m_roi_counters_buffer,
roi_counters.emplace(*m_graph, tbb::flow::unlimited, proc_params.counters.rois);
roi_counters_buffer.emplace(*m_graph, tbb::flow::unlimited, m_roi_counters_buffer,
m_roi_counters_buffer_mutex);
roi_counters_checkpoint.emplace(m_graph, tbb::flow::unlimited, m_nb_frames_counters);
roi_counters_checkpoint.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_counters);
}
// Graph topology
......@@ -211,59 +220,13 @@ namespace processing::pipelines
tbb::flow::make_edge(*roi_counters, *roi_counters_buffer);
tbb::flow::make_edge(*roi_counters_buffer, *roi_counters_checkpoint);
}
LIMA_LOG(trace, proc) << "Processing constructed";
}
/// Activate the processing (start poping data from the queue)
void activate()
{
src->activate();
m_finished_future = std::async(std::launch::async, [this] {
std::exception_ptr eptr;
try {
m_graph.wait_for_all();
LIMA_LOG(trace, proc) << "Processing finished";
} catch (...) {
LIMA_LOG(error, proc) << "Processing failed";
eptr = std::current_exception();
}
// Set the is_finished flag and call the callback if registered
m_is_finished = true;
if (m_on_finished)
m_on_finished(eptr);
});
}
/// Returns true when the processing has finished
bool is_finished() const { return m_is_finished; }
/// Register on_finished callback
void register_on_finished(finished_callback_t on_finished) { m_on_finished = on_finished; }
/// Abort the pipeline
void abort() { m_task_group_context.cancel_group_execution(); }
/// Push the given frame to the FIFO of frames to process
void process(input_t const& data)
{
auto metadata = get_input_frame_metadata(m_acq_info, data);
LIMA_LOG(trace, det) << "Frame: " << metadata.idx;
if (metadata.is_final)
while (!m_fifo_ptr->push(data))
;
else if (!m_fifo_ptr->push(data))
LIMA_LOG(error, det) << "Buffer overrun: frame #" << metadata.idx;
}
/// Returns the progress counters
counters get_counters() const
{
return {(int) *m_nb_frames_source_ptr,
(int) *m_nb_frames_reconstructed_ptr,
return {(int) 0,
(int) 0,
(int) m_nb_frames_counters,
(int) m_nb_frames_sparsified,
(int) m_nb_frames_sparse_saved,
......@@ -339,15 +302,7 @@ namespace processing::pipelines
}
private:
// Acquisition info
acq_info_t m_acq_info;
// Queue of frame (runtime sized)
fifo_ptr_t m_fifo_ptr;
// Counters
counter_ptr_t m_nb_frames_source_ptr;
counter_ptr_t m_nb_frames_reconstructed_ptr;
std::atomic_int m_nb_frames_counters = 0;
std::atomic_int m_nb_frames_sparsified = 0;
std::atomic_int m_nb_frames_sparse_saved = 0;
......@@ -356,11 +311,8 @@ namespace processing::pipelines
std::atomic_int m_nb_frames_acc_peak_saved = 0;
// TBB Flow Graph
tbb::task_arena m_arena;
tbb::task_group_context m_task_group_context;
tbb::flow::graph m_graph;
std::optional<source_node_t> src;
graph_ptr_t m_graph;
source_node_ptr_t m_src;
std::optional<circular_buffer_node<frame_t>> frames_buffer;
std::optional<peak_finder_node> peak_finder;
......@@ -395,6 +347,120 @@ namespace processing::pipelines
// Frames
boost::circular_buffer<frame_t> m_frames_buffer;
mutable std::mutex m_frames_buffer_mutex;
};
class pipeline::impl
{
public:
/// Build the pipeline: frame FIFO, shared progress counters, TBB flow graph,
/// source node and the FAI processing helper wired onto the graph
impl(int rank, acq_info_t const& acq_info, proc_params_t const& proc_params) :
    m_acq_info(acq_info),
    m_fifo_ptr(std::make_shared<fifo_t>(proc_params.fifo.nb_fifo_frames)),
    m_nb_frames_source_ptr(std::make_shared<std::atomic_int>(0)),
    m_nb_frames_reconstructed_ptr(std::make_shared<std::atomic_int>(0)),
    m_graph(std::make_shared<tbb::flow::graph>(m_task_group_context))
{
    auto const& acq_params = m_acq_info.params;
    auto const nb_xfer_frames = acq_params.xfer.time_slice.count;

    // Throttle per-frame log traces according to the order of magnitude
    // of the number of frames to transfer
    unsigned int const trace_limiter = order_of_magnitude_base10(nb_xfer_frames, 1);

    // Source node: pops frames from the FIFO and feeds the flow graph,
    // updating the shared source/reconstructed counters
    m_src = std::make_shared<source_node_t>(*m_graph, m_fifo_ptr, m_acq_info, trace_limiter,
                                            m_nb_frames_source_ptr, m_nb_frames_reconstructed_ptr);

    // FAI-related processing (peak finding, saving, counters) lives in its own helper
    m_proc_impl.emplace(rank, m_acq_info.frame, nb_xfer_frames, m_graph, m_src, proc_params);

    LIMA_LOG(trace, proc) << "Processing constructed";
}
/// Activate the processing (start popping data from the queue)
void activate()
{
    m_src->activate();
    // Watch for graph completion on a background task so activate() returns immediately
    m_finished_future = std::async(std::launch::async, [this]() {
        std::exception_ptr captured;
        try {
            m_graph->wait_for_all();
            LIMA_LOG(trace, proc) << "Processing finished";
        } catch (...) {
            LIMA_LOG(error, proc) << "Processing failed";
            captured = std::current_exception();
        }
        // Flag completion, then notify the registered callback (if any),
        // forwarding the exception (null on success)
        m_is_finished = true;
        if (m_on_finished)
            m_on_finished(captured);
    });
}
/// Returns true once the processing has run to completion (set by the async watcher)
bool is_finished() const { return m_is_finished.load(); }
/// Register a callback invoked when processing finishes; it receives the
/// captured exception pointer (null on success)
void register_on_finished(finished_callback_t on_finished) { m_on_finished = std::move(on_finished); }
/// Abort the pipeline by cancelling the TBB task group executing the graph
void abort() { m_task_group_context.cancel_group_execution(); }
/// Push the given frame to the FIFO of frames to process
void process(input_t const& data)
{
    auto const metadata = get_input_frame_metadata(m_acq_info, data);
    LIMA_LOG(trace, det) << "Frame: " << metadata.idx;
    if (!metadata.is_final) {
        // Regular frame: a single push attempt; on overrun the frame is dropped
        if (!m_fifo_ptr->push(data))
            LIMA_LOG(error, det) << "Buffer overrun: frame #" << metadata.idx;
        return;
    }
    // Final frame must not be lost: busy-wait until the FIFO accepts it
    while (!m_fifo_ptr->push(data))
        ;
}
/// Returns the progress counters
counters get_counters() const
{
    // Start from the FAI-stage counters, then merge in the source/reconstruction
    // counters owned by this pipeline (the FAI impl reports zeros for them)
    auto result = m_proc_impl->get_counters();
    result.nb_frames_source = m_nb_frames_source_ptr->load();
    result.nb_frames_reconstructed = m_nb_frames_reconstructed_ptr->load();
    return result;
}
/// Pop a given number of ROI counters (delegates to the FAI processing impl)
std::vector<roi_counters_result> pop_roi_counters(std::size_t nb_frames = 1)
{
return m_proc_impl->pop_roi_counters(nb_frames);
}
/// Pop a given number of peak counters (delegates to the FAI processing impl)
std::vector<int> pop_peak_counters(std::size_t nb_frames = 1)
{
return m_proc_impl->pop_peak_counters(nb_frames);
}
/// Returns the frame for the given index
/// (default -1: presumably means the latest frame — TODO confirm against fai_impl::get_frame)
frame_t get_frame(std::size_t frame_idx = -1) const { return m_proc_impl->get_frame(frame_idx); }
private:
// Acquisition info (copied at construction; read by process() and the source node)
acq_info_t m_acq_info;
// Queue of frames to process (capacity set at runtime from proc_params.fifo.nb_fifo_frames)
fifo_ptr_t m_fifo_ptr;
// Counters shared with the source node; merged into the result of get_counters()
counter_ptr_t m_nb_frames_source_ptr;
counter_ptr_t m_nb_frames_reconstructed_ptr;
// TBB Flow Graph
tbb::task_arena m_arena;
// NOTE: m_task_group_context must stay declared before m_graph — the graph is
// constructed from it in the ctor init list, and members initialize in declaration order
tbb::task_group_context m_task_group_context;
graph_ptr_t m_graph;
std::shared_ptr<source_node_t> m_src;
// FAI processing implementation (optional: emplaced in the ctor body, after the graph exists)
std::optional<fai_impl> m_proc_impl;
// Set by the async completion watcher started in activate()
std::atomic_bool m_is_finished{false};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.