Commit a71c65bb authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

Improve Stream reliability by implementing state machine

* Wait for global header before considering Stream is Armed
* Stream sequence finishes only when the series end header is received
* Stream::stop just sets state to Stopped, ignoring received data messages;
  add abort, which really interrupts Stream sequence
* Add EventCtrlObj, trigger Error Events on _read_zmq_messages exceptions
* Add Camera::State::Armed, remove obsolete Readout
parent 95e0fccc
......@@ -86,6 +86,7 @@ add_library(eiger SHARED
src/EigerInterface.cpp
src/EigerDetInfoCtrlObj.cpp
src/EigerSyncCtrlObj.cpp
src/EigerEventCtrlObj.cpp
src/EigerDecompress.cpp
src/EigerSavingCtrlObj.cpp
src/EigerStream.cpp
......
......@@ -65,7 +65,7 @@ class LIBEIGER Camera : public HwMaxImageSizeCallbackGen, public EventCallbackGe
public:
enum ApiGeneration { Eiger1, Eiger2 };
enum Status { Ready, Initializing, Exposure, Readout, Fault };
enum Status { Initializing, Ready, Armed, Exposure, Fault };
enum CompressionType {NoCompression,LZ4,BSLZ4};
Camera(const std::string& detector_ip, ApiGeneration api = Eiger1);
......@@ -148,7 +148,7 @@ class LIBEIGER Camera : public HwMaxImageSizeCallbackGen, public EventCallbackGe
void getCompression(bool&);
void setCompression(bool);
void getCompressionType(CompressionType&) const;
void getCompressionType(CompressionType&);
void setCompressionType(CompressionType);
void getSerieId(int&);
void deleteMemoryFiles();
......@@ -171,6 +171,7 @@ class LIBEIGER Camera : public HwMaxImageSizeCallbackGen, public EventCallbackGe
void _synchronize(); /// Used during plug-in initialization
void _trigger_finished(bool);
void _initialization_finished(bool ok);
void _disarm();
void _updateImageSize();
......@@ -212,10 +213,11 @@ class LIBEIGER Camera : public HwMaxImageSizeCallbackGen, public EventCallbackGe
InternalStatus m_initialize_state;
InternalStatus m_trigger_state;
bool m_armed;
int m_serie_id;
//- EigerAPI stuff
eigerapi::Requests* m_requests;
double m_temperature;
double m_humidity;
Cache<double> m_exp_time;
......
//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2014
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################
#ifndef EIGEREVENTCTRLOBJ_H
#define EIGEREVENTCTRLOBJ_H
#include "lima/HwEventCtrlObj.h"
#include "EigerCamera.h"
namespace lima
{
namespace Eiger
{
/*******************************************************************
* \class EventCtrlObj
* \brief Control object providing Eiger event interface
*******************************************************************/
class /*LIBEIGER*/ EventCtrlObj : public HwEventCtrlObj
{
DEB_CLASS_NAMESPC(DebModCamera, "EventCtrlObj", "Eiger");
public:
EventCtrlObj(Camera& cam);
virtual ~EventCtrlObj();
private:
class EventCallback : public lima::EventCallback {
public:
EventCallback(EventCtrlObj& ctrl_obj)
: m_ctrl_obj(ctrl_obj) {}
virtual void processEvent(Event *event)
{ m_ctrl_obj.reportEvent(event); }
private:
EventCtrlObj& m_ctrl_obj;
};
Camera& m_cam;
std::unique_ptr<EventCallback> m_cbk;
};
} // namespace Eiger
} // namespace lima
#endif // EIGEREVENTCTRLOBJ
......@@ -36,6 +36,7 @@ namespace lima
class DetInfoCtrlObj;
class SyncCtrlObj;
class SavingCtrlObj;
class EventCtrlObj;
class Camera;
class Stream;
class StreamInfo;
......@@ -73,6 +74,7 @@ namespace lima
DetInfoCtrlObj* m_det_info;
SyncCtrlObj* m_sync;
SavingCtrlObj* m_saving;
EventCtrlObj* m_event;
Stream* m_stream;
Decompress* m_decompress;
};
......
......@@ -29,7 +29,7 @@ namespace Eiger
public:
enum ApiGeneration { Eiger1, Eiger2 };
enum Status { Ready, Initializing, Exposure, Readout, Fault };
enum Status { Initializing, Ready, Armed, Exposure, Fault };
enum CompressionType {NoCompression,LZ4,BSLZ4};
Camera(const std::string& detector_ip,
......@@ -102,7 +102,7 @@ namespace Eiger
void getCompression(bool& /Out/);
void setCompression(const bool);
void getCompressionType(CompressionType&) const;
void getCompressionType(CompressionType& /Out/);
void setCompressionType(CompressionType);
void getSerieId(int& /Out/);
......
......@@ -223,6 +223,7 @@ Camera::Camera(const std::string& detector_ip, ///< [in] Ip address of the dete
m_detectorImageType(Bpp16),
m_initialize_state(IDLE),
m_trigger_state(IDLE),
m_armed(false),
m_serie_id(0),
m_requests(new Requests(detector_ip)),
m_exp_time(1.),
......@@ -313,8 +314,8 @@ void Camera::prepareAcq()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
if(m_trigger_state != IDLE)
disarm();
if(m_armed)
THROW_HW_ERROR(Error) << "Camera already armed";
unsigned nb_images, nb_triggers;
switch(m_trig_mode)
......@@ -359,6 +360,7 @@ void Camera::prepareAcq()
arm_cmd->wait(timeout);
DEB_TRACE() << "Arm end";
m_serie_id = arm_cmd->get_serie_id();
m_armed = true;
}
catch(const eigerapi::EigerException &e)
{
......@@ -377,7 +379,6 @@ void Camera::startAcq()
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
if(m_trig_mode == IntTrig ||
m_trig_mode == IntTrigMult)
{
......@@ -401,6 +402,7 @@ void Camera::startAcq()
void Camera::stopAcq()
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
EIGER_SYNC_CMD(Requests::ABORT);
}
......@@ -696,6 +698,8 @@ Camera::Status Camera::getStatus() ///< [out] current camera status
status = Fault;
else if(m_trigger_state == RUNNING)
status = Exposure;
else if(m_armed)
status = Armed;
else if(m_initialize_state == RUNNING)
status = Initializing;
else
......@@ -1177,9 +1181,10 @@ void Camera::setCompression(bool value)
EIGER_SYNC_SET_PARAM(Requests::FILEWRITER_COMPRESSION,value);
}
void Camera::getCompressionType(Camera::CompressionType& type) const
void Camera::getCompressionType(Camera::CompressionType& type)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
type = m_compression_type;
DEB_RETURN() << DEB_VAR1(type);
}
......@@ -1214,6 +1219,7 @@ void Camera::setCompressionType(Camera::CompressionType type)
<< " not allowed";
EIGER_SYNC_SET_PARAM(Requests::COMPRESSION_TYPE, s);
AutoMutex lock(m_cond.mutex());
m_compression_type = type;
}
......@@ -1233,7 +1239,19 @@ void Camera::deleteMemoryFiles()
void Camera::disarm()
{
DEB_MEMBER_FUNCT();
EIGER_SYNC_CMD(Requests::DISARM);
AutoMutex lock(m_cond.mutex());
_disarm();
}
void Camera::_disarm()
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(m_armed);
if (m_armed) {
DEB_TRACE() << "Disarming";
EIGER_SYNC_CMD(Requests::DISARM);
m_armed = false;
}
}
const std::string& Camera::getDetectorIp() const
......
//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2014
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################
#include "EigerEventCtrlObj.h"
using namespace lima;
using namespace lima::Eiger;
using namespace std;
//-----------------------------------------------------
// @brief Ctor
//-----------------------------------------------------
EventCtrlObj::EventCtrlObj(Camera& cam)
: m_cam(cam), m_cbk(new EventCallback(*this))
{
DEB_CONSTRUCTOR();
m_cam.registerEventCallback(*m_cbk);
}
//-----------------------------------------------------
// @brief Dtor
//-----------------------------------------------------
EventCtrlObj::~EventCtrlObj()
{
DEB_DESTRUCTOR();
m_cam.unregisterEventCallback(*m_cbk);
}
......@@ -23,6 +23,7 @@
#include "EigerCamera.h"
#include "EigerDetInfoCtrlObj.h"
#include "EigerSyncCtrlObj.h"
#include "EigerEventCtrlObj.h"
#include "EigerSavingCtrlObj.h"
#include "EigerStream.h"
#include "EigerDecompress.h"
......@@ -47,6 +48,9 @@ Interface::Interface(Camera& cam) : m_cam(cam)
m_saving = new SavingCtrlObj(cam);
m_cap_list.push_back(HwCap(m_saving));
m_event = new EventCtrlObj(cam);
m_cap_list.push_back(HwCap(m_event));
m_stream = new Stream(cam);
HwBufferCtrlObj* buffer = m_stream->getBufferCtrlObj();
......@@ -95,6 +99,10 @@ void Interface::reset(ResetLevel reset_level)
void Interface::prepareAcq()
{
DEB_MEMBER_FUNCT();
if (m_cam.getStatus() == Camera::Armed)
m_cam.disarm();
bool use_filewriter = m_saving->isActive();
m_stream->setActive(!use_filewriter);
m_decompress->setActive(!use_filewriter);
......@@ -104,6 +112,10 @@ void Interface::prepareAcq()
m_cam.prepareAcq();
int serie_id; m_cam.getSerieId(serie_id);
m_saving->setSerieId(serie_id);
if (!use_filewriter) {
double stream_armed_timeout = 5.0;
m_stream->waitArmed(stream_armed_timeout);
}
}
//-----------------------------------------------------
......@@ -112,11 +124,20 @@ void Interface::prepareAcq()
void Interface::startAcq()
{
DEB_MEMBER_FUNCT();
// either we use eiger saving or the raw stream
if(m_saving->isActive())
m_saving->start();
else
m_stream->start();
// start data retrieval subsystems only in first call
if (getNbHwAcquiredFrames() == 0) {
// either we use eiger saving or the raw stream
if(m_saving->isActive())
m_saving->start();
else
m_stream->start();
} else {
TrigMode trig_mode;
m_cam.getTrigMode(trig_mode);
if (trig_mode != IntTrigMult)
DEB_WARNING() << "Unexpected start";
}
m_cam.startAcq();
}
......@@ -173,8 +194,8 @@ void Interface::getStatus(StatusType& status)
status.set(HwInterface::StatusType::Exposure);
break;
case Camera::Readout:
status.set(HwInterface::StatusType::Readout);
case Camera::Armed:
status.set(HwInterface::StatusType::Ready);
break;
case Camera::Fault:
......
......@@ -27,8 +27,6 @@
#include <zmq.h>
#include <json/json.h>
#include <eigerapi/Requests.h>
#include <eigerapi/EigerDefines.h>
......@@ -43,7 +41,6 @@ using namespace lima::Eiger;
using namespace eigerapi;
typedef Requests::ParamReq ParamReq;
typedef Stream::MessagePtr MessagePtr;
// --- Message struct ---
struct Stream::Message
......@@ -58,6 +55,12 @@ struct Stream::Message
}
zmq_msg_t* get_msg() {return &msg;}
void get_msg_data_n_size(void*& data, size_t& size)
{
data = zmq_msg_data(&msg);
size = zmq_msg_size(&msg);
}
zmq_msg_t msg;
};
// --- buffer management ---
......@@ -76,9 +79,24 @@ private:
// --- Stream::ImageData ---
void Stream::ImageData::getMsgDataNSize(void*& data, size_t& size) const
{
zmq_msg_t *zmq_msg = msg->get_msg();
data = zmq_msg_data(zmq_msg);
size = zmq_msg_size(zmq_msg);
msg->get_msg_data_n_size(data, size);
}
std::ostream& lima::Eiger::operator <<(std::ostream& os, Stream::State state)
{
const char *name;
switch (state) {
case Stream::State::Init: name = "Init"; break;
case Stream::State::Idle: name = "Idle"; break;
case Stream::State::Connected: name = "Connected"; break;
case Stream::State::Armed: name = "Armed"; break;
case Stream::State::Running: name = "Running"; break;
case Stream::State::Failed: name = "Failed"; break;
case Stream::State::Stopped: name = "Stopped"; break;
case Stream::State::Aborted: name = "Aborted"; break;
default: name = "Unknown";
}
return os << name;
}
std::ostream& lima::Eiger::operator <<(std::ostream& os,
......@@ -96,14 +114,67 @@ std::ostream& lima::Eiger::operator <<(std::ostream& os,
}
// --- Stream class ---
inline bool Stream::_isRunning() const
{
return ((m_state == Connected) || (m_state == Armed) || (m_state == Running));
}
inline Json::Value Stream::_get_json_header(MessagePtr &msg)
{
DEB_MEMBER_FUNCT();
void* data;
size_t data_size;
msg->get_msg_data_n_size(data, data_size);
DEB_TRACE() << "json_header=" << std::string((char *) data, data_size);
const char* begin = (const char*)data;
const char* end = begin + data_size;
Json::Value header;
Json::Reader reader;
if (!reader.parse(begin,end,header))
THROW_HW_ERROR(Error) << "Error parsing header: " << std::string(begin, end);
return header;
}
inline Json::Value Stream::_get_global_header(const Json::Value& stream_header,
MessageList& pending_messages)
{
DEB_MEMBER_FUNCT();
std::string header_detail = stream_header.get("header_detail","").asString();
if (header_detail != m_header_detail_str)
THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(header_detail) << ", "
<< "expected " << m_header_detail_str;
int nb_parts;
int header_message_id;
switch (m_header_detail) {
case OFF:
nb_parts = 1;
header_message_id = 0;
break;
case BASIC:
nb_parts = 2;
header_message_id = 1;
break;
case ALL:
nb_parts = 8;
header_message_id = 1;
break;
}
int nb_messages = pending_messages.size();
if (nb_messages < nb_parts)
THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(nb_messages)
<< " for " << DEB_VAR1(m_header_detail_str);
return _get_json_header(pending_messages[header_message_id]);
}
Stream::Stream(Camera& cam) :
m_cam(cam),
m_active(false),
m_header_detail(OFF),
m_dirty_flag(true),
m_wait(true),
m_running(false),
m_stop(false),
m_state(Init),
m_quit(false),
m_buffer_ctrl_obj(new Stream::_BufferCtrlObj(*this))
{
DEB_CONSTRUCTOR();
......@@ -118,12 +189,16 @@ Stream::Stream(Camera& cam) :
THROW_HW_ERROR(Error) << "Can't open pipe";
pthread_create(&m_thread_id,NULL,_runFunc,this);
AutoMutex lock(m_cond.mutex());
while (m_state != Idle)
m_cond.wait();
}
Stream::~Stream()
{
AutoMutex aLock(m_cond.mutex());
m_stop = true;
m_quit = true;
m_cond.broadcast();
aLock.unlock();
_send_synchro();
......@@ -137,13 +212,23 @@ Stream::~Stream()
void Stream::start()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
if (m_state != Armed)
THROW_HW_ERROR(Error) << "Stream is not Armed (no global header)";
DEB_TRACE() << "Running";
m_state = Running;
m_buffer_mgr->setStartTimestamp(Timestamp::now());
}
void Stream::stop()
{
setActive(false);
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
if (!_isRunning())
return;
DEB_TRACE() << "Stopped";
m_state = Stopped;
}
void Stream::_send_synchro()
......@@ -156,8 +241,12 @@ void Stream::_send_synchro()
bool Stream::isRunning() const
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
return m_running;
DEB_TRACE() << DEB_VAR1(m_state);
bool running = _isRunning();
DEB_RETURN() << DEB_VAR1(running);
return running;
}
void Stream::getHeaderDetail(Stream::HeaderDetail& detail) const
......@@ -178,55 +267,78 @@ void Stream::setActive(bool active)
DEB_PARAM() << DEB_VAR1(active);
AutoMutex lock(m_cond.mutex());
//Don't resend parameters if not changed
if(active != m_active || m_dirty_flag)
{
const char* header_detail_str;
switch(m_header_detail)
{
case ALL:
header_detail_str = "all";break;
case BASIC:
header_detail_str = "basic";break;
default:
header_detail_str = "none";break;
}
DEB_TRACE() << "STREAM_HEADER_DETAIL:" << DEB_VAR1(header_detail_str);
ParamReq header_detail_req = m_cam.m_requests->set_param(Requests::STREAM_HEADER_DETAIL,
header_detail_str);
try {
header_detail_req->wait();
} catch (const eigerapi::EigerException &e) {
m_cam.m_requests->cancel(header_detail_req);
THROW_HW_ERROR(Error) << e.what();
}
const char* active_str = active ? "enabled" : "disabled";
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(active_str);
ParamReq active_req = m_cam.m_requests->set_param(Requests::STREAM_MODE,
active_str);
try {
active_req->wait();
} catch (const eigerapi::EigerException &e) {
m_cam.m_requests->cancel(active_req);
THROW_HW_ERROR(Error) << e.what();
}
}
m_dirty_flag = false;
DEB_TRACE() << DEB_VAR2(m_active, m_state);
if(active == m_active)
return;
m_active = active;
m_wait = !m_active;
if(m_active)
m_activate_tstamp = Timestamp::now();
else
if (!active) {
DEB_TRACE() << "Aborted";
m_state = Aborted;
_send_synchro();
m_cond.broadcast();
while(m_active)
m_cond.wait();
} else if(m_dirty_flag) { //Send parameters only if changed
switch(m_header_detail) {
case ALL:
m_header_detail_str = "all";break;
case BASIC:
m_header_detail_str = "basic";break;
default:
m_header_detail_str = "none";break;
}
DEB_TRACE() << "STREAM_HEADER_DETAIL:" << DEB_VAR1(m_header_detail_str);
ParamReq header_detail_req = m_cam.m_requests->set_param(Requests::STREAM_HEADER_DETAIL,
m_header_detail_str);
try {
header_detail_req->wait();
} catch (const eigerapi::EigerException &e) {
m_cam.m_requests->cancel(header_detail_req);
THROW_HW_ERROR(Error) << e.what();
}
m_dirty_flag = false;
}
const char* active_str = active ? "enabled" : "disabled";
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(active_str);
ParamReq active_req = m_cam.m_requests->set_param(Requests::STREAM_MODE,
active_str);
try {
active_req->wait();
} catch (const eigerapi::EigerException &e) {
m_cam.m_requests->cancel(active_req);
THROW_HW_ERROR(Error) << e.what();
}