Commit eb1f4462 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

Refactor Stream: create _ZmqThread and remove obsolete _BufferCtrlObj

* Correct debug class/namespace on SavingCtrlObj classes
parent ce8e1c60
Pipeline #21263 failed with stages
in 1 minute and 7 seconds
......@@ -59,7 +59,7 @@ static HeaderKey2Index available_header[] = {
----------------------------------------------------------------------------*/
class SavingCtrlObj::_PollingThread : public Thread
{
DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj","_PollingThread");
DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj::_PollingThread","Eiger");
public:
_PollingThread(SavingCtrlObj&,eigerapi::Requests*);
virtual ~_PollingThread();
......@@ -95,7 +95,7 @@ SavingCtrlObj::SavingCtrlObj(Camera& cam) :
----------------------------------------------------------------------------*/
class SavingCtrlObj::_EndDownloadCallback : public CurlLoop::FutureRequest::Callback
{
DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj","_EndDownloadCallback");
DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj::_EndDownloadCallback","Eiger");
public:
_EndDownloadCallback(SavingCtrlObj&,const std::string &filename);
......
......@@ -61,18 +61,6 @@ struct Stream::Message
zmq_msg_t msg;
};
// --- buffer management ---
class Stream::_BufferCtrlObj : public SoftBufferCtrlObj
{
DEB_CLASS_NAMESPC(DebModCamera,"Stream","_BufferCtrlObj");
public:
_BufferCtrlObj(Stream& stream) :
m_stream(stream)
{
}
private:
Stream& m_stream;
};
// --- Stream::ImageData ---
void Stream::ImageData::getMsgDataNSize(void*& data, size_t& size) const
......@@ -113,13 +101,65 @@ std::ostream& lima::Eiger::operator <<(std::ostream& os,
<< ">";
}
// --- Stream class ---
inline bool Stream::_isRunning() const
// --- Zmq thread ---
class Stream::_ZmqThread : public Thread
{
return ((m_state != Idle) && (m_state != Failed));
DEB_CLASS_NAMESPC(DebModCamera,"Stream::_ZmqThread","Eiger");
public:
_ZmqThread(Stream& stream);
virtual ~_ZmqThread();
protected:
virtual void threadFunction();
private:
void _run_sequence();
Json::Value _get_global_header(const Json::Value& stream_header,
MessageList& pending_messages);
Json::Value _get_json_header(MessagePtr &msg);
bool _read_zmq_messages(void *stream_socket);
void _checkCompression(const StreamInfo& info);
Stream& m_stream;
Cond& m_cond;
State& m_state;
char m_endianess;
void* m_zmq_context;
bool m_stopped;
bool m_ext_trigger;
CompressionType m_comp_type;
bool m_waiting_global_header;
int m_depth;
std::string m_dtype_str;
Timestamp m_last_data_tstamp;
};
Stream::_ZmqThread::_ZmqThread(Stream& stream)
: m_stream(stream),
m_cond(m_stream.m_cond),
m_state(m_stream.m_state)
{
DEB_CONSTRUCTOR();
bool is_le = (htole16(0x1234) == 0x1234);
m_endianess = (is_le ? '<' : '>');
m_zmq_context = zmq_ctx_new();
start();
}
inline Json::Value Stream::_get_json_header(MessagePtr &msg)
Stream::_ZmqThread::~_ZmqThread()
{
DEB_DESTRUCTOR();
zmq_ctx_destroy(m_zmq_context);
}
inline Json::Value Stream::_ZmqThread::_get_json_header(MessagePtr &msg)
{
DEB_MEMBER_FUNCT();
void* data;
......@@ -136,18 +176,19 @@ inline Json::Value Stream::_get_json_header(MessagePtr &msg)
return header;
}
inline Json::Value Stream::_get_global_header(const Json::Value& stream_header,
MessageList& pending_messages)
inline Json::Value
Stream::_ZmqThread::_get_global_header(const Json::Value& stream_header,
MessageList& pending_messages)
{
DEB_MEMBER_FUNCT();
HeaderDetail header_detail;
std::string s = stream_header.get("header_detail","").asString();
{
AutoMutex lock(m_cond.mutex());
const std::string& expected = m_header_detail_str.value();
const std::string& expected = m_stream.m_header_detail_str.value();
if (s != expected)
THROW_HW_ERROR(Error) << "Error: got" << s << ", " << DEB_VAR1(expected);
header_detail = m_header_detail;
header_detail = m_stream.m_header_detail;
}
int nb_parts;
int header_message_id;
......@@ -173,242 +214,7 @@ inline Json::Value Stream::_get_global_header(const Json::Value& stream_header,
return _get_json_header(pending_messages[header_message_id]);
}
Stream::Stream(Camera& cam) :
m_cam(cam),
m_header_detail(OFF),
m_state(Init),
m_buffer_ctrl_obj(new Stream::_BufferCtrlObj(*this))
{
DEB_CONSTRUCTOR();
bool is_le = (htole16(0x1234) == 0x1234);
m_endianess = (is_le ? '<' : '>');
m_buffer_mgr = &m_buffer_ctrl_obj->getBuffer();
m_active = _getStreamMode();
getEigerParam(m_cam,Requests::STREAM_HEADER_DETAIL,m_header_detail_str);
DEB_TRACE() << DEB_VAR1(m_header_detail_str.value());
m_zmq_context = zmq_ctx_new();
if(pipe(m_pipes))
THROW_HW_ERROR(Error) << "Can't open pipe";
pthread_create(&m_thread_id,NULL,_runFunc,this);
AutoMutex lock(m_cond.mutex());
while (m_state != Idle)
m_cond.wait();
}
Stream::~Stream()
{
DEB_DESTRUCTOR();
{
AutoMutex aLock(m_cond.mutex());
DEB_TRACE() << "Quitting";
m_state = Quitting;
m_cond.broadcast();
_send_synchro();
}
if(m_thread_id > 0)
pthread_join(m_thread_id,NULL);
close(m_pipes[0]),close(m_pipes[1]);
zmq_ctx_destroy(m_zmq_context);
}
void Stream::_setStreamMode(bool enabled)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(enabled);
std::string enabled_str = enabled ? "enabled" : "disabled";
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
setEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
}
bool Stream::_getStreamMode()
{
DEB_MEMBER_FUNCT();
std::string enabled_str;
getEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
bool enabled = (enabled_str == "enabled");
DEB_RETURN() << DEB_VAR1(enabled);
return enabled;
}
void Stream::start()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
if (m_state != Armed)
THROW_HW_ERROR(Error) << "Stream is not Armed (no global header)";
DEB_TRACE() << "Running";
m_state = Running;
m_buffer_mgr->setStartTimestamp(Timestamp::now());
}
void Stream::stop()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
bool connected = (m_state == Connected);
if (!_isRunning() || connected) {
if (connected)
_abort();
return;
}
DEB_TRACE() << "Stopped";
m_state = Stopped;
_send_synchro();
while (_isRunning())
m_cond.wait();
}
void Stream::abort()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
_abort();
}
void Stream::_send_synchro()
{
DEB_MEMBER_FUNCT();
if(write(m_pipes[1],"|",1) == -1)
DEB_ERROR() << "Something wrong happened!";
}
void Stream::_abort()
{
DEB_MEMBER_FUNCT();
if(m_state == Failed) {
m_state = Idle;
THROW_HW_ERROR(Error) << "Stream failed";
} else if(!_isRunning())
return;
DEB_TRACE() << "Aborting";
m_state = Aborting;
_send_synchro();
m_cond.broadcast();
while(_isRunning())
m_cond.wait();
}
bool Stream::isRunning() const
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
DEB_TRACE() << DEB_VAR1(m_state);
bool running = _isRunning();
DEB_RETURN() << DEB_VAR1(running);
return running;
}
void Stream::getHeaderDetail(Stream::HeaderDetail& detail) const
{
AutoMutex lock(m_cond.mutex());
detail = m_header_detail;
}
void Stream::setHeaderDetail(Stream::HeaderDetail detail)
{
AutoMutex lock(m_cond.mutex());
m_header_detail = detail;
}
void Stream::setActive(bool active)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(active);
AutoMutex lock(m_cond.mutex());
DEB_TRACE() << DEB_VAR2(m_active.value(), m_state);
bool is_ready = ((m_state == Connected) || (m_state == Armed));
bool do_abort = (!active && is_ready);
if(!do_abort && _isRunning()) {
DEB_WARNING() << "Stream is Running: Aborting!";
do_abort = true;
}
if(do_abort) {
_abort();
is_ready = false;
}
if(active) {
std::string s;
switch(m_header_detail) {
case ALL:
s = "all";break;
case BASIC:
s = "basic";break;
default:
s = "none";
}
if(m_header_detail_str.changed(s)) {
DEB_TRACE() << "STREAM_HEADER_DETAIL:" << DEB_VAR1(m_header_detail_str.value());
setEigerParam(m_cam,Requests::STREAM_HEADER_DETAIL,m_header_detail_str);
}
}
if(m_active.changed(active))
_setStreamMode(m_active);
if(!m_active || is_ready)
return;
m_state = Starting;
m_cond.broadcast();
while(m_state == Starting)
m_cond.wait();
if (m_state == Failed) {
m_state = Idle;
THROW_HW_ERROR(Error) << "Error starting stream";
} else if (m_state != Connected) {
THROW_HW_ERROR(Error) << "Internal error: " << DEB_VAR1(m_state);
}
}
void Stream::waitArmed(double timeout)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(timeout);
AutoMutex lock(m_cond.mutex());
Timestamp t0 = Timestamp::now();
DEB_TRACE() << DEB_VAR1(m_state);
while(m_state == Connected) {
double elapsed = Timestamp::now() - t0;
if (elapsed >= timeout)
break;
m_cond.wait(timeout - elapsed);
}
if (m_state == Failed)
m_state = Idle;
if (m_state != Armed)
THROW_HW_ERROR(Error) << "Global header not received";
}
HwBufferCtrlObj* Stream::getBufferCtrlObj()
{
DEB_MEMBER_FUNCT();
return m_buffer_ctrl_obj.get();
}
void* Stream::_runFunc(void *streamPt)
{
((Stream*)streamPt)->_run();
return NULL;
}
void Stream::_run()
void Stream::_ZmqThread::threadFunction()
{
DEB_MEMBER_FUNCT();
......@@ -439,7 +245,7 @@ void Stream::_run()
}
}
void Stream::_run_sequence()
void Stream::_ZmqThread::_run_sequence()
{
DEB_MEMBER_FUNCT();
......@@ -453,9 +259,11 @@ void Stream::_run_sequence()
};
std::unique_ptr<void, zmq_socket_deleter> socket_ptr(stream_socket);
Camera& cam = m_stream.m_cam;
char stream_endpoint[256];
snprintf(stream_endpoint,sizeof(stream_endpoint),
"tcp://%s:9999",m_cam.getDetectorIp().c_str());
"tcp://%s:9999",cam.getDetectorIp().c_str());
if(zmq_connect(stream_socket,stream_endpoint) != 0) {
char error_buffer[256];
const char *error_msg = strerror_r(errno,error_buffer,sizeof(error_buffer));
......@@ -466,10 +274,10 @@ void Stream::_run_sequence()
{
AutoMutex lock(m_cond.mutex());
TrigMode trigger_mode;
m_cam.getTrigMode(trigger_mode);
cam.getTrigMode(trigger_mode);
m_ext_trigger = ((trigger_mode != IntTrig) &&
(trigger_mode != IntTrigMult));
m_cam.getCompressionType(m_comp_type);
cam.getCompressionType(m_comp_type);
DEB_TRACE() << "Connected to " << stream_endpoint;
m_state = Connected;
......@@ -479,9 +287,11 @@ void Stream::_run_sequence()
m_stopped = false;
m_waiting_global_header = true;
int read_pipe = m_stream.m_pipes[0];
// Initialize poll set
zmq_pollitem_t items [] = {
{ NULL, m_pipes[0], ZMQ_POLLIN, 0 },
{ NULL, read_pipe, ZMQ_POLLIN, 0 },
{ stream_socket, 0, ZMQ_POLLIN, 0 }
};
......@@ -493,7 +303,7 @@ void Stream::_run_sequence()
if(items[0].revents & ZMQ_POLLIN) { // reading synchro pipe
char buffer[1024];
if(read(m_pipes[0],buffer,sizeof(buffer)) == -1)
if(read(read_pipe,buffer,sizeof(buffer)) == -1)
DEB_WARNING() << "Something strange happened!";
{
......@@ -515,13 +325,13 @@ void Stream::_run_sequence()
Event *event = new Event(Hardware, Event::Error, Event::Camera,
err_code, err_msg.str());
DEB_EVENT(*event) << DEB_VAR1(*event);
m_cam.reportEvent(event);
cam.reportEvent(event);
}
}
}
}
bool Stream::_read_zmq_messages(void *stream_socket)
bool Stream::_ZmqThread::_read_zmq_messages(void *stream_socket)
{
DEB_MEMBER_FUNCT();
......@@ -610,11 +420,12 @@ bool Stream::_read_zmq_messages(void *stream_socket)
m_dtype_str = dtype;
m_depth = anImageDim.getDepth();
StreamInfo& last_info = m_stream.m_last_info;
AutoMutex lock(m_cond.mutex());
m_last_info.encoding = data_header.get("encoding", "").asString();
m_last_info.frame_dim = anImageDim;
m_last_info.packed_size = data_header.get("size", "-1").asInt();
_checkCompression(m_last_info);
last_info.encoding = data_header.get("encoding", "").asString();
last_info.frame_dim = anImageDim;
last_info.packed_size = data_header.get("size", "-1").asInt();
_checkCompression(last_info);
} else if (dtype != m_dtype_str)
THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(dtype) << ", "
<< "expected " << DEB_VAR1(m_dtype_str);
......@@ -630,32 +441,35 @@ bool Stream::_read_zmq_messages(void *stream_socket)
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frameid;
void* buffer_ptr = m_buffer_mgr->getFrameBufferPtr(frameid);
StdBufferCbMgr *buffer_mgr = m_stream.m_buffer_mgr;
void* buffer_ptr = buffer_mgr->getFrameBufferPtr(frameid);
int data_size = data_header.get("size",-1).asInt();
{
Data2Message& data_2_msg = m_stream.m_data_2_msg;
AutoMutex lock(m_cond.mutex());
m_data_2_msg[buffer_ptr] = ImageData{pending_messages[2], m_depth,
m_comp_type};
data_2_msg[buffer_ptr] = ImageData{pending_messages[2], m_depth,
m_comp_type};
}
{
AutoMutex stat_lock(m_stat_lock);
AutoMutex stat_lock(m_stream.m_stat_lock);
if (frameid > 0) {
double transfer_time = data_rx_tstamp - m_last_data_tstamp;
m_stat.add(data_size, transfer_time);
m_stream.m_stat.add(data_size, transfer_time);
}
m_last_data_tstamp = data_rx_tstamp;
}
m_cam.newFrameAcquired();
bool continue_flag = m_buffer_mgr->newFrameReady(frame_info);
bool do_disarm = (m_ext_trigger && m_cam.allFramesAcquired());
Camera& cam = m_stream.m_cam;
cam.newFrameAcquired();
bool continue_flag = buffer_mgr->newFrameReady(frame_info);
bool do_disarm = (m_ext_trigger && cam.allFramesAcquired());
if (!continue_flag && !do_disarm) {
DEB_WARNING() << "Unexpected " << DEB_VAR1(continue_flag) << ": "
<< "Disarming camera";
do_disarm = true;
}
if (do_disarm)
m_cam.disarm();
cam.disarm();
return true;
} else if (htype.find("dseries_end-") != std::string::npos) {
DEB_TRACE() << "Finishing";
......@@ -666,7 +480,7 @@ bool Stream::_read_zmq_messages(void *stream_socket)
}
}
void Stream::_checkCompression(const StreamInfo& info)
void Stream::_ZmqThread::_checkCompression(const StreamInfo& info)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(info, m_endianess);
......@@ -705,6 +519,236 @@ void Stream::_checkCompression(const StreamInfo& info)
THROW_HW_ERROR(Error) << "Unexpected compression type: " << comp_type;
}
// --- Stream class ---
inline bool Stream::_isRunning() const
{
return ((m_state != Idle) && (m_state != Failed));
}
Stream::Stream(Camera& cam) :
m_cam(cam),
m_header_detail(OFF),
m_buffer_ctrl_obj(new SoftBufferCtrlObj())
{
DEB_CONSTRUCTOR();
m_buffer_mgr = &m_buffer_ctrl_obj->getBuffer();
m_active = _getStreamMode();
getEigerParam(m_cam,Requests::STREAM_HEADER_DETAIL,m_header_detail_str);
DEB_TRACE() << DEB_VAR1(m_header_detail_str.value());
if(pipe(m_pipes))
THROW_HW_ERROR(Error) << "Can't open pipe";
m_state = Init;
m_thread.reset(new _ZmqThread(*this));
AutoMutex lock(m_cond.mutex());
while (m_state != Idle)
m_cond.wait();
}
Stream::~Stream()
{
DEB_DESTRUCTOR();
{
AutoMutex aLock(m_cond.mutex());
DEB_TRACE() << "Quitting";
m_state = Quitting;
m_cond.broadcast();
_send_synchro();
}
m_thread->join();
close(m_pipes[0]),close(m_pipes[1]);
}
void Stream::_setStreamMode(bool enabled)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(enabled);
std::string enabled_str = enabled ? "enabled" : "disabled";
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
setEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
}
bool Stream::_getStreamMode()
{
DEB_MEMBER_FUNCT();
std::string enabled_str;
getEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
bool enabled = (enabled_str == "enabled");
DEB_RETURN() << DEB_VAR1(enabled);
return enabled;
}
void Stream::start()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
if (m_state != Armed)
THROW_HW_ERROR(Error) << "Stream is not Armed (no global header)";
DEB_TRACE() << "Running";
m_state = Running;
m_buffer_mgr->setStartTimestamp(Timestamp::now());
}
void Stream::stop()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
bool connected = (m_state == Connected);
if (!_isRunning() || connected) {
if (connected)
_abort();
return;
}
DEB_TRACE() << "Stopped";
m_state = Stopped;
_send_synchro();
while (_isRunning())
m_cond.wait();
}
void Stream::abort()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
_abort();
}
void Stream::_send_synchro()
{