Commit a90f92e4 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

Protect volatile members in Stream::_read_zmq_messages

* Create Stream::ImageData from MessageNData and include comp_type
* Add some optimizations
parent edb1f201
......@@ -57,16 +57,17 @@ void _expand(void *src,Data& dst)
Data _DecompressTask::process(Data& out)
{
void *msg_data;
size_t msg_size;
int depth;
void *lima_buffer = out.data();
if(!m_stream.get_msg(lima_buffer,msg_data,msg_size,depth))
Stream::ImageData img_data;
if(!m_stream.get_msg(lima_buffer,img_data))
throw ProcessException("_DecompressTask: can't find compressed message");
void *msg_data;
size_t msg_size;
img_data.getMsgDataNSize(msg_data, msg_size);
const size_t& depth = img_data.depth;
const Camera::CompressionType& type = img_data.comp_type;
bool expand_16_to_32bit = ((out.depth() == 4) && (depth == 2));
int size = out.size() / (expand_16_to_32bit ? 2 : 1);
Camera::CompressionType type;
m_stream.getCompressionType(type);
bool decompress = (type != Camera::NoCompression);
void *aux_buffer = NULL;
if(expand_16_to_32bit && decompress)
......@@ -76,7 +77,8 @@ Data _DecompressTask::process(Data& out)
void *decompress_out = expand_16_to_32bit ? aux_buffer : lima_buffer;
int return_code = 0;
if (type == Camera::LZ4) {
return_code = LZ4_decompress_fast((const char*)msg_data,(char*)decompress_out,size);
return_code = LZ4_decompress_fast((const char*)msg_data,
(char*)decompress_out,size);
} else if (type == Camera::BSLZ4) {
struct bslz4_data {
uint64_t data_size;
......@@ -88,7 +90,8 @@ Data _DecompressTask::process(Data& out)
throw ProcessException("Data size mismatch");
size_t nb_elements = data_size / depth;
size_t block_size = be32toh(d->block_size) / depth;
return_code = bshuf_decompress_lz4(d->data, decompress_out, nb_elements, depth, block_size);
return_code = bshuf_decompress_lz4(d->data, decompress_out, nb_elements,
depth, block_size);
}
if(return_code < 0)
{
......
......@@ -73,6 +73,28 @@ private:
Stream& m_stream;
};
// --- Stream::ImageData ---
void Stream::ImageData::getMsgDataNSize(void*& data, size_t& size) const
{
zmq_msg_t *zmq_msg = msg->get_msg();
data = zmq_msg_data(zmq_msg);
size = zmq_msg_size(zmq_msg);
}
std::ostream& lima::Eiger::operator <<(std::ostream& os,
const Stream::ImageData& img_data)
{
void *msg_data;
size_t msg_size;
img_data.getMsgDataNSize(msg_data, msg_size);
return os << "<"
<< "data=" << msg_data << ", "
<< "size=" << msg_size << ", "
<< "depth=" << img_data.depth << ", "
<< "comp_type=" << img_data.comp_type
<< ">";
}
// --- Stream class ---
Stream::Stream(Camera& cam) :
m_cam(cam),
......@@ -138,11 +160,6 @@ bool Stream::isRunning() const
return m_running;
}
void Stream::getCompressionType(Camera::CompressionType& type) const
{
m_cam.getCompressionType(type);
}
void Stream::getHeaderDetail(Stream::HeaderDetail& detail) const
{
AutoMutex lock(m_cond.mutex());
......@@ -297,6 +314,7 @@ void Stream::_run()
if(m_stop)
break;
m_cam.getTrigMode(m_trigger_mode);
m_cam.getCompressionType(m_comp_type);
bool continue_flag = true;
//open stream socket
......@@ -357,6 +375,45 @@ void Stream::_run()
m_running = false;
}
inline void Stream::_checkCompression(const StreamInfo& info)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(info, m_endianess);
const std::string& encoding = info.encoding;
char endianess = *encoding.rbegin();
if (endianess != m_endianess)
THROW_HW_ERROR(Error) << "Endianess mismatch: "
<< "got " << endianess << ", "
<< "expected " << m_endianess;
CompressionType comp_type;
if (encoding == std::string(1, m_endianess)) {
comp_type = Camera::NoCompression;
} else {
const std::string lz4 = std::string("lz4") + m_endianess;
if (encoding == lz4) {
comp_type = Camera::LZ4;
} else {
const char *bs;
switch (info.frame_dim.getImageType()) {
case Bpp32S: case Bpp32: bs = "bs32-"; break;
case Bpp16S: case Bpp16: bs = "bs16-"; break;
case Bpp8S: case Bpp8: bs = "bs8-"; break;
}
if (encoding == std::string(bs) + lz4)
comp_type = Camera::BSLZ4;
else
THROW_HW_ERROR(Error) << "Unexpected encoding: " << encoding;
}
}
DEB_TRACE() << DEB_VAR1(comp_type);
if (comp_type != m_comp_type)
THROW_HW_ERROR(Error) << "Unexpected compression type: " << comp_type;
}
bool Stream::_read_zmq_messages(void *stream_socket)
{
DEB_MEMBER_FUNCT();
......@@ -404,7 +461,10 @@ bool Stream::_read_zmq_messages(void *stream_socket)
}
Timestamp tstart;
m_buffer_mgr->getStartTimestamp(tstart);
{
AutoMutex lock(m_cond.mutex());
m_buffer_mgr->getStartTimestamp(tstart);
}
if (!tstart.isSet() || (tstart < m_activate_tstamp)) {
DEB_ERROR() << "Frame before start: " << DEB_VAR1(frameid);
return true;
......@@ -437,18 +497,23 @@ bool Stream::_read_zmq_messages(void *stream_socket)
DEB_TRACE() << DEB_VAR1(anImageDim);
if (frameid == 0) {
AutoMutex lock(m_cond.mutex());
m_last_info.encoding = data_header.get("encoding", "").asString();
m_last_info.frame_dim = anImageDim;
m_last_info.packed_size = data_header.get("size", "-1").asInt();
Camera::CompressionType comp_type;
_checkCompression(m_last_info, comp_type);
lock.unlock();
_checkCompression(m_last_info);
}
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frameid;
void* buffer_ptr = m_buffer_mgr->getFrameBufferPtr(frameid);
m_data_2_msg[buffer_ptr] = MessageNDepth(pending_messages[2],
anImageDim.getDepth());
{
AutoMutex lock(m_cond.mutex());
m_data_2_msg[buffer_ptr] = ImageData{pending_messages[2],
anImageDim.getDepth(),
m_comp_type};
}
#ifdef READ_HEADER
if(nb_messages == 5)
......@@ -470,48 +535,6 @@ bool Stream::_read_zmq_messages(void *stream_socket)
return continue_flag;
}
void Stream::_checkCompression(const StreamInfo& info,
Camera::CompressionType& comp_type)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(info, m_endianess);
const std::string& encoding = info.encoding;
char endianess = *encoding.rbegin();
if (endianess != m_endianess)
THROW_HW_ERROR(Error) << "Endianess mismatch: "
<< "got " << endianess << ", "
<< "expected " << m_endianess;
if (encoding == std::string(1, m_endianess)) {
comp_type = Camera::NoCompression;
} else {
const std::string lz4 = std::string("lz4") + m_endianess;
if (encoding == lz4) {
comp_type = Camera::LZ4;
} else {
const char *bs;
switch (info.frame_dim.getImageType()) {
case Bpp32S: case Bpp32: bs = "bs32-"; break;
case Bpp16S: case Bpp16: bs = "bs16-"; break;
case Bpp8S: case Bpp8: bs = "bs8-"; break;
}
if (encoding == std::string(bs) + lz4)
comp_type = Camera::BSLZ4;
else
THROW_HW_ERROR(Error) << "Unexpected encoding: " << encoding;
}
}
Camera::CompressionType expected_comp_type;
getCompressionType(expected_comp_type);
if (comp_type != expected_comp_type)
THROW_HW_ERROR(Error) << "Unexpected compression type: " << comp_type;
DEB_RETURN() << DEB_VAR1(comp_type);
}
void Stream::getLastStreamInfo(StreamInfo& last_info)
{
DEB_MEMBER_FUNCT();
......@@ -519,7 +542,7 @@ void Stream::getLastStreamInfo(StreamInfo& last_info)
DEB_RETURN() << DEB_VAR1(last_info);
}
bool Stream::get_msg(void* aDataBuffer,void*& msg_data,size_t& msg_size,int& depth)
bool Stream::get_msg(void* aDataBuffer,ImageData& img_data)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(aDataBuffer);
......@@ -528,13 +551,10 @@ bool Stream::get_msg(void* aDataBuffer,void*& msg_data,size_t& msg_size,int& dep
Data2Message::iterator it = m_data_2_msg.find(aDataBuffer);
if(it == m_data_2_msg.end())
return false;
MessageNDepth message_depth = it->second;
MessagePtr message = message_depth.first;
depth = message_depth.second;
msg_data = zmq_msg_data(message->get_msg());
msg_size = zmq_msg_size(message->get_msg());
DEB_RETURN() << DEB_VAR2(msg_data,msg_size);
img_data = it->second;
lock.unlock();
if (DEB_CHECK_ANY(DebTypeReturn))
DEB_RETURN() << DEB_VAR1(img_data);
return true;
}
......
......@@ -38,9 +38,17 @@ namespace lima
public:
class Message;
typedef std::shared_ptr<Stream::Message> MessagePtr;
typedef Camera::CompressionType CompressionType;
enum HeaderDetail {ALL,BASIC,OFF};
struct ImageData {
MessagePtr msg;
int depth;
CompressionType comp_type;
void getMsgDataNSize(void*& data, size_t& size) const;
};
Stream(Camera&);
~Stream();
......@@ -48,8 +56,6 @@ namespace lima
void stop();
bool isRunning() const;
void getCompressionType(Camera::CompressionType& type) const;
void getHeaderDetail(HeaderDetail&) const;
void setHeaderDetail(HeaderDetail);
......@@ -58,8 +64,7 @@ namespace lima
HwBufferCtrlObj* getBufferCtrlObj();
bool get_msg(void* aDataBuffer,void*& msg_data,size_t& msg_size,
int& depth);
bool get_msg(void* aDataBuffer,ImageData& img_data);
void release_msg(void* aDataBuffer);
void release_all_msgs();
......@@ -69,16 +74,14 @@ namespace lima
class _BufferCtrlObj;
friend class _BufferCtrlObj;
typedef std::pair<MessagePtr,int> MessageNDepth;
typedef std::map<void*,MessageNDepth> Data2Message;
typedef std::map<void*,ImageData> Data2Message;
static void* _runFunc(void*);
void _run();
bool _read_zmq_messages(void *stream_socket);
void _send_synchro();
void _checkCompression(const StreamInfo& info,
Camera::CompressionType& comp_type);
void _checkCompression(const StreamInfo& info);
Camera& m_cam;
char m_endianess;
......@@ -98,10 +101,14 @@ namespace lima
StreamInfo m_last_info;
Timestamp m_activate_tstamp;
TrigMode m_trigger_mode;
CompressionType m_comp_type;
std::auto_ptr<_BufferCtrlObj> m_buffer_ctrl_obj;
StdBufferCbMgr* m_buffer_mgr;
};
std::ostream& operator <<(std::ostream& os,
const Stream::ImageData& img_data);
}
}
#endif // EIGERSTREAM_H
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment