Commit 09ff676c authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

Improve Stream reliability:

* Add Starting and Quitting states, renamed Aborted to Aborting
* Modify remote stream_mode only when changed
* Improve protection against race conditions
parent eb835bc1
......@@ -175,6 +175,7 @@ class LIBEIGER Camera : public HwMaxImageSizeCallbackGen, public EventCallbackGe
void _updateImageSize();
void newFrameAcquired();
bool allFramesAcquired();
template <typename C>
......
......@@ -555,7 +555,8 @@ void Camera::getNbFrames(int& nb_frames) ///< [out] current number of frames to
//-----------------------------------------------------------------------------
void Camera::getNbHwAcquiredFrames(int &nb_acq_frames) ///< [out] number of acquired files
{
DEB_MEMBER_FUNCT();
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
nb_acq_frames = m_image_number;
}
......@@ -731,6 +732,14 @@ void Camera::_trigger_finished(bool ok)
m_trigger_state = ok ? IDLE : ERROR;
}
void Camera::newFrameAcquired()
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
m_image_number++;
DEB_TRACE() << DEB_VAR1(m_image_number);
}
bool Camera::allFramesAcquired()
{
DEB_MEMBER_FUNCT();
......
......@@ -406,7 +406,7 @@ void SavingCtrlObj::_download_finished(std::string filename, bool ok,
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR3(filename, ok, error);
m_cam.m_image_number++;
m_cam.newFrameAcquired();
AutoMutex lock(m_cond.mutex());
if(!ok)
......
......@@ -86,12 +86,14 @@ std::ostream& lima::Eiger::operator <<(std::ostream& os, Stream::State state)
switch (state) {
case Stream::State::Init: name = "Init"; break;
case Stream::State::Idle: name = "Idle"; break;
case Stream::State::Starting: name = "Starting"; break;
case Stream::State::Connected: name = "Connected"; break;
case Stream::State::Failed: name = "Failed"; break;
case Stream::State::Armed: name = "Armed"; break;
case Stream::State::Running: name = "Running"; break;
case Stream::State::Failed: name = "Failed"; break;
case Stream::State::Stopped: name = "Stopped"; break;
case Stream::State::Aborted: name = "Aborted"; break;
case Stream::State::Aborting: name = "Aborting"; break;
case Stream::State::Quitting: name = "Quitting"; break;
default: name = "Unknown";
}
return os << name;
......@@ -168,11 +170,9 @@ inline Json::Value Stream::_get_global_header(const Json::Value& stream_header,
Stream::Stream(Camera& cam) :
m_cam(cam),
m_active(false),
m_header_detail(OFF),
m_dirty_flag(true),
m_state(Init),
m_quit(false),
m_buffer_ctrl_obj(new Stream::_BufferCtrlObj(*this))
{
DEB_CONSTRUCTOR();
......@@ -181,6 +181,7 @@ Stream::Stream(Camera& cam) :
m_endianess = (is_le ? '<' : '>');
m_buffer_mgr = &m_buffer_ctrl_obj->getBuffer();
m_active = _getStreamMode();
m_zmq_context = zmq_ctx_new();
if(pipe(m_pipes))
......@@ -195,11 +196,15 @@ Stream::Stream(Camera& cam) :
Stream::~Stream()
{
AutoMutex aLock(m_cond.mutex());
m_quit = true;
m_cond.broadcast();
aLock.unlock();
_send_synchro();
DEB_DESTRUCTOR();
{
AutoMutex aLock(m_cond.mutex());
DEB_TRACE() << "Quitting";
m_state = Quitting;
m_cond.broadcast();
_send_synchro();
}
if(m_thread_id > 0)
pthread_join(m_thread_id,NULL);
......@@ -208,6 +213,26 @@ Stream::~Stream()
zmq_ctx_destroy(m_zmq_context);
}
void Stream::_setStreamMode(bool enabled)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(enabled);
std::string enabled_str = enabled ? "enabled" : "disabled";
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
setEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
}
bool Stream::_getStreamMode()
{
DEB_MEMBER_FUNCT();
std::string enabled_str;
getEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
bool enabled = (enabled_str == "enabled");
DEB_RETURN() << DEB_VAR1(enabled);
return enabled;
}
void Stream::start()
{
DEB_MEMBER_FUNCT();
......@@ -229,6 +254,13 @@ void Stream::stop()
m_state = Stopped;
}
void Stream::abort()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
_abort();
}
void Stream::_send_synchro()
{
DEB_MEMBER_FUNCT();
......@@ -237,6 +269,24 @@ void Stream::_send_synchro()
DEB_ERROR() << "Something wrong happened!";
}
void Stream::_abort()
{
DEB_MEMBER_FUNCT();
if((m_state == Running) || (m_state == Stopped)) {
DEB_TRACE() << "Aborting";
m_state = Aborting;
_send_synchro();
m_cond.broadcast();
while((m_state != Idle) && (m_state != Failed))
m_cond.wait();
}
if (m_state == Failed) {
m_state = Idle;
THROW_HW_ERROR(Error) << "Stream failed";
}
}
bool Stream::isRunning() const
{
DEB_MEMBER_FUNCT();
......@@ -266,16 +316,13 @@ void Stream::setActive(bool active)
AutoMutex lock(m_cond.mutex());
DEB_TRACE() << DEB_VAR2(m_active, m_state);
if(active == m_active)
return;
if (!active) {
DEB_TRACE() << "Aborted";
m_state = Aborted;
_send_synchro();
m_cond.broadcast();
while(m_active)
m_cond.wait();
// wait for previous sequence to finish
while((m_state == Running) || (m_state == Stopped))
m_cond.wait();
if(!active) {
_abort();
} else if(m_dirty_flag) { //Send parameters only if changed
switch(m_header_detail) {
case ALL:
......@@ -291,18 +338,24 @@ void Stream::setActive(bool active)
m_dirty_flag = false;
}
const char* active_str = active ? "enabled" : "disabled";
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(active_str);
setEigerParam(m_cam,Requests::STREAM_MODE,active_str);
if(active != m_active) {
_setStreamMode(active);
m_active = active;
}
if(!m_active || (m_state == Connected) || (m_state == Armed))
return;
m_active = active;
m_state = Starting;
m_cond.broadcast();
while(m_active == (m_state == Idle))
while(m_state == Starting)
m_cond.wait();
if (m_state == Failed) {
m_state = Idle;
THROW_HW_ERROR(Error) << "Error starting stream";
} else if (m_state != Connected) {
THROW_HW_ERROR(Error) << "Internal error: " << DEB_VAR1(m_state);
}
}
......@@ -347,22 +400,24 @@ void Stream::_run()
DEB_TRACE() << "Wait";
m_cond.broadcast();
while(!m_active && !m_quit)
while((m_state == Idle) || (m_state == Failed))
m_cond.wait();
if(m_quit)
if(m_state == Quitting)
break;
DEB_TRACE() << "Running";
lock.unlock();
else if(m_state == Starting)
DEB_TRACE() << "Running: " << DEB_VAR1(m_state);
else
DEB_ERROR() << "Invalid " << DEB_VAR1(m_state);
try {
_run_sequence();
{
AutoMutexUnlock u(lock);
_run_sequence();
}
m_state = Idle;
} catch (Exception& e) {
m_state = Failed;
}
lock.lock();
m_active = false;
}
}
......@@ -390,16 +445,18 @@ void Stream::_run_sequence()
<< DEB_VAR2(errno, error_msg);
}
AutoMutex lock(m_cond.mutex());
TrigMode trigger_mode;
m_cam.getTrigMode(trigger_mode);
m_ext_trigger = ((trigger_mode != IntTrig) && (trigger_mode != IntTrigMult));
m_cam.getCompressionType(m_comp_type);
DEB_TRACE() << "Connected to " << stream_endpoint;
m_state = Connected;
m_cond.broadcast();
lock.unlock();
{
AutoMutex lock(m_cond.mutex());
TrigMode trigger_mode;
m_cam.getTrigMode(trigger_mode);
m_ext_trigger = ((trigger_mode != IntTrig) &&
(trigger_mode != IntTrigMult));
m_cam.getCompressionType(m_comp_type);
DEB_TRACE() << "Connected to " << stream_endpoint;
m_state = Connected;
m_cond.broadcast();
}
// Initialize poll set
zmq_pollitem_t items [] = {
......@@ -418,9 +475,10 @@ void Stream::_run_sequence()
if(read(m_pipes[0],buffer,sizeof(buffer)) == -1)
DEB_WARNING() << "Something strange happened!";
lock.lock();
continue_flag = !((m_state == Aborted) || m_quit);
lock.unlock();
{
AutoMutex lock(m_cond.mutex());
continue_flag = !((m_state == Aborting) || (m_state == Quitting));
}
if (!continue_flag)
break;
}
......@@ -529,8 +587,8 @@ bool Stream::_read_zmq_messages(void *stream_socket)
m_last_info.encoding = data_header.get("encoding", "").asString();
m_last_info.frame_dim = anImageDim;
m_last_info.packed_size = data_header.get("size", "-1").asInt();
lock.unlock();
_checkCompression(m_last_info);
lock.unlock();
}
Json::Value config_header = _get_json_header(pending_messages[3]);
......@@ -554,7 +612,7 @@ bool Stream::_read_zmq_messages(void *stream_socket)
}
bool continue_flag = m_buffer_mgr->newFrameReady(frame_info);
m_cam.m_image_number++;
m_cam.newFrameAcquired();
bool do_disarm = (m_ext_trigger && m_cam.allFramesAcquired());
if (!continue_flag && !do_disarm) {
DEB_WARNING() << "Unexpected " << DEB_VAR1(continue_flag) << ": "
......@@ -565,6 +623,7 @@ bool Stream::_read_zmq_messages(void *stream_socket)
m_cam.disarm();
return true;
} else if (htype.find("dseries_end-") != std::string::npos) {
DEB_TRACE() << "Finishing";
return false;
} else {
DEB_WARNING() << "Unknown header: " << htype;
......
......@@ -43,7 +43,8 @@ namespace lima
typedef Camera::CompressionType CompressionType;
enum HeaderDetail {ALL,BASIC,OFF};
enum State {Init,Idle,Connected,Armed,Running,Failed,Stopped,Aborted};
enum State {Init,Idle,Starting,Connected,Failed,Armed,Running,
Stopped,Aborting,Quitting};
struct ImageData {
MessagePtr msg;
......@@ -57,6 +58,7 @@ namespace lima
void start();
void stop();
void abort();
bool isRunning() const;
void waitArmed(double timeout);
......@@ -91,9 +93,12 @@ namespace lima
Json::Value _get_json_header(MessagePtr &msg);
bool _read_zmq_messages(void *stream_socket);
void _send_synchro();
void _abort();
void _checkCompression(const StreamInfo& info);
void _setStreamMode(bool enabled);
bool _getStreamMode();
Camera& m_cam;
char m_endianess;
bool m_active;
......@@ -103,7 +108,6 @@ namespace lima
State m_state;
mutable Cond m_cond;
bool m_quit;
pthread_t m_thread_id;
void* m_zmq_context;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment