Commit ba474ad7 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Move FinishInfo preparation from recv. cb. to BufferThread

parent 02f0af25
......@@ -175,6 +175,9 @@ private:
typedef std::queue<int> FrameQueue;
typedef FrameMap::FinishInfo FinishInfo;
typedef FrameMap::FinishInfoList FinishInfoList;
struct AppInputData
{
DEB_CLASS_NAMESPC(DebModCamera, "Camera::AppInputData",
......@@ -237,36 +240,15 @@ private:
DEB_CLASS_NAMESPC(DebModCamera, "Camera::BufferThread",
"SlsDetector");
public:
typedef FrameMap::FinishInfo FinishInfo;
typedef std::vector<FinishInfo> FinishInfoArray;
BufferThread();
~BufferThread();
void init(Camera *cam, int port_idx, int size);
void init(Camera *cam, int port_idx);
void prepareAcq();
void getNewFrameEntry(int& idx, FinishInfo*& finfo)
{
DEB_MEMBER_FUNCT();
AutoMutex l = lock();
if (m_free_idx == m_finish_idx)
THROW_HW_ERROR(Error) << "BufferThread overrun";
idx = m_free_idx;
m_free_idx = getIndex(m_free_idx + 1);
l.unlock();
finfo = &m_finfo_array[idx];
}
void putNewFrameEntry(int idx, FinishInfo* /*finfo*/)
{
DEB_MEMBER_FUNCT();
AutoMutex l = lock();
m_ready_idx = idx;
m_cond.broadcast();
}
pid_t getTID()
{ return m_tid; }
......@@ -295,25 +277,14 @@ private:
AutoMutex lock()
{ return m_cond.mutex(); }
int getIndex(int i)
{
while (i < 0)
i += m_size;
return i % m_size;
}
void processFinishInfo(FinishInfo& finfo);
void processFinishInfo(const FinishInfo& finfo);
Camera *m_cam;
FrameMap *m_frame_map;
int m_port_idx;
pid_t m_tid;
bool m_end;
Cond m_cond;
int m_size;
FinishInfoArray m_finfo_array;
int m_free_idx;
int m_ready_idx;
int m_finish_idx;
IntList m_bad_frame_list;
};
......
......@@ -437,6 +437,7 @@ class FrameMap
int nb_lost;
SortedIntList finished;
};
typedef std::vector<FinishInfo> FinishInfoList;
FrameMap();
~FrameMap();
......@@ -446,8 +447,10 @@ class FrameMap
void clear();
void checkFinishedFrameItem(FrameType frame, int item);
FinishInfo frameItemFinished(FrameType frame, int item,
bool no_check, bool valid);
void frameItemFinished(FrameType frame, int item,
bool no_check, bool valid);
FinishInfoList pollFrameItemFinished(int item);
void stopPollFrameItemFinished(int item);
FrameArray getItemFrameArray() const
{ return m_last_item_frame; }
......@@ -459,6 +462,25 @@ class FrameMap
{ return getOldestFrame(m_last_item_frame); }
private:
typedef std::pair<int, bool> FrameData;
typedef std::set<FrameData> FrameDataList;
struct FrameQueue {
FrameDataList list;
Cond cond;
bool stopped;
FrameQueue();
void clear();
void push(FrameData data);
FrameDataList pop_all();
void stop();
};
typedef std::vector<FrameQueue> FrameQueueList;
friend bool SlsDetector::operator <(FrameData a, FrameData b);
struct AtomicCounter {
int count;
Mutex mutex;
......@@ -479,11 +501,18 @@ class FrameMap
typedef std::vector<AtomicCounter> CounterList;
int m_nb_items;
FrameQueueList m_frame_queue_list;
FrameArray m_last_item_frame;
int m_buffer_size;
CounterList m_frame_item_count;
};
inline bool operator <(FrameMap::FrameData a, FrameMap::FrameData b)
{
return (a.first < b.first);
}
std::ostream& operator <<(std::ostream& os, const FrameMap& m);
......
......@@ -199,23 +199,18 @@ void Camera::Receiver::portCallback(FrameType frame, int port, char *dptr,
}
Camera::BufferThread::BufferThread()
: m_cam(NULL)
: m_cam(NULL), m_port_idx(-1)
{
DEB_CONSTRUCTOR();
}
void Camera::BufferThread::init(Camera *cam, int port_idx, int size)
void Camera::BufferThread::init(Camera *cam, int port_idx)
{
DEB_MEMBER_FUNCT();
m_cam = cam;
m_frame_map = &m_cam->m_frame_map;
m_port_idx = port_idx;
m_size = size;
m_finfo_array.resize(size);
m_free_idx = getIndex(0);
m_ready_idx = getIndex(-1);
m_finish_idx = getIndex(-1);
start();
}
......@@ -234,9 +229,13 @@ Camera::BufferThread::~BufferThread()
if (!hasStarted())
return;
AutoMutex l = lock();
m_end = true;
m_cond.broadcast();
{
AutoMutex l = lock();
m_end = true;
m_cond.broadcast();
}
m_frame_map->stopPollFrameItemFinished(m_port_idx);
}
void Camera::BufferThread::start()
......@@ -268,21 +267,19 @@ void Camera::BufferThread::threadFunction()
m_end = false;
m_cond.broadcast();
while (!m_end) {
while (!m_end && (m_finish_idx == m_ready_idx))
m_cond.wait();
if (m_finish_idx != m_ready_idx) {
int new_idx = getIndex(m_finish_idx + 1);
{
AutoMutexUnlock u(l);
FinishInfo& finfo = m_finfo_array[new_idx];
AutoMutexUnlock u(l);
FinishInfoList finfo_list;
finfo_list = m_frame_map->pollFrameItemFinished(m_port_idx);
FinishInfoList::const_iterator it, end = finfo_list.end();
for (it = finfo_list.begin(); it != end; ++it) {
const FinishInfo& finfo = *it;
if ((finfo.nb_lost != 0) || !finfo.finished.empty())
processFinishInfo(finfo);
}
m_finish_idx = new_idx;
}
}
}
void Camera::BufferThread::processFinishInfo(FinishInfo& finfo)
void Camera::BufferThread::processFinishInfo(const FinishInfo& finfo)
{
DEB_MEMBER_FUNCT();
......@@ -546,11 +543,11 @@ void Camera::setModel(Model *model)
m_recv_ports = m_model->getRecvPorts();
int nb_ports = getTotNbPorts();
m_frame_map.setNbItems(nb_ports);
if (!m_buffer_thread) {
int buffer_size = 1000;
m_buffer_thread = new BufferThread[nb_ports];
for (int i = 0; i < nb_ports; ++i)
m_buffer_thread[i].init(this, i, buffer_size);
m_buffer_thread[i].init(this, i);
}
if (m_port_stats.empty())
m_port_stats.resize(nb_ports);
......@@ -912,7 +909,6 @@ void Camera::prepareAcq()
{
AutoMutex l = lock();
m_frame_map.setNbItems(nb_ports);
m_frame_map.setBufferSize(nb_buffers);
m_frame_map.clear();
DEB_TRACE() << DEB_VAR1(m_frame_queue.size());
......@@ -980,18 +976,8 @@ void Camera::processRecvPort(int port_idx, FrameType frame, char *dptr,
char *bptr = getFrameBufferPtr(frame);
m_model->processRecvPort(port_idx, frame, dptr, dsize, bptr);
}
FrameMap::FinishInfo finfo;
finfo = m_frame_map.frameItemFinished(frame, port_idx, true, valid);
if ((finfo.nb_lost == 0) && finfo.finished.empty())
return;
Timestamp t0 = Timestamp::now();
BufferThread& buffer = m_buffer_thread[port_idx];
int idx;
BufferThread::FinishInfo *buffer_finfo;
buffer.getNewFrameEntry(idx, buffer_finfo);
*buffer_finfo = finfo;
buffer.putNewFrameEntry(idx, buffer_finfo);
m_frame_map.frameItemFinished(frame, port_idx, true, valid);
Timestamp t1 = Timestamp::now();
PortStats& port_stats = m_port_stats[port_idx];
port_stats.stats.new_finish.add(t1 - t0);
......
......@@ -562,6 +562,45 @@ TimeRangesChangedCallback::~TimeRangesChangedCallback()
m_cam->unregisterTimeRangesChangedCallback(*this);
}
FrameMap::FrameQueue::FrameQueue() : stopped(false)
{
}
void FrameMap::FrameQueue::clear()
{
cond.acquire();
list.clear();
stopped = false;
cond.release();
}
void FrameMap::FrameQueue::push(FrameData data)
{
cond.acquire();
list.insert(data);
cond.signal();
cond.release();
}
FrameMap::FrameDataList FrameMap::FrameQueue::pop_all()
{
cond.acquire();
while (list.empty() && !stopped)
cond.wait();
FrameDataList ret = list;
list.clear();
cond.release();
return ret;
}
void FrameMap::FrameQueue::stop()
{
cond.acquire();
stopped = true;
cond.signal();
cond.release();
}
FrameMap::FrameMap()
: m_nb_items(0), m_buffer_size(0)
{
......@@ -580,6 +619,7 @@ void FrameMap::setNbItems(int nb_items)
if (nb_items == m_nb_items)
return;
m_frame_queue_list.resize(nb_items);
m_last_item_frame.resize(nb_items);
m_nb_items = nb_items;
}
......@@ -598,8 +638,10 @@ void FrameMap::setBufferSize(int buffer_size)
void FrameMap::clear()
{
DEB_MEMBER_FUNCT();
for (int i = 0; i < m_nb_items; ++i)
for (int i = 0; i < m_nb_items; ++i) {
m_frame_queue_list[i].clear();
m_last_item_frame[i] = -1;
}
for (int i = 0; i < m_buffer_size; ++i)
m_frame_item_count[i].set(m_nb_items);
}
......@@ -620,8 +662,8 @@ void FrameMap::checkFinishedFrameItem(FrameType frame, int item)
THROW_HW_ERROR(Error) << DEB_VAR1(frame) << " finished already";
}
FrameMap::FinishInfo FrameMap::frameItemFinished(FrameType frame, int item,
bool no_check, bool valid)
void FrameMap::frameItemFinished(FrameType frame, int item,
bool no_check, bool valid)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR4(frame, item, no_check, m_nb_items);
......@@ -629,26 +671,54 @@ FrameMap::FinishInfo FrameMap::frameItemFinished(FrameType frame, int item,
if (!no_check)
checkFinishedFrameItem(frame, item);
FinishInfo finfo;
m_frame_queue_list[item].push(FrameData(frame, valid));
}
FrameMap::FinishInfoList FrameMap::pollFrameItemFinished(int item)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(item);
FrameDataList data_list = m_frame_queue_list[item].pop_all();
FinishInfoList finfo_list;
FrameDataList::const_iterator it, end = data_list.end();
FrameType &last = m_last_item_frame[item];
finfo.first_lost = last + 1;
finfo.nb_lost = frame - finfo.first_lost + (!valid ? 1 : 0);
for (FrameType f = last + 1; f != (frame + 1); ++f) {
int idx = f % m_buffer_size;
AtomicCounter& count = m_frame_item_count[idx];
bool frame_finished = count.dec_test_and_reset(m_nb_items);
if (!frame_finished)
continue;
finfo.finished.insert(f);
for (it = data_list.begin(); it != end; ++it) {
FrameType frame = it->first;
bool valid = it->second;
FinishInfo finfo;
finfo.first_lost = last + 1;
finfo.nb_lost = frame - finfo.first_lost + (!valid ? 1 : 0);
for (FrameType f = last + 1; f != (frame + 1); ++f) {
int idx = f % m_buffer_size;
AtomicCounter& count = m_frame_item_count[idx];
bool finished = count.dec_test_and_reset(m_nb_items);
if (finished)
finfo.finished.insert(f);
}
last = frame;
if (DEB_CHECK_ANY(DebTypeReturn)) {
PrettySortedList finished_list(finfo.finished);
DEB_RETURN() << DEB_VAR3(finfo.first_lost,
finfo.nb_lost, finished_list);
}
finfo_list.push_back(finfo);
}
last = frame;
if (DEB_CHECK_ANY(DebTypeReturn))
DEB_RETURN() << DEB_VAR3(finfo.first_lost, finfo.nb_lost,
PrettySortedList(finfo.finished));
return finfo;
return finfo_list;
}
void FrameMap::stopPollFrameItemFinished(int item)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(item);
m_frame_queue_list[item].stop();
}
ostream& lima::SlsDetector::operator <<(ostream& os, const FrameMap& m)
{
os << "<";
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment