Commit 6fa5fe08 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by Generic Bliss account for Control Software
Browse files

[CORE] Formalize HW & proc. acq_info_t, data_t and input_t:

* acq_info_t: acq_params, frame_info, reconstruction & metadata_getter/setter
* data_t: std::any (single frame) or std::vector<std::any> (frame block)
* input_t: std::any, can be a frame or data to be reconstructed
parent 82077a7a
Pipeline #75370 passed with stages
in 9 minutes and 6 seconds
......@@ -8,7 +8,7 @@
#include <lima/core/frame.hpp>
#include <lima/hw/frame_info.hpp>
#include <lima/hw/acq_info.hpp>
#include <lima/hw/params.hpp>
#include <lima/detectors/simulator/hw/enums.hpp>
......@@ -34,10 +34,10 @@ namespace detectors::simulator::hw
using acq_params_t = acq_params;
/// Acquisition information passed to processing
using acq_info_t = lima::hw::frame_info;
using acq_info_t = lima::hw::acq_info;
/// Output data type of the acquisition
using data_t = frame;
using data_t = std::any;
};
} // namespace detectors::simulator::hw
} // namespace lima
......@@ -61,7 +61,7 @@ namespace detectors::simulator::hw
std::chrono::microseconds m_expo_time; //<! The exposure time
std::chrono::microseconds m_latency_time; //<! The latency time
using circular_buffer_t = std::vector<acquisition::data_t>;
using circular_buffer_t = std::vector<frame>;
circular_buffer_t m_buffer; //<! A circular buffer of generated frames
circular_buffer_t::const_iterator m_current_frame; //<! The current frame iterator in the circular buffer
......@@ -173,7 +173,8 @@ namespace detectors::simulator::hw
// Reset the current frame iterator
m_current_frame = m_buffer.begin();
return {roi.dimensions, pixel_type};
frame_info frame{roi.dimensions, pixel_type};
return {params, frame};
}
void acquisition::impl::hw_start() { m_nb_generated_images = 0; }
......@@ -184,7 +185,7 @@ namespace detectors::simulator::hw
/// Get image view
acquisition::data_t acquisition::impl::hw_get_frame() noexcept
{
acquisition::data_t res;
frame res;
// Simulate exposure time
std::this_thread::sleep_for(m_expo_time);
......@@ -204,7 +205,7 @@ namespace detectors::simulator::hw
// Simulate readout latency
std::this_thread::sleep_for(m_latency_time);
return res;
return std::make_any<frame>(res);
}
// Pimpl boilerplate
......
// Copyright (C) 2020 Alejandro Homs Puron, ESRF.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <any>
#include <lima/core/frame_info.hpp>
#include <lima/core/frame.hpp>
#include <lima/hw/params.hpp>
namespace lima
{
namespace hw
{
/// Information passed from acquisition::prepare to processing
struct acq_info
{
using reconstruction_t = std::function<lima::frame(std::any const&)>;
using metadata_getter_t = std::function<lima::frame_metadata(std::any const&)>;
using metadata_setter_t = std::function<void(std::any&, lima::frame_metadata const&)>;
acq_params params; //!< Acquisition parameters
frame_info frame; //!< Frame parameters (once reconstructed)
reconstruction_t reconstruction; //!< Reconstruction function (if data is not a frame)
metadata_getter_t metadata_getter; //!< Metadata retreiver (if data is not a frame)
metadata_setter_t metadata_setter; //!< Metadata setter (if data is not a frame)
};
} // namespace hw
} // namespace lima
// Copyright (C) 2020 Samuel Debionne, ESRF.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <cassert>
#include <cstdint>
#include <lima/core/point.hpp>
#include <lima/core/pixel.hpp>
namespace lima
{
namespace hw
{
class frame_info
{
public:
using coord_t = std::ptrdiff_t;
using point_t = point<coord_t>;
frame_info(const point_t& dims, const pixel_enum& pixel) : m_dimensions(dims), m_pixel(pixel) {}
frame_info(coord_t width, coord_t height, const pixel_enum& pixel) : m_dimensions(width, height), m_pixel(pixel)
{
}
const point_t& dimensions() const { return m_dimensions; }
coord_t width() const { return m_dimensions.x; }
coord_t height() const { return m_dimensions.y; }
std::size_t size() const { return m_dimensions.x * m_dimensions.y; }
pixel_enum pixel_type() const { return m_pixel; }
private:
point_t m_dimensions; //<! Frame dimensions
pixel_enum m_pixel; //<! Pixel type
};
} // namespace hw
} // namespace lima
......@@ -6,7 +6,6 @@
#pragma once
#include <any>
#include <memory>
#include <unordered_map>
#include <vector>
......@@ -33,20 +32,24 @@ class processing_factory
using acquisition_uid_t = lima::uuid;
using processing_ptr_t = std::shared_ptr<Processing>;
using acq_info_t = typename Processing::acq_info_t;
using proc_params_t = typename Processing::proc_params_t;
//using processing_factory_t = std::function<processing_ptr_t(int, acq_info_t, acq_params_t)>;
/// Processing management
/// \{
template <typename T1, typename T2, typename T3>
processing_ptr_t construct(acquisition_uid_t uuid, int recv_rank, T1 const& acq_info, T2 const& acq_params,
T3 const& proc_params)
processing_ptr_t construct(acquisition_uid_t uuid, int recv_rank, acq_info_t const& acq_info,
proc_params_t const& proc_params)
{
LIMA_LOG_SCOPE("construct");
auto&& acq_params = acq_info.params;
LIMA_LOG(trace, core) << fmt::format("uuid = {}, acq_params = {}", uuid, acq_params);
// Construct the processing
auto [iterator, inserted] =
m_procs.try_emplace(uuid, std::make_shared<Processing>(recv_rank, acq_info, acq_params, proc_params));
m_procs.try_emplace(uuid, std::make_shared<Processing>(recv_rank, acq_info, proc_params));
if (!inserted)
LIMA_THROW_EXCEPTION(lima::runtime_error("Failed to create processing, uuid already exists"));
......
// Copyright (C) 2020 Alejandro Homs Puron, ESRF.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <any>
#include <optional>
#include <tbb/flow_graph.h>
#include <lima/logging.hpp>
#include <lima/core/frame.hpp>
namespace lima
{
namespace processing::pipelines
{
template <typename AcqInfo, typename Input>
frame_metadata get_input_frame_metadata(AcqInfo const& acq_info, Input const& data)
{
if (acq_info.metadata_getter) {
return acq_info.metadata_getter(data);
} else {
const frame* frm = std::any_cast<frame>(&data);
return frm->metadata;
}
}
template <typename FifoPtr, typename AcqInfo, typename CounterPtr>
class source_node : public tbb::flow::composite_node<std::tuple<>, std::tuple<lima::frame>>
{
public:
using input_t = std::any;
using output_t = lima::frame;
private:
using parent_t = tbb::flow::composite_node<std::tuple<>, std::tuple<output_t>>;
template <typename Output>
struct input : tbb::flow::input_node<Output>
{
input(tbb::flow::graph& graph, FifoPtr const& fifo, AcqInfo const& acq_info, unsigned int log_frame_limiter,
CounterPtr const& counter) :
tbb::flow::input_node<Output>(
graph, [fifo, acq_info, log_frame_limiter, counter, stop = false](tbb::flow_control& fc) mutable {
Output res;
// If input is flagged to stop, return early
if (stop) {
fc.stop();
return res;
}
// Poll for available frames
input_t data;
while (!fifo->pop(data))
;
auto metadata = get_input_frame_metadata(acq_info, data);
if constexpr (std::is_same_v<Output, output_t>)
res = std::any_cast<output_t>(data);
else
res = data;
if (!(metadata.idx % log_frame_limiter))
LIMA_LOG(trace, proc) << "Processing thread got frame " << metadata.idx;
// If the frame is final, flag the input to stop next run
if (metadata.is_final) {
LIMA_LOG(trace, proc) << "Processing thread got final frame";
stop = true;
}
if (counter)
++(*counter);
return res;
})
{
}
};
struct reconstruction : tbb::flow::function_node<input_t, output_t>
{
reconstruction(tbb::flow::graph& graph, AcqInfo const& acq_info, CounterPtr const& counter) :
tbb::flow::function_node<input_t, output_t>(
graph, tbb::flow::unlimited, [acq_info, counter](input_t data) {
output_t res = acq_info.reconstruction(data);
LIMA_LOG(trace, proc) << "Reconstructed frame " << res.metadata.idx;
if (counter)
++(*counter);
return res;
})
{
}
};
std::optional<input<output_t>> src;
std::optional<input<input_t>> raw_src;
std::optional<reconstruction> rec;
public:
source_node(tbb::flow::graph& graph, FifoPtr const& fifo, AcqInfo const& acq_info,
unsigned int log_frame_limiter, CounterPtr const& injected_counter_ptr,
CounterPtr const& reconstructed_counter_ptr) :
parent_t(graph)
{
using output_tuple = typename parent_t::output_ports_type;
if (acq_info.reconstruction) {
raw_src.emplace(graph, fifo, acq_info, log_frame_limiter, injected_counter_ptr);
*reconstructed_counter_ptr = 0;
rec.emplace(graph, acq_info, reconstructed_counter_ptr);
tbb::flow::make_edge(*raw_src, *rec);
parent_t::set_external_ports(output_tuple(*rec));
} else {
src.emplace(graph, fifo, acq_info, log_frame_limiter, injected_counter_ptr);
parent_t::set_external_ports(output_tuple(*src));
*reconstructed_counter_ptr = -1;
}
}
void activate()
{
if (raw_src)
raw_src->activate();
else
src->activate();
}
};
} // namespace processing::pipelines
} //namespace lima
......@@ -6,6 +6,7 @@
#pragma once
#include <any>
#include <functional>
#include <memory>
#include <optional>
......@@ -15,6 +16,7 @@
#include <lima/exceptions.hpp>
#include <lima/logging.hpp>
#include <lima/processing.hpp>
#include <lima/core/frame.hpp>
#include <lima/core/uuid.hpp>
#include <lima/concepts/hw.hpp>
#include <lima/utils/math.hpp>
......@@ -27,8 +29,12 @@ class acquisition_thread
{
public:
using data_t = typename Detector::data_t;
using acq_info_t = typename Detector::acq_info_t;
acquisition_thread(Detector& hw, int nb_frames) : m_hw(hw), m_nb_frames(nb_frames) {}
acquisition_thread(Detector& hw, int nb_frames, acq_info_t const& acq_info) :
m_hw(hw), m_nb_frames(nb_frames), m_acq_info(acq_info)
{
}
~acquisition_thread()
{
......@@ -98,6 +104,22 @@ class acquisition_thread
/// Callback on end acquisition
std::function<void(int)> on_end_acq;
/// Metadata getter & setter
frame_metadata get_frame_metadata(std::any const& data)
{
auto& getter = m_acq_info.metadata_getter;
return getter ? getter(data) : std::any_cast<frame>(&data)->metadata;
}
void set_frame_metadata(std::any& data, frame_metadata const& metadata)
{
auto& setter = m_acq_info.metadata_setter;
if (setter)
setter(data, metadata);
else
std::any_cast<frame>(&data)->metadata = metadata;
}
private:
void acquisition_loop()
{
......@@ -109,13 +131,13 @@ class acquisition_thread
bool is_final = false;
while (!is_final) {
// Get a frame from the HW
data_t frm = m_hw.get_frame();
data_t data = m_hw.get_frame();
if (!(m_nb_frames_xferred % log_frame_limiter))
LIMA_LOG(trace, core) << "Acquisition thread got frame " << m_nb_frames_xferred;
if constexpr (is_vector_v<data_t>)
m_nb_frames_xferred += frm.size();
m_nb_frames_xferred += data.size();
else
m_nb_frames_xferred++;
......@@ -124,14 +146,20 @@ class acquisition_thread
LIMA_LOG(trace, core) << "Acquisition thread got final frame";
auto set_final = [&](auto& data) {
auto metadata = get_frame_metadata(data);
metadata.is_final = true;
set_frame_metadata(data, metadata);
};
if constexpr (is_vector_v<data_t>)
frm.back().metadata.is_final = true;
set_final(data.back());
else
frm.metadata.is_final = true;
set_final(data);
}
// Send the data to the pipeline
on_frame_ready(frm);
on_frame_ready(data);
}
}
......@@ -145,6 +173,8 @@ class acquisition_thread
int m_nb_frames; //<! The number of frames to acquire
Detector& m_hw; //<! The dectector HW interface, for get_frame()
acq_info_t m_acq_info;
};
namespace detail
......@@ -216,7 +246,7 @@ class receiver : public detail::receiver_init_mpi
auto acq_info = m_detector.prepare(acq_params);
// Construct the acquisition thread
m_acquisition_thread.emplace(m_detector, acq_params.xfer.time_slice.count);
m_acquisition_thread.emplace(m_detector, acq_params.xfer.time_slice.count, acq_info);
return acq_info;
}
......
......@@ -29,6 +29,6 @@ struct is_vector<std::vector<T, Allocator>> : std::true_type
};
template <typename T>
inline constexpr bool is_vector_v = is_vector<T>::value;
constexpr bool is_vector_v = is_vector<T>::value;
} //namespace lima
......@@ -8,7 +8,7 @@
#include <memory>
#include <lima/hw/frame_info.hpp>
#include <lima/hw/acq_info.hpp>
#include <lima/hw/params.hpp>
#include <lima/processing/pipelines/cuda/params.hpp>
......@@ -32,10 +32,11 @@ namespace processing::pipelines
public:
using proc_params_t = proc_params;
using data_t = lima::frame;
using acq_info_t = hw::acq_info;
using input_t = std::any;
using frame_t = frame;
pipeline(int rank, hw::frame_info const& frame_info, hw::acq_params const& acq_params,
proc_params_t const& proc_params);
pipeline(int rank, acq_info_t const& acq_info, proc_params_t const& proc_params);
~pipeline(); // defined in the implementation file, where impl is a complete type
pipeline(pipeline&&); // defined in the implementation file
pipeline(const pipeline&) = delete;
......@@ -52,13 +53,20 @@ namespace processing::pipelines
void abort();
/// Process a frame
void process(data_t const& frm);
void process(input_t const& frm);
/// Process multiple frames
void process(std::vector<input_t> const& frms)
{
for (auto&& frm : frms)
process(frm);
}
/// Returns the progress counters
counters get_counters() const;
/// Returns frame for the given index
data_t get_frame(std::size_t frame_idx = -1) const;
frame_t get_frame(std::size_t frame_idx = -1) const;
private:
class impl;
......
......@@ -13,6 +13,7 @@
#include <lima/utils/math.hpp>
#include <lima/processing/pipelines/cuda.hpp>
#include <lima/processing/utils/pipelines.hpp>
namespace lima
{
......@@ -23,17 +24,16 @@ namespace processing::pipelines
/// Implementation of the receiver
class pipeline::impl
{
using frame_t = lima::frame;
public:
using proc_params_t = proc_params;
impl(int rank, hw::frame_info const& frame_info, hw::acq_params const& acq_params,
proc_params_t const& proc_params) :
m_fifo(proc_params.fifo.nb_fifo_frames), m_frames_buffer(100)
impl(int rank, acq_info_t const& acq_info, proc_params_t const& proc_params) :
m_acq_info(acq_info), m_fifo(proc_params.fifo.nb_fifo_frames), m_frames_buffer(100)
{
using namespace std::chrono_literals;
auto&& acq_params = m_acq_info.params;
// Limit the number of log traces given the order of magnitude of m_nb_frames
const unsigned int log_frame_limiter = order_of_magnitude_base10(acq_params.xfer.time_slice.count, 1);
......@@ -43,10 +43,13 @@ namespace processing::pipelines
return (int) m_nb_frames_source;
// Poll for available frames
data_t frame;
while (!m_fifo.pop(frame))
input_t data;
while (!m_fifo.pop(data))
;
auto& reconstruction = m_acq_info.reconstruction;
frame_t frame = reconstruction ? reconstruction(data) : std::any_cast<frame_t>(data);
if (!(frame.metadata.idx % log_frame_limiter))
LIMA_LOG(trace, proc) << "Processing thread got frame " << frame.metadata.idx;
......@@ -85,7 +88,15 @@ namespace processing::pipelines
void abort() {}
/// Push the given frame to the FIFO of frames to process
void process(frame_t const& frm) { m_fifo.push(frm); }
void process(input_t const& data)
{
auto metadata = get_input_frame_metadata(m_acq_info, data);
if (metadata.is_final)
while (!m_fifo.push(data))
;
else if (!m_fifo.push(data))
LIMA_LOG(error, det) << "Buffer overrun: frame #" << metadata.idx;
}
counters get_counters() const { return {(int) m_nb_frames_source, (int) m_nb_frames_processed}; }
......@@ -113,8 +124,11 @@ namespace processing::pipelines
}
private:
// Acquisition info
acq_info_t m_acq_info;
// Queue of frame (runtime sized)
boost::lockfree::spsc_queue<data_t> m_fifo;
boost::lockfree::spsc_queue<input_t> m_fifo;
std::future<int> m_cuda;
// Counters
......@@ -127,9 +141,8 @@ namespace processing::pipelines
};
// Pimpl boilerplate
pipeline::pipeline(int rank, hw::frame_info const& frame_info, hw::acq_params const& acq_params,
proc_params_t const& proc_params) :
m_pimpl{std::make_unique<impl>(rank, frame_info, acq_params, proc_params)}
pipeline::pipeline(int rank, acq_info_t const& acq_info, proc_params_t const& proc_params) :
m_pimpl{std::make_unique<impl>(rank, acq_info, proc_params)}
{
}
......@@ -145,11 +158,11 @@ namespace processing::pipelines
void pipeline::abort() { m_pimpl->abort(); }
void pipeline::process(data_t const& frm) { m_pimpl->process(frm); }
void pipeline::process(input_t const& data) { m_pimpl->process(data); }
counters pipeline::get_counters() const { return m_pimpl->get_counters(); }
pipeline::data_t pipeline::get_frame(std::size_t frame_idx) const { return m_pimpl->get_frame(frame_idx); }
pipeline::frame_t pipeline::get_frame(std::size_t frame_idx) const { return m_pimpl->get_frame(frame_idx); }
} // namespace cuda
} // namespace processing::pipelines
......
......@@ -17,13 +17,18 @@ namespace processing::pipelines
{
namespace legacy
{
BOOST_DESCRIBE_STRUCT(counters, (), (nb_frames_source, nb_frames_counters, nb_frames_saved))
BOOST_DESCRIBE_STRUCT(counters, (),
(nb_frames_source, nb_frames_reconstructed, nb_frames_counters, nb_frames_saved))
// clang-format off
BOOST_ANNOTATE_MEMBER(counters, nb_frames_source,
(desc, "nb frames source"),
(doc, "The number of frame poped from the FIFO"))
BOOST_ANNOTATE_MEMBER(counters, nb_frames_reconstructed,
(desc, "nb frames reconstructed"),
(doc, "The number of frame reconstructed (if applies)"))
BOOST_ANNOTATE_MEMBER(counters, nb_frames_counters,
(desc, "nb frames roi counters"),
(doc, "The number of frame processed with ROI counters"))
......