Commit 870e9b8a authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Remove Stream::_BufferCallback, release zmq messages after decompressing

parent 51a2f794
......@@ -102,6 +102,8 @@ Data _DecompressTask::process(Data& out)
if(aux_buffer)
free(aux_buffer);
m_stream.release_msg(lima_buffer);
return out;
}
......
......@@ -99,6 +99,8 @@ void Interface::prepareAcq()
m_stream->setActive(!use_filewriter);
m_decompress->setActive(!use_filewriter);
m_stream->release_all_msgs();
m_cam.prepareAcq();
int serie_id; m_cam.getSerieId(serie_id);
m_saving->setSerieId(serie_id);
......
......@@ -56,79 +56,6 @@ struct Stream::Message
zmq_msg_t msg;
};
// --- Compression buffer management ---
class Stream::_BufferCallback : public HwBufferCtrlObj::Callback
{
DEB_CLASS_NAMESPC(DebModCamera,"Stream","_BufferCallback");
typedef std::pair<std::shared_ptr<Stream::Message>,int> MessageNDepth;
typedef std::map<void*,MessageNDepth> Data2Message;
typedef std::multiset<void *> BufferList;
public:
_BufferCallback() : HwBufferCtrlObj::Callback() {}
virtual ~_BufferCallback() {releaseAll();}
virtual void map(void* address)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(address);
AutoMutex lock(m_mutex);
m_buffer_in_use.insert(address);
}
virtual void release(void* address)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(address);
AutoMutex lock(m_mutex);
BufferList::iterator it = m_buffer_in_use.find(address);
if(it == m_buffer_in_use.end())
THROW_HW_ERROR(Error) << "Internal error: releasing buffer not in used list";
m_buffer_in_use.erase(it++);
if(it == m_buffer_in_use.end() || *it != address)
m_data_2_msg.erase(address);
}
virtual void releaseAll()
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_mutex);
m_buffer_in_use.clear();
m_data_2_msg.clear();
}
void register_new_msg(std::shared_ptr<Stream::Message>& msg,void* aDataBuffer,int depth)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(aDataBuffer);
AutoMutex lock(m_mutex);
m_data_2_msg[aDataBuffer] = MessageNDepth(msg,depth);
}
bool get_msg(void* aDataBuffer,void*& msg_data,size_t& msg_size,int& depth)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(aDataBuffer);
AutoMutex lock(m_mutex);
Data2Message::iterator it = m_data_2_msg.find(aDataBuffer);
if(it == m_data_2_msg.end())
return false;
MessageNDepth message_depth = it->second;
std::shared_ptr<Stream::Message> message = message_depth.first;
depth = message_depth.second;
msg_data = zmq_msg_data(message->get_msg());
msg_size = zmq_msg_size(message->get_msg());
DEB_RETURN() << DEB_VAR2(msg_data,msg_size);
return true;
}
private:
Mutex m_mutex;
Data2Message m_data_2_msg;
BufferList m_buffer_in_use;
};
// --- buffer management ---
class Stream::_BufferCtrlObj : public SoftBufferCtrlObj
{
......@@ -138,10 +65,6 @@ public:
m_stream(stream)
{
}
virtual HwBufferCtrlObj::Callback* getBufferCallback()
{
return m_stream.m_buffer_cbk;
}
private:
Stream& m_stream;
};
......@@ -155,7 +78,6 @@ Stream::Stream(Camera& cam) :
m_wait(true),
m_running(false),
m_stop(false),
m_buffer_cbk(new Stream::_BufferCallback()),
m_buffer_ctrl_obj(new Stream::_BufferCtrlObj(*this))
{
DEB_CONSTRUCTOR();
......@@ -184,7 +106,6 @@ Stream::~Stream()
close(m_pipes[0]),close(m_pipes[1]);
zmq_ctx_destroy(m_zmq_context);
delete m_buffer_cbk;
delete m_buffer_ctrl_obj;
}
......@@ -286,11 +207,6 @@ HwBufferCtrlObj* Stream::getBufferCtrlObj()
return m_buffer_ctrl_obj;
}
bool Stream::get_msg(void* aDataBuffer,void*& msg_data,size_t& msg_size,int &depth)
{
return m_buffer_cbk->get_msg(aDataBuffer,msg_data,msg_size,depth);
}
void* Stream::_runFunc(void *streamPt)
{
((Stream*)streamPt)->_run();
......@@ -485,8 +401,9 @@ void Stream::_run()
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frameid;
void* buffer_ptr = buffer_mgr.getFrameBufferPtr(frameid);
m_buffer_cbk->register_new_msg(pending_messages[2],buffer_ptr,
anImageDim.getDepth());
m_data_2_msg[buffer_ptr] = MessageNDepth(pending_messages[2],
anImageDim.getDepth());
#ifdef READ_HEADER
if(nb_messages == 5)
{
......@@ -563,3 +480,43 @@ void Stream::_checkEncoding(const std::string& encoding,
DEB_RETURN() << DEB_VAR1(comp_type);
}
bool Stream::get_msg(void* aDataBuffer,void*& msg_data,size_t& msg_size,int& depth)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(aDataBuffer);
AutoMutex lock(m_cond.mutex());
Data2Message::iterator it = m_data_2_msg.find(aDataBuffer);
if(it == m_data_2_msg.end())
return false;
MessageNDepth message_depth = it->second;
std::shared_ptr<Stream::Message> message = message_depth.first;
depth = message_depth.second;
msg_data = zmq_msg_data(message->get_msg());
msg_size = zmq_msg_size(message->get_msg());
DEB_RETURN() << DEB_VAR2(msg_data,msg_size);
return true;
}
void Stream::release_msg(void* aDataBuffer)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(aDataBuffer);
AutoMutex lock(m_cond.mutex());
Data2Message::iterator it = m_data_2_msg.find(aDataBuffer);
if(it == m_data_2_msg.end())
THROW_HW_ERROR(Error) << "Internal error: releasing buffer not in list";
m_data_2_msg.erase(it);
}
void Stream::release_all_msgs()
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
m_data_2_msg.clear();
}
......@@ -54,13 +54,19 @@ namespace lima
bool isActive() const;
HwBufferCtrlObj* getBufferCtrlObj();
bool get_msg(void* aDataBuffer,void*& msg_data,size_t& msg_size,
int& depth);
void release_msg(void* aDataBuffer);
void release_all_msgs();
private:
class _BufferCallback;
class _BufferCtrlObj;
friend class _BufferCtrlObj;
typedef std::pair<std::shared_ptr<Stream::Message>,int> MessageNDepth;
typedef std::map<void*,MessageNDepth> Data2Message;
static void* _runFunc(void*);
void _run();
void _send_synchro();
......@@ -83,7 +89,7 @@ namespace lima
pthread_t m_thread_id;
void* m_zmq_context;
int m_pipes[2];
_BufferCallback* m_buffer_cbk;
Data2Message m_data_2_msg;
_BufferCtrlObj* m_buffer_ctrl_obj;
};
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment