Commit 109383bb authored by Alejandro Homs Puron, committed by Generic Bliss account for Control Software
Browse files

[SMX] Add average proc_mode with OpenCL implementation

parent a01047da
Pipeline #77479 failed with stages
in 10 minutes and 18 seconds
// Copyright (C) 2018 Alejandro Homs, ESRF.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <memory>
#include <tbb/flow_graph.h>
#include <lima/exceptions.hpp>
#include <lima/logging.hpp>
#include <lima/core/enums.hpp>
#include <lima/core/image/view.hpp>
#include <lima/processing/fai/average.hpp>
namespace lima
{
namespace processing::pipelines
{
namespace smx
{
namespace bcl = boost::compute;
/// TBB flow-graph node computing frame averages on the GPU (OpenCL).
///
/// Every incoming gray16 frame is uploaded to the device and handed to the average calculator.
/// When the calculator reports that an average is available, a gray32f frame holding the result
/// is emitted on output port 0; otherwise nothing is emitted for that input.
struct average_node : public tbb::flow::multifunction_node<lima::frame, std::tuple<lima::frame>>
{
    using parent_t = tbb::flow::multifunction_node<lima::frame, std::tuple<lima::frame>>;

    /// Node body: owns the device buffers, the average calculator and the output frame index
    struct average_body
    {
        /// \param width, height  Frame dimensions in pixels (the device buffers hold width * height elements)
        /// \param start_frame    Forwarded to fai::make_average
        /// \param nb_frames      Forwarded to fai::make_average
        /// \param context        OpenCL context used to allocate the device buffers
        /// \param queue          OpenCL command queue used for transfers and kernel execution
        average_body(std::size_t width, std::size_t height, std::size_t start_frame, std::size_t nb_frames,
                     bcl::context& context, bcl::command_queue& queue) :
            context(context),
            queue(queue),
            nb_pixels(width * height),
            raw_d(nb_pixels, context), // Input
            ave_d(nb_pixels, context)  // Output Average
        {
            LIMA_LOG(trace, proc) << "nb_pixels:" << nb_pixels;

            // Create the average calculator
            ave_calc = fai::make_average(context,       //
                                         width, height, //
                                         start_frame, nb_frames, queue);

            // The message used to read "Peak finder created" (copy-paste from the peak finder node)
            LIMA_LOG(trace, proc) << "Average calculator created";
        }

        /// Process one input frame and, if an average is available, emit it on port 0
        void operator()(lima::frame const& in, typename parent_t::output_ports_type& ports) const
        {
            // The device buffers were sized from width * height at construction
            assert(nb_pixels == in.size());

            // Write input to the device
            auto v_in = lima::const_view<gray16c_view_t>(in);
            bcl::copy(v_in.begin(), v_in.end(), raw_d.begin(), queue);

            // Run the kernel; the is_final flag of the frame is passed as the "force" argument
            auto force_ave = in.metadata.is_final;
            auto nb_ave_frames = ave_calc(queue, raw_d, ave_d, force_ave);
            LIMA_LOG(trace, proc) << nb_ave_frames << " frames averaged";

            // Zero means no average was produced for this input: nothing to emit
            if (nb_ave_frames == 0)
                return;

            LIMA_LOG(trace, proc) << "Average available";

            auto dims = in.dimensions();
            lima::frame ave(dims, pixel_enum::gray32f);

            // Copy metadata - TODO: include nb_ave_frames
            ave.metadata = in.metadata;
            ave.metadata.idx = ave_frame_idx; // Averaged frames are numbered independently of the input
            ave.attributes = in.attributes;

            // Copy data from the device into the output frame
            auto view = lima::view<lima::gray32f_view_t>(ave);
            bcl::copy(ave_d.begin(), ave_d.end(), &boost::gil::at_c<0>(view(0, 0)), queue);

            ++ave_frame_idx;

            std::get<0>(ports).try_put(ave);
        }

        bcl::context context;
        mutable bcl::command_queue queue;
        std::size_t nb_pixels;
        fai::average ave_calc;
        mutable fai::vector<std::uint16_t> raw_d; // Input
        mutable fai::vector<float> ave_d;         // Output average
        mutable std::size_t ave_frame_idx{0};     // Index assigned to the next emitted average frame
    };

    /// Construct the node in graph \p g.
    ///
    /// \throws lima::invalid_argument (through LIMA_THROW_EXCEPTION) if \p pixel is not gray16
    average_node(tbb::flow::graph& g, std::size_t width, std::size_t height, pixel_enum pixel,
                 std::size_t start_frame, std::size_t nb_frames, bcl::context& context,
                 bcl::command_queue& queue) :
        parent_t(g, tbb::flow::serial /*serial for now*/,
                 average_body(width, height, start_frame, nb_frames, context, queue))
    {
        if (pixel != pixel_enum::gray16)
            LIMA_THROW_EXCEPTION(lima::invalid_argument("average_node only supports gray16 pixel type"));
    }
};
} // namespace smx
} // namespace processing::pipelines
} // namespace lima
// Copyright (C) 2020 Alejandro Homs Puron, ESRF.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <boost/describe.hpp>
#include <boost/describe/io_enums.hpp>
#include <lima/processing/pipelines/smx/enums.hpp>
namespace lima
{
namespace processing::pipelines
{
namespace smx
{
// Register the proc_mode_enum enumerators (fai, average) with Boost.Describe
BOOST_DESCRIBE_ENUM(proc_mode_enum, fai, average)
// Make the Boost.Describe enum stream operators visible in this namespace
using boost::describe::operator<<;
using boost::describe::operator>>;
} // namespace smx
} // namespace processing::pipelines
} // namespace lima
// Copyright (C) 2020 Alejandro Homs Puron, ESRF.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#pragma once
namespace lima
{
namespace processing::pipelines
{
namespace smx
{
/// Processing mode of the SMX pipeline
enum class proc_mode_enum : int
{
    fai = 0,     //!< FAI processing
    average = 1, //!< Frame averaging
};
} // namespace smx
} // namespace processing::pipelines
} // namespace lima
......@@ -10,6 +10,7 @@
#include <lima/processing/fai/enums.describe.hpp>
#include <lima/processing/pipelines/smx/params.hpp>
#include <lima/processing/pipelines/smx/enums.describe.hpp>
// This file is part of the user interface of the library and should not include any private dependencies
// or other thing that expose implementation details
......@@ -64,6 +65,18 @@ namespace processing::pipelines
(doc, "The path to the OpenCL kernels"))
// clang-format on
// Describe average_params (members start_frame, nb_frames) and attach the
// user-facing description / documentation strings to each member
BOOST_DESCRIBE_STRUCT(average_params, (), (start_frame, nb_frames))
// clang-format off
BOOST_ANNOTATE_MEMBER(average_params, start_frame,
(desc, "start frame"),
(doc, "The first frame to start average (nb of frames to skip)"))
BOOST_ANNOTATE_MEMBER(average_params, nb_frames,
(desc, "nb of frames"),
(doc, "The number of frames to average"))
// clang-format on
BOOST_DESCRIBE_STRUCT(gain_pedestal_params, (), (gain_path, pedestal_path, photon_energy))
// clang-format off
......@@ -177,10 +190,15 @@ namespace processing::pipelines
// clang-format on
BOOST_DESCRIBE_STRUCT(proc_params, (),
(fifo, buffers, saving_dense, saving_sparse, saving_accumulation_corrected,
saving_accumulation_peak, gpu, jfrau, fai, counters))
(proc_mode, fifo, buffers, saving_dense, saving_average, saving_sparse,
saving_accumulation_corrected, saving_accumulation_peak, gpu, average, jfrau, fai,
counters))
// clang-format off
BOOST_ANNOTATE_MEMBER(proc_params, proc_mode,
(desc, "processing mode"),
(doc, "The processing mode: fai, average"))
BOOST_ANNOTATE_MEMBER(proc_params, fifo,
(desc, "fifo parameters"),
(doc, "The processing FIFO parameters"))
......@@ -193,6 +211,10 @@ namespace processing::pipelines
(desc, "saving parameters raw"),
(doc, "The HDF5 saving parameters for the raw frame"))
BOOST_ANNOTATE_MEMBER(proc_params, saving_average,
(desc, "saving parameters averaged"),
(doc, "The HDF5 saving parameters for the averaged frame"))
BOOST_ANNOTATE_MEMBER(proc_params, saving_sparse,
(desc, "saving parameters assembled"),
(doc, "The HDF5 saving parameters for the assembled frame"))
......@@ -209,6 +231,10 @@ namespace processing::pipelines
(desc, "gpu params"),
(doc, "The GPU (OpenCL) parameters"))
BOOST_ANNOTATE_MEMBER(proc_params, average,
(desc, "average params"),
(doc, "The averaging parameters"))
// User-facing annotation of proc_params::jfrau (doc string typo "Pesdetal" corrected)
BOOST_ANNOTATE_MEMBER(proc_params, jfrau,
                      (desc, "jungfrau gain pedestal params"),
                      (doc, "The Gain / Pedestal correction parameters"))
......
......@@ -12,6 +12,8 @@
#include <lima/processing/fai/enums.hpp>
#include <lima/processing/pipelines/smx/enums.hpp>
// This file is part of the user interface of the library and should not include any private dependencies
// or other thing that expose implementation details
......@@ -45,6 +47,12 @@ namespace processing::pipelines
std::filesystem::path cl_source_path = "detectors/processing/psi/src"; // CL source file path
};
/// Parameters of the frame averaging
struct average_params
{
    std::size_t start_frame{0}; //!< The first frame to start average (nb of frames to skip)
    std::size_t nb_frames{0};   //!< The number of frames to average
};
struct gain_pedestal_params
{
std::filesystem::path gain_path; // JungFrau Gain/Pedestal
......@@ -84,13 +92,16 @@ namespace processing::pipelines
struct proc_params
{
proc_mode_enum proc_mode = proc_mode_enum::fai;
fifo_params fifo;
buffer_params buffers;
saving_params saving_dense;
saving_params saving_average;
saving_params saving_sparse;
saving_params saving_accumulation_corrected;
saving_params saving_accumulation_peak;
gpu_params gpu;
average_params average;
gain_pedestal_params jfrau;
fai_params fai;
roi_counters_params counters;
......
......@@ -20,7 +20,7 @@ namespace processing::pipelines
BOOST_DESCRIBE_STRUCT(counters, (),
(nb_frames_source, nb_frames_reconstructed, nb_frames_counters, nb_frames_sparsified,
nb_frames_sparse_saved, nb_frames_dense_saved, nb_frames_acc_corrected_saved,
nb_frames_acc_peak_saved))
nb_frames_acc_peak_saved, nb_frames_average_generated, nb_frames_average_saved))
// clang-format off
BOOST_ANNOTATE_MEMBER(counters, nb_frames_source,
......@@ -54,6 +54,14 @@ namespace processing::pipelines
BOOST_ANNOTATE_MEMBER(counters, nb_frames_acc_peak_saved,
(desc, "nb frames accumulation peak saved"),
(doc, "The number of peak accumulation frame saved to file"))
// User-facing annotations of the two average-related progress counters
BOOST_ANNOTATE_MEMBER(counters, nb_frames_average_generated,
(desc, "nb frames average generated"),
(doc, "The number of average frames generated"))
BOOST_ANNOTATE_MEMBER(counters, nb_frames_average_saved,
(desc, "nb frames average saved"),
(doc, "The number of average frames saved to file"))
// clang-format on
using boost::json::tag_invoke;
......
......@@ -33,6 +33,8 @@ namespace processing::pipelines
int nb_frames_dense_saved = 0;
int nb_frames_acc_corrected_saved = 0;
int nb_frames_acc_peak_saved = 0;
int nb_frames_average_generated = 0;
int nb_frames_average_saved = 0;
};
using finished_callback_t = std::function<void(std::exception_ptr)>;
......
......@@ -25,6 +25,7 @@
#include <lima/processing/pipelines/smx/sparse_frame.hpp>
#include <lima/processing/pipelines/smx/sparse_static_data.hpp>
#include <lima/processing/pipelines/smx/peak_finder_node.hpp>
#include <lima/processing/pipelines/smx/average_node.hpp>
#include <lima/processing/pipelines/smx/io_hdf5_sparse_node.hpp>
#include <lima/processing/utils/pipelines.hpp>
......@@ -232,7 +233,9 @@ namespace processing::pipelines
(int) m_nb_frames_sparse_saved,
(int) m_nb_frames_dense_saved,
(int) m_nb_frames_acc_corrected_saved,
(int) m_nb_frames_acc_peak_saved};
(int) m_nb_frames_acc_peak_saved,
(int) 0,
(int) 0};
}
/// Pop a given number of ROI counters
......@@ -349,6 +352,170 @@ namespace processing::pipelines
mutable std::mutex m_frames_buffer_mutex;
};
/// Processing implementation for the `average` processing mode.
///
/// Builds the part of the TBB flow graph that averages the source frames on the GPU and
/// optionally saves the raw (dense) frames, saves the averaged frames and computes ROI counters.
class average_impl
{
  public:
    using accum_split_node = tbb::flow::split_node<std::tuple<frame_t, frame_t>>;

    /// Create the nodes required by \p proc_params and connect them to the source node \p src
    average_impl(int rank, lima::frame_info frame_info, std::size_t xfer_frames, graph_ptr_t graph,
                 source_node_ptr_t src, proc_params_t const& proc_params) :
        m_graph(graph),
        m_src(src),
        m_roi_counters_buffer(proc_params.buffers.nb_roi_counters_buffer),
        m_frames_buffer(proc_params.buffers.nb_frames_buffer)
    {
        cl_device device(proc_params.gpu.device_idx);
        auto& context = device.context;
        auto& queue = device.queue;

        // Average node
        auto&& [dense_x, dense_y] = frame_info.dimensions();
        average.emplace(*m_graph, dense_x, dense_y, frame_info.pixel_type(), proc_params.average.start_frame,
                        proc_params.average.nb_frames, context, queue);
        checkpoint_average.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_average_generated);

        // Dense HDF5 node (a counter of -1 reports that this saving is disabled)
        if (proc_params.saving_dense.enabled) {
            io_hdf5_dense.emplace(*m_graph, proc_params.saving_dense, rank, xfer_frames,
                                  frame_info.dimensions(), frame_info.pixel_type());
            checkpoint_hdf5_dense.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_dense_saved);
        } else
            m_nb_frames_dense_saved = -1;

        // Average HDF5 node (a counter of -1 reports that this saving is disabled)
        if (proc_params.saving_average.enabled) {
            io_hdf5_average.emplace(*m_graph, proc_params.saving_average, rank, 1, frame_info.dimensions(),
                                    pixel_enum::gray32f);
            checkpoint_hdf5_average.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_average_saved);
        } else
            m_nb_frames_average_saved = -1;

        // ROI counters node (optional)
        if (!proc_params.counters.rois.empty()) {
            roi_counters.emplace(*m_graph, tbb::flow::unlimited, proc_params.counters.rois);
            roi_counters_buffer.emplace(*m_graph, tbb::flow::unlimited, m_roi_counters_buffer,
                                        m_roi_counters_buffer_mutex);
            roi_counters_checkpoint.emplace(*m_graph, tbb::flow::unlimited, m_nb_frames_counters);
        }

        // Graph topology
        tbb::flow::make_edge(*src, *average);
        tbb::flow::make_edge(*average, *checkpoint_average);

        if (proc_params.saving_dense.enabled) {
            tbb::flow::make_edge(*src, *io_hdf5_dense);
            tbb::flow::make_edge(*io_hdf5_dense, *checkpoint_hdf5_dense);
        }

        if (proc_params.saving_average.enabled) {
            tbb::flow::make_edge(*average, *io_hdf5_average);
            tbb::flow::make_edge(*io_hdf5_average, *checkpoint_hdf5_average);
        }

        if (roi_counters) {
            tbb::flow::make_edge(*src, *roi_counters);
            tbb::flow::make_edge(*roi_counters, *roi_counters_buffer);
            tbb::flow::make_edge(*roi_counters_buffer, *roi_counters_checkpoint);
        }
    }

    /// Returns the progress counters
    ///
    /// The first two fields are returned as 0 here and the fields that have no counterpart
    /// in this class are reported as -1.
    counters get_counters() const
    {
        return {0,
                0,
                m_nb_frames_counters.load(),
                -1,
                -1,
                m_nb_frames_dense_saved.load(),
                -1,
                -1,
                m_nb_frames_average_generated.load(),
                m_nb_frames_average_saved.load()};
    }

    /// Pop a given number of ROI counters
    ///
    /// Returns at most \p nb_frames results (fewer if the buffer holds less), oldest first,
    /// and removes them from the buffer.
    std::vector<roi_counters_result> pop_roi_counters(std::size_t nb_frames = 1)
    {
        std::vector<roi_counters_result> res;

        // Take the lock before touching the buffer: the same buffer and mutex are handed to the
        // roi_counters_buffer node, so even empty() must not be read unsynchronized
        const std::lock_guard<std::mutex> lock(m_roi_counters_buffer_mutex);

        // If ROI counters buffer is empty, returns early
        if (m_roi_counters_buffer.empty())
            return res;

        const std::size_t nb_elements = std::min(nb_frames, m_roi_counters_buffer.size());
        res.resize(nb_elements);

        // Copy to results and erase from buffer
        std::copy_n(m_roi_counters_buffer.begin(), nb_elements, res.begin());
        m_roi_counters_buffer.erase_begin(nb_elements);

        return res;
    }

    /// Pop a given number of peak counters (always empty in this class)
    std::vector<int> pop_peak_counters(std::size_t /*nb_frames*/ = 1) { return {}; }

    /// Returns frame for the given index
    ///
    /// With the default index (-1) the latest buffered frame is returned. A default constructed
    /// frame is returned when the buffer is empty or no buffered frame has the requested index.
    /// NOTE(review): no node created in the constructor is given m_frames_buffer, so the buffer
    /// looks like it stays empty in this class - confirm whether a frames buffer node is missing.
    frame_t get_frame(std::size_t frame_idx = -1) const
    {
        frame_t res;

        // Lock first so that every access to the buffer, including empty(), is synchronized
        const std::lock_guard<std::mutex> lock(m_frames_buffer_mutex);

        // If frames buffer is empty, returns early
        if (m_frames_buffer.empty())
            return res;

        // If frame_idx == -1 returns the latest frame in the buffer
        if (frame_idx == static_cast<std::size_t>(-1))
            res = m_frames_buffer.back();
        else {
            auto it = std::find_if(m_frames_buffer.begin(), m_frames_buffer.end(),
                                   [frame_idx](frame_t const& frm) { return frame_idx == frm.metadata.idx; });
            if (it != m_frames_buffer.end())
                res = *it;
        }

        return res;
    }

  private:
    // Counters
    std::atomic_int m_nb_frames_counters = 0;
    std::atomic_int m_nb_frames_dense_saved = 0;
    std::atomic_int m_nb_frames_average_generated = 0;
    std::atomic_int m_nb_frames_average_saved = 0;

    // TBB Flow Graph
    graph_ptr_t m_graph;
    source_node_ptr_t m_src;

    std::optional<circular_buffer_node<frame_t>> frames_buffer;
    std::optional<average_node> average;
    std::optional<checkpoint_node<frame_t>> checkpoint_average;
    std::optional<circular_buffer_node<frame_t>> average_buffer;
    std::optional<io_hdf5_node<frame_t>> io_hdf5_dense;
    std::optional<checkpoint_node<tbb::flow::continue_msg>> checkpoint_hdf5_dense;
    std::optional<io_hdf5_node<frame_t>> io_hdf5_average;
    std::optional<checkpoint_node<tbb::flow::continue_msg>> checkpoint_hdf5_average;
    std::optional<roi_counters_node<frame>> roi_counters;
    std::optional<circular_buffer_node<roi_counters_result>> roi_counters_buffer;
    std::optional<checkpoint_node<tbb::flow::continue_msg>> roi_counters_checkpoint;

    // ROI counters
    boost::circular_buffer<roi_counters_result> m_roi_counters_buffer;
    std::mutex m_roi_counters_buffer_mutex;

    // Frames
    boost::circular_buffer<frame_t> m_frames_buffer;
    mutable std::mutex m_frames_buffer_mutex;
};
/// Processing implementation: holds either the FAI or the average implementation
using proc_impl_t = std::variant<fai_impl, average_impl>;
class pipeline::impl
{
public:
......@@ -368,7 +535,14 @@ namespace processing::pipelines
m_src = std::make_shared<source_node_t>(*m_graph, m_fifo_ptr, m_acq_info, log_frame_limiter,
m_nb_frames_source_ptr, m_nb_frames_reconstructed_ptr);
m_proc_impl.emplace(rank, m_acq_info.frame, xfer_frames, m_graph, m_src, proc_params);
auto proc_builder = [&](auto&& type) {
m_proc_impl.emplace(type, rank, m_acq_info.frame, xfer_frames, m_graph, m_src, proc_params);
};
if (proc_params.proc_mode == proc_mode_enum::fai)
proc_builder(std::in_place_type_t<fai_impl>());
else
proc_builder(std::in_place_type_t<average_impl>());
LIMA_LOG(trace, proc) << "Processing constructed";
}
......@@ -420,26 +594,33 @@ namespace processing::pipelines
/// Returns the progress counters
counters get_counters() const
{
counters c = m_proc_impl->get_counters();
c.nb_frames_source = *m_nb_frames_source_ptr;
c.nb_frames_reconstructed = *m_nb_frames_reconstructed_ptr;
return c;
return std::visit(
[&](auto&& proc) {
counters c = proc.get_counters();
c.nb_frames_source = *m_nb_frames_source_ptr;
c.nb_frames_reconstructed = *m_nb_frames_reconstructed_ptr;
return c;
},
*m_proc_impl);
}
/// Pop a given number of ROI counters
std::vector<roi_counters_result> pop_roi_counters(std::size_t nb_frames = 1)
{
return m_proc_impl->pop_roi_counters(nb_frames);
return std::visit([&](auto&& proc) { return proc.pop_roi_counters(nb_frames); }, *m_proc_impl);
}
/// Pop a given number of peak counters
std::vector<int> pop_peak_counters(std::size_t nb_frames = 1)
{
return m_proc_impl->pop_peak_counters(nb_frames);
return std::visit([&](auto&& proc) { return proc.pop_peak_counters(nb_frames); }, *m_proc_impl);
}
/// Returns frame for the given index
frame_t get_frame(std::size_t frame_idx = -1) const { return m_proc_impl->get_frame(frame_idx); }
frame_t get_frame(std::size_t frame_idx = -1) const
{
return std::visit([&](auto&& proc) { return proc.get_frame(frame_idx); }, *m_proc_impl);
}
private:
// Acquisition info
......@@ -460,7 +641,7 @@ namespace processing::pipelines
std::shared_ptr<source_node_t> m_src;
// Proc implementation
std::optional<fai_impl> m_proc_impl;
std::optional<proc_impl_t> m_proc_impl;
// Finished
std::atomic_bool m_is_finished{false};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment