Commit 67e4e9d9 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

Reduce overhead and thread contention in Stream::_read_zmq_messages

* Use sequence loop-dedicated variables
* Protect StreamStatistics with dedicated mutex
parent 7672f671
Pipeline #21085 failed with stages
in 1 minute and 15 seconds
......@@ -259,6 +259,7 @@ void Stream::stop()
return;
DEB_TRACE() << "Stopped";
m_state = Stopped;
_send_synchro();
while (_isRunning())
m_cond.wait();
}
......@@ -471,6 +472,9 @@ void Stream::_run_sequence()
m_cond.broadcast();
}
m_stopped = false;
m_waiting_global_header = true;
// Initialize poll set
zmq_pollitem_t items [] = {
{ NULL, m_pipes[0], ZMQ_POLLIN, 0 },
......@@ -491,6 +495,7 @@ void Stream::_run_sequence()
{
AutoMutex lock(m_cond.mutex());
continue_flag = !((m_state == Aborting) || (m_state == Quitting));
m_stopped = (m_state == Stopped);
}
if (!continue_flag)
break;
......@@ -543,24 +548,22 @@ bool Stream::_read_zmq_messages(void *stream_socket)
if (nb_messages == 0)
return true;
AutoMutex lock(m_cond.mutex());
bool waiting_global_header = (m_state == Connected);
bool stopped = (m_state == Stopped);
lock.unlock();
Timestamp data_rx_tstamp = Timestamp::now();
Json::Value stream_header = _get_json_header(pending_messages[0]);
std::string htype = stream_header.get("htype","").asString();
DEB_TRACE() << DEB_VAR1(htype);
bool is_global_header = (htype.find("dheader-") != std::string::npos);
if (is_global_header != waiting_global_header) {
if (is_global_header != m_waiting_global_header) {
DEB_WARNING() << "Global header mismatch: "
<< DEB_VAR2(is_global_header, waiting_global_header)
<< DEB_VAR2(is_global_header, m_waiting_global_header)
<< ": " << htype;
return true;
} else if (is_global_header) {
Json::Value header = _get_global_header(stream_header,pending_messages);
lock.lock();
m_waiting_global_header = false;
AutoMutex lock(m_cond.mutex());
m_state = Armed;
DEB_TRACE() << "Global header received: " << DEB_VAR1(m_state);
m_cond.broadcast();
......@@ -574,42 +577,45 @@ bool Stream::_read_zmq_messages(void *stream_socket)
<< "only received " << nb_messages;
Json::Value data_header = _get_json_header(pending_messages[1]);
//Data size (width,height)
Json::Value shape = data_header.get("shape","");
if (!shape.isArray() || shape.size() != 2)
THROW_HW_ERROR(Error) << "Invalid data shape: " << shape.asString();
FrameDim anImageDim;
anImageDim.setSize(Size(shape[0u].asInt(),shape[1u].asInt()));
//data type
ImageType image_type;
std::string dtype = data_header.get("type","none").asString();
if(dtype == "int32")
image_type = Bpp32S;
else if(dtype == "uint32")
image_type = Bpp32;
else if(dtype == "int16")
image_type = Bpp16S;
else if(dtype == "uint16")
image_type = Bpp16;
else
THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(dtype);
anImageDim.setImageType(image_type);
DEB_TRACE() << DEB_VAR1(anImageDim);
if (frameid == 0) {
lock.lock();
//Data size (width,height)
Json::Value shape = data_header.get("shape","");
if (!shape.isArray() || shape.size() != 2)
THROW_HW_ERROR(Error) << "Invalid data shape: " << shape.asString();
FrameDim anImageDim;
anImageDim.setSize(Size(shape[0u].asInt(),shape[1u].asInt()));
//data type
ImageType image_type;
if(dtype == "int32")
image_type = Bpp32S;
else if(dtype == "uint32")
image_type = Bpp32;
else if(dtype == "int16")
image_type = Bpp16S;
else if(dtype == "uint16")
image_type = Bpp16;
else
THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(dtype);
anImageDim.setImageType(image_type);
DEB_TRACE() << DEB_VAR1(anImageDim);
m_dtype_str = dtype;
m_depth = anImageDim.getDepth();
AutoMutex lock(m_cond.mutex());
m_last_info.encoding = data_header.get("encoding", "").asString();
m_last_info.frame_dim = anImageDim;
m_last_info.packed_size = data_header.get("size", "-1").asInt();
_checkCompression(m_last_info);
lock.unlock();
}
} else if (dtype != m_dtype_str)
THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(dtype) << ", "
<< "expected " << DEB_VAR1(m_dtype_str);
Json::Value config_header = _get_json_header(pending_messages[3]);
if (frameid == 0)
DEB_TRACE() << DEB_VAR1(config_header["start_time"].asString());
if (stopped) {
if (m_stopped) {
DEB_TRACE() << "Stopped: ignoring data";
return true;
}
......@@ -618,18 +624,18 @@ bool Stream::_read_zmq_messages(void *stream_socket)
frame_info.acq_frame_nb = frameid;
void* buffer_ptr = m_buffer_mgr->getFrameBufferPtr(frameid);
int data_size = data_header.get("size",-1).asInt();
Timestamp t = Timestamp::now();
{
lock.lock();
m_data_2_msg[buffer_ptr] = ImageData{pending_messages[2],
anImageDim.getDepth(),
AutoMutex lock(m_cond.mutex());
m_data_2_msg[buffer_ptr] = ImageData{pending_messages[2], m_depth,
m_comp_type};
}
{
AutoMutex stat_lock(m_stat_lock);
if (frameid > 0) {
double transfer_time = t - m_last_data_tstamp;
double transfer_time = data_rx_tstamp - m_last_data_tstamp;
m_stat.add(data_size, transfer_time);
}
m_last_data_tstamp = t;
lock.unlock();
m_last_data_tstamp = data_rx_tstamp;
}
m_cam.newFrameAcquired();
......@@ -726,14 +732,14 @@ void Stream::release_all_msgs()
void Stream::resetStatistics()
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
AutoMutex stat_lock(m_stat_lock);
m_stat.reset();
}
void Stream::latchStatistics(StreamStatistics& stat, bool reset)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
AutoMutex stat_lock(m_stat_lock);
stat = m_stat;
if (reset)
m_stat.reset();
......
......@@ -125,8 +125,14 @@ namespace lima
std::unique_ptr<_BufferCtrlObj> m_buffer_ctrl_obj;
StdBufferCbMgr* m_buffer_mgr;
Timestamp m_last_data_tstamp;
bool m_stopped;
bool m_waiting_global_header;
int m_depth;
std::string m_dtype_str;
Mutex m_stat_lock;
StreamStatistics m_stat;
Timestamp m_last_data_tstamp;
};
std::ostream& operator <<(std::ostream& os, Stream::State state);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment