Commit b2a79dc4 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by Generic Bliss account for Control Software
Browse files

[PROC] Add is_finished to legacy and smx pipelines

parent fd919a8d
......@@ -8,6 +8,7 @@
#include <memory>
#include <vector>
#include <exception>
#include <lima/hw/frame_info.hpp>
#include <lima/hw/params.hpp>
......@@ -30,6 +31,8 @@ namespace processing::pipelines
int nb_frames_saved = 0;
};
using finished_callback_t = std::function<void(std::exception_ptr)>;
class PIPELINE_LEGACY_EXPORT pipeline
{
public:
......@@ -48,8 +51,11 @@ namespace processing::pipelines
/// Activate the processing (start poping data from the queue)
void activate();
/// Returns when the processing has finished
void wait_for_all();
/// Return the finished state
bool is_finished();
/// Register on_finished callback
void register_on_finished(finished_callback_t on_finished);
/// Abort the pipeline
void abort();
......
......@@ -6,6 +6,8 @@
#include <atomic>
#include <optional>
#include <stdexcept>
#include <future>
#include <boost/lockfree/spsc_queue.hpp>
......@@ -103,10 +105,33 @@ namespace processing::pipelines
}
/// Activate the processing (start poping data from the queue)
void activate() { src->activate(); }
void activate()
{
src->activate();
m_finished_future = std::async(std::launch::async, [this] {
std::exception_ptr eptr;
try {
m_graph.wait_for_all();
LIMA_LOG(trace, proc) << "Processing finished";
} catch (...) {
LIMA_LOG(error, proc) << "Processing failed";
eptr = std::current_exception();
}
// Set the is_finished flag and call the callback if registered
m_is_finished = true;
if (m_on_finished)
m_on_finished(eptr);
});
}
/// Returns true when the processing has finished
bool is_finished() const { return m_is_finished; }
/// Returns when the processing has finished
void wait_for_all() { m_graph.wait_for_all(); }
/// Register on_finished callback
void register_on_finished(finished_callback_t on_finished) { m_on_finished = on_finished; }
/// Abort the pipeline
void abort() { m_task_group_context.cancel_group_execution(); }
......@@ -193,6 +218,11 @@ namespace processing::pipelines
// Frames
boost::circular_buffer<frame_t> m_frames_buffer;
mutable std::mutex m_frames_buffer_mutex;
// Finished
std::atomic_bool m_is_finished{false};
finished_callback_t m_on_finished;
std::future<void> m_finished_future; // last member
};
// Pimpl boilerplate
......@@ -210,7 +240,12 @@ namespace processing::pipelines
void pipeline::activate() { m_pimpl->activate(); }
void pipeline::wait_for_all() { m_pimpl->wait_for_all(); }
bool pipeline::is_finished() { return m_pimpl->is_finished(); }
void pipeline::register_on_finished(finished_callback_t on_finished)
{
m_pimpl->register_on_finished(on_finished);
}
void pipeline::abort() { m_pimpl->abort(); }
......
......@@ -42,7 +42,7 @@ void processing::read_attr_hardware(TANGO_UNUSED(vector<long>& attr_list))
void processing::write_attr_hardware(TANGO_UNUSED(vector<long>& attr_list))
{
DEBUG_STREAM << "control::write_attr_hardware(vector<long> &attr_list) entering... " << std::endl;
DEBUG_STREAM << "processing::write_attr_hardware(vector<long> &attr_list) entering... " << std::endl;
// Add your own code
}
......@@ -52,6 +52,12 @@ void processing::add_dynamic_attributes()
// Add your own code to create and add dynamic attributes if any
}
bool processing::is_finished()
{
DEBUG_STREAM << "processing::is_finished() entering... " << std::endl;
return m_proc->is_finished();
}
struct roi_counter
{
std::size_t frame_idx;
......
......@@ -54,6 +54,9 @@ class processing : public TANGO_BASE_CLASS
/// Add dynamic attributes if any
void add_dynamic_attributes();
/// Get is_finished attribute
bool is_finished();
///}
/// Command related methods
......
......@@ -22,6 +22,29 @@
namespace lima::tango
{
/// Processing is_finished attribute
class is_finished_attr : public Tango::Attr
{
public:
is_finished_attr() : Tango::Attr("is_finished", Tango::DEV_BOOLEAN, Tango::READ)
{
Tango::UserDefaultAttrProp attr_prop;
attr_prop.description = "Processing finished state";
attr_prop.label = "is_finished";
set_default_properties(attr_prop);
}
void read(Tango::DeviceImpl* dev, Tango::Attribute& attr) override
{
auto d = static_cast<processing*>(dev);
auto is_finished = new Tango::DevBoolean(d->is_finished());
attr.set_value(is_finished, 1, 0, true);
}
bool is_allowed(Tango::DeviceImpl* dev, Tango::AttReqType ty) override { return true; }
};
processing_class* processing_class::_instance = nullptr;
processing_class::processing_class(std::string name) : device_class(name)
......@@ -240,8 +263,11 @@ void processing_class::attribute_factory(std::vector<Tango::Attr*>& att_list)
att_list.push_back(attr);
}
// Add is_finished attribute
att_list.push_back(new is_finished_attr());
// Create a list of static attributes
create_static_attribute_list(get_class_attr()->get_attr_list());
create_static_attribute_list(att_list);
}
void processing_class::pipe_factory() {}
......
......@@ -60,13 +60,13 @@ class processing_class : public device_class
processing_class(std::string name);
/// Create the command object(s) and store them in the command list
void command_factory();
void command_factory() override;
/// Create the attribute object(s) and store them in the attribute list
void attribute_factory(vector<Tango::Attr*>&);
void attribute_factory(vector<Tango::Attr*>&) override;
/// Create the pipe object(s) and store them in the pipe list
void pipe_factory();
void pipe_factory() override;
/// Properties management
///{
......
......@@ -33,6 +33,8 @@ namespace processing::pipelines
int nb_frames_dense_saved = 0;
};
using finished_callback_t = std::function<void(std::exception_ptr)>;
class PIPELINE_SMX_EXPORT pipeline
{
public:
......@@ -52,8 +54,11 @@ namespace processing::pipelines
/// Activate the pipeline (start poping data from the queue)
void activate();
/// Returns when the processing has finished
void wait_for_all();
/// Return the finished state
bool is_finished();
/// Register on_finished callback
void register_on_finished(finished_callback_t on_finished);
/// Abort the processing
void abort();
......
......@@ -7,6 +7,8 @@
#include <atomic>
#include <optional>
#include <mutex>
#include <stdexcept>
#include <future>
#include <boost/lockfree/spsc_queue.hpp>
......@@ -183,10 +185,33 @@ namespace processing::pipelines
}
/// Activate the processing (start poping data from the queue)
void activate() { src->activate(); }
void activate()
{
src->activate();
m_finished_future = std::async(std::launch::async, [this] {
std::exception_ptr eptr;
try {
m_graph.wait_for_all();
LIMA_LOG(trace, proc) << "Processing finished";
} catch (...) {
LIMA_LOG(error, proc) << "Processing failed";
eptr = std::current_exception();
}
// Set the is_finished flag and call the callback if registered
m_is_finished = true;
if (m_on_finished)
m_on_finished(eptr);
});
}
/// Returns true when the processing has finished
bool is_finished() const { return m_is_finished; }
/// Returns when the processing has finished
void wait_for_all() { m_graph.wait_for_all(); }
/// Register on_finished callback
void register_on_finished(finished_callback_t on_finished) { m_on_finished = on_finished; }
/// Abort the pipeline
void abort() { m_task_group_context.cancel_group_execution(); }
......@@ -320,6 +345,11 @@ namespace processing::pipelines
// Frames
boost::circular_buffer<frame_t> m_frames_buffer;
mutable std::mutex m_frames_buffer_mutex;
// Finished
std::atomic_bool m_is_finished{false};
finished_callback_t m_on_finished;
std::future<void> m_finished_future; // last member
};
// Pimpl boilerplate
......@@ -337,7 +367,12 @@ namespace processing::pipelines
void pipeline::activate() { m_pimpl->activate(); }
void pipeline::wait_for_all() { m_pimpl->wait_for_all(); }
bool pipeline::is_finished() { return m_pimpl->is_finished(); }
void pipeline::register_on_finished(finished_callback_t on_finished)
{
m_pimpl->register_on_finished(on_finished);
}
void pipeline::abort() { m_pimpl->abort(); }
......
......@@ -42,7 +42,7 @@ void processing::read_attr_hardware(TANGO_UNUSED(vector<long>& attr_list))
void processing::write_attr_hardware(TANGO_UNUSED(vector<long>& attr_list))
{
DEBUG_STREAM << "control::write_attr_hardware(vector<long> &attr_list) entering... " << std::endl;
DEBUG_STREAM << "processing::write_attr_hardware(vector<long> &attr_list) entering... " << std::endl;
// Add your own code
}
......@@ -52,6 +52,12 @@ void processing::add_dynamic_attributes()
// Add your own code to create and add dynamic attributes if any
}
bool processing::is_finished()
{
DEBUG_STREAM << "processing::is_finished() entering... " << std::endl;
return m_proc->is_finished();
}
struct roi_counter
{
std::size_t frame_idx;
......
......@@ -54,6 +54,9 @@ class processing : public TANGO_BASE_CLASS
/// Add dynamic attributes if any
void add_dynamic_attributes();
/// Get is_finished attribute
bool is_finished();
///}
/// Command related methods
......
......@@ -22,6 +22,29 @@
namespace lima::tango
{
/// Processing is_finished attribute
class is_finished_attr : public Tango::Attr
{
public:
is_finished_attr() : Tango::Attr("is_finished", Tango::DEV_BOOLEAN, Tango::READ)
{
Tango::UserDefaultAttrProp attr_prop;
attr_prop.description = "Processing finished state";
attr_prop.label = "is_finished";
set_default_properties(attr_prop);
}
void read(Tango::DeviceImpl* dev, Tango::Attribute& attr) override
{
auto d = static_cast<processing*>(dev);
auto is_finished = new Tango::DevBoolean(d->is_finished());
attr.set_value(is_finished, 1, 0, true);
}
bool is_allowed(Tango::DeviceImpl* dev, Tango::AttReqType ty) override { return true; }
};
processing_class* processing_class::_instance = nullptr;
processing_class::processing_class(std::string name) : device_class(name)
......@@ -263,8 +286,11 @@ void processing_class::attribute_factory(std::vector<Tango::Attr*>& att_list)
att_list.push_back(attr);
}
// Add is_finished attribute
att_list.push_back(new is_finished_attr());
// Create a list of static attributes
create_static_attribute_list(get_class_attr()->get_attr_list());
create_static_attribute_list(att_list);
}
void processing_class::pipe_factory() {}
......
......@@ -60,13 +60,13 @@ class processing_class : public device_class
processing_class(std::string name);
/// Create the command object(s) and store them in the command list
void command_factory();
void command_factory() override;
/// Create the attribute object(s) and store them in the attribute list
void attribute_factory(vector<Tango::Attr*>&);
void attribute_factory(vector<Tango::Attr*>&) override;
/// Create the pipe object(s) and store them in the pipe list
void pipe_factory();
void pipe_factory() override;
/// Properties management
///{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment