Commit edb1f201 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

Move internal Stream code to _read_zmq_messages

parent a8f0779d
......@@ -89,6 +89,8 @@ Stream::Stream(Camera& cam) :
bool is_le = (htole16(0x1234) == 0x1234);
m_endianess = (is_le ? '<' : '>');
m_buffer_mgr = &m_buffer_ctrl_obj->getBuffer();
m_zmq_context = zmq_ctx_new();
if(pipe(m_pipes))
THROW_HW_ERROR(Error) << "Can't open pipe";
......@@ -109,14 +111,12 @@ Stream::~Stream()
close(m_pipes[0]),close(m_pipes[1]);
zmq_ctx_destroy(m_zmq_context);
delete m_buffer_ctrl_obj;
}
void Stream::start()
{
AutoMutex aLock(m_cond.mutex());
m_buffer_ctrl_obj->getBuffer().setStartTimestamp(Timestamp::now());
m_buffer_mgr->setStartTimestamp(Timestamp::now());
}
void Stream::stop()
......@@ -215,7 +215,7 @@ void Stream::setActive(bool active)
HwBufferCtrlObj* Stream::getBufferCtrlObj()
{
DEB_MEMBER_FUNCT();
return m_buffer_ctrl_obj;
return m_buffer_ctrl_obj.get();
}
void* Stream::_runFunc(void *streamPt)
......@@ -281,7 +281,6 @@ void Stream::_run()
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
StdBufferCbMgr& buffer_mgr = m_buffer_ctrl_obj->getBuffer();
while(1)
{
......@@ -295,11 +294,9 @@ void Stream::_run()
m_running = true;
DEB_TRACE() << "Running";
}
if(m_stop) break;
int nb_frames;
m_cam.getNbFrames(nb_frames);
TrigMode trigger_mode;
m_cam.getTrigMode(trigger_mode);
if(m_stop)
break;
m_cam.getTrigMode(m_trigger_mode);
bool continue_flag = true;
//open stream socket
......@@ -334,112 +331,12 @@ void Stream::_run()
aLock.lock();
continue_flag = !m_wait && !m_stop;
aLock.unlock();
if (!continue_flag)
break;
}
if(items[1].revents & ZMQ_POLLIN) // reading stream
{
std::vector<MessagePtr> pending_messages;
pending_messages.reserve(9);
int more;
do {
MessagePtr msg(new Stream::Message());
_CHECK_RETURN(zmq_msg_recv(msg->get_msg(),stream_socket,0));
more = zmq_msg_more(msg->get_msg());
pending_messages.emplace_back(msg);
} while(more);
int nb_messages = pending_messages.size();
DEB_TRACE() << DEB_VAR1(nb_messages);
if(nb_messages > 0)
{
Json::Value stream_header;
continue_flag = _get_json_header(pending_messages[0],stream_header);
if(continue_flag)
{
std::string htype = stream_header.get("htype","").asString();
DEB_TRACE() << DEB_VAR1(htype);
#ifdef READ_HEADER
if(htype.find("dheader-") != std::string::npos)
{
Json::Value header;
continue_flag = _get_header(stream_header,nb_messages,
pending_messages,header);
}
else
#endif
if(htype.find("dimage-") != std::string::npos)
{
int frameid = stream_header.get("frame",-1).asInt();
DEB_TRACE() << DEB_VAR1(frameid);
//stream_header.get("hash","md5sum")
if(nb_messages < 3)
{
DEB_ERROR() << "Should receive at least 3 messages part, only received "
<< nb_messages;
break;
}
Timestamp tstart;
buffer_mgr.getStartTimestamp(tstart);
if (!tstart.isSet() || (tstart < m_activate_tstamp)) {
DEB_ERROR() << "Frame before start: " << DEB_VAR1(frameid);
continue;
}
Json::Value data_header;
if(!_get_json_header(pending_messages[1],data_header)) break;
//Data size (width,height)
Json::Value shape = data_header.get("shape","");
if(!shape.isArray() || shape.size() != 2) break;
FrameDim anImageDim;
anImageDim.setSize(Size(shape[0u].asInt(),shape[1u].asInt()));
//data type
ImageType image_type;
std::string dtype = data_header.get("type","none").asString();
if(dtype == "int32")
image_type = Bpp32S;
else if(dtype == "uint32")
image_type = Bpp32;
else if(dtype == "int16")
image_type = Bpp16S;
else if(dtype == "uint16")
image_type = Bpp16;
else
break;
anImageDim.setImageType(image_type);
DEB_TRACE() << DEB_VAR1(anImageDim);
if (frameid == 0) {
m_last_info.encoding = data_header.get("encoding", "").asString();
m_last_info.frame_dim = anImageDim;
m_last_info.packed_size = data_header.get("size", "-1").asInt();
Camera::CompressionType comp_type;
_checkCompression(m_last_info, comp_type);
}
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frameid;
void* buffer_ptr = buffer_mgr.getFrameBufferPtr(frameid);
m_data_2_msg[buffer_ptr] = MessageNDepth(pending_messages[2],
anImageDim.getDepth());
#ifdef READ_HEADER
if(nb_messages == 5)
{
zmq_msg_t& msg = pending_messages[4]->msg;
char* headerpt = (char*)zmq_msg_data(&msg);
size_t header_size = zmq_msg_size(&msg);
}
#endif
continue_flag = buffer_mgr.newFrameReady(frame_info);
m_cam.m_image_number++;
if(trigger_mode != IntTrig && trigger_mode != IntTrigMult && !--nb_frames)
m_cam.disarm();
}
else if(htype.find("dseries_end-") != std::string::npos)
continue_flag = false;
}
}
continue_flag = _read_zmq_messages(stream_socket);
}
}
}
......@@ -448,7 +345,7 @@ void Stream::_run()
char error_buffer[256];
char* error_msg = strerror_r(errno,error_buffer,sizeof(error_buffer));
DEB_ERROR() << "Connection error: " << DEB_VAR2(errno,error_msg);
aLock.unlock();
aLock.unlock();
}
if(stream_socket) zmq_close(stream_socket);
......@@ -460,6 +357,119 @@ void Stream::_run()
m_running = false;
}
bool Stream::_read_zmq_messages(void *stream_socket)
{
DEB_MEMBER_FUNCT();
bool continue_flag = true;
std::vector<MessagePtr> pending_messages;
pending_messages.reserve(9);
int more;
do {
MessagePtr msg(new Stream::Message());
_CHECK_RETURN(zmq_msg_recv(msg->get_msg(),stream_socket,0));
more = zmq_msg_more(msg->get_msg());
pending_messages.emplace_back(msg);
} while(more);
int nb_messages = pending_messages.size();
DEB_TRACE() << DEB_VAR1(nb_messages);
if(nb_messages > 0)
{
Json::Value stream_header;
continue_flag = _get_json_header(pending_messages[0],stream_header);
if(!continue_flag)
return continue_flag;
std::string htype = stream_header.get("htype","").asString();
DEB_TRACE() << DEB_VAR1(htype);
#ifdef READ_HEADER
if(htype.find("dheader-") != std::string::npos)
{
Json::Value header;
continue_flag = _get_header(stream_header,nb_messages,
pending_messages,header);
}
else
#endif
if(htype.find("dimage-") != std::string::npos)
{
int frameid = stream_header.get("frame",-1).asInt();
DEB_TRACE() << DEB_VAR1(frameid);
//stream_header.get("hash","md5sum")
if(nb_messages < 3)
{
DEB_ERROR() << "Should receive at least 3 messages part, only received "
<< nb_messages;
return false;
}
Timestamp tstart;
m_buffer_mgr->getStartTimestamp(tstart);
if (!tstart.isSet() || (tstart < m_activate_tstamp)) {
DEB_ERROR() << "Frame before start: " << DEB_VAR1(frameid);
return true;
}
Json::Value data_header;
if(!_get_json_header(pending_messages[1],data_header))
return false;
//Data size (width,height)
Json::Value shape = data_header.get("shape","");
if(!shape.isArray() || shape.size() != 2)
return false;
FrameDim anImageDim;
anImageDim.setSize(Size(shape[0u].asInt(),shape[1u].asInt()));
//data type
ImageType image_type;
std::string dtype = data_header.get("type","none").asString();
if(dtype == "int32")
image_type = Bpp32S;
else if(dtype == "uint32")
image_type = Bpp32;
else if(dtype == "int16")
image_type = Bpp16S;
else if(dtype == "uint16")
image_type = Bpp16;
else
return false;
anImageDim.setImageType(image_type);
DEB_TRACE() << DEB_VAR1(anImageDim);
if (frameid == 0) {
m_last_info.encoding = data_header.get("encoding", "").asString();
m_last_info.frame_dim = anImageDim;
m_last_info.packed_size = data_header.get("size", "-1").asInt();
Camera::CompressionType comp_type;
_checkCompression(m_last_info, comp_type);
}
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frameid;
void* buffer_ptr = m_buffer_mgr->getFrameBufferPtr(frameid);
m_data_2_msg[buffer_ptr] = MessageNDepth(pending_messages[2],
anImageDim.getDepth());
#ifdef READ_HEADER
if(nb_messages == 5)
{
zmq_msg_t& msg = pending_messages[4]->msg;
char* headerpt = (char*)zmq_msg_data(&msg);
size_t header_size = zmq_msg_size(&msg);
}
#endif
continue_flag = m_buffer_mgr->newFrameReady(frame_info);
m_cam.m_image_number++;
if((m_trigger_mode != IntTrig) &&
(m_trigger_mode != IntTrigMult) && m_cam.allFramesAcquired())
m_cam.disarm();
}
else if(htype.find("dseries_end-") != std::string::npos)
continue_flag = false;
}
return continue_flag;
}
void Stream::_checkCompression(const StreamInfo& info,
Camera::CompressionType& comp_type)
{
......
......@@ -74,6 +74,7 @@ namespace lima
static void* _runFunc(void*);
void _run();
bool _read_zmq_messages(void *stream_socket);
void _send_synchro();
void _checkCompression(const StreamInfo& info,
......@@ -96,7 +97,10 @@ namespace lima
Data2Message m_data_2_msg;
StreamInfo m_last_info;
Timestamp m_activate_tstamp;
_BufferCtrlObj* m_buffer_ctrl_obj;
TrigMode m_trigger_mode;
std::auto_ptr<_BufferCtrlObj> m_buffer_ctrl_obj;
StdBufferCbMgr* m_buffer_mgr;
};
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment