Commit dcbb77d4 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Remove Model processing threads, move packet RX to Camera::AcqThread

parent 83021557
......@@ -186,7 +186,6 @@ private:
public:
AcqThread(Camera *cam);
void queueFinishedFrame(FrameType frame);
virtual void start(AutoMutex& l);
void stop(AutoMutex& l, bool wait);
......@@ -208,6 +207,7 @@ private:
AutoMutex& m_lock;
};
DetFrameImagePackets readRecvPackets(FrameType frame);
Status newFrameReady(FrameType frame);
void startAcq();
void stopAcq();
......@@ -216,7 +216,8 @@ private:
Camera *m_cam;
Cond& m_cond;
AcqState& m_state;
FrameQueue m_frame_queue;
FramePacketMap m_frame_packet_map;
bool m_acq_stopped;
};
friend class BufferMgr;
......@@ -248,8 +249,6 @@ private:
void removeSharedMem();
void createReceivers();
DetFrameImagePackets readRecvPackets();
void publishFrame(FrameType frame);
void assemblePackets(DetFrameImagePackets&& det_frame_packets);
bool checkLostPackets();
......@@ -317,7 +316,6 @@ private:
PixelDepthCPUAffinityMap m_cpu_affinity_map;
GlobalCPUAffinityMgr m_global_cpu_affinity_mgr;
AutoPtr<AcqThread> m_acq_thread;
FramePacketMap m_frame_packet_map;
};
} // namespace SlsDetector
......
......@@ -268,8 +268,6 @@ class Eiger : public Model
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
virtual void updateImageSize();
virtual bool checkSettings(Settings settings);
......@@ -290,57 +288,6 @@ class Eiger : public Model
typedef std::vector<Receiver *> RecvList;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Thread", "SlsDetector");
public:
enum State {
Init, Ready, Running, Stopping, Quitting, End,
};
Thread(Eiger *eiger, int idx);
virtual ~Thread();
void setCPUAffinity(CPUAffinity aff);
void prepareAcq();
void startAcq()
{ setState(Running); }
void stopAcq()
{
setState(Stopping);
AutoMutex l = lock();
while (m_state != Ready)
wait();
}
protected:
virtual void threadFunction();
private:
friend class Eiger;
AutoMutex lock()
{ return m_eiger->lock(); }
void wait()
{ m_eiger->wait(); }
void broadcast()
{ m_eiger->broadcast(); }
void setState(State state)
{
AutoMutex l = lock();
m_state = state;
broadcast();
}
Eiger *m_eiger;
int m_idx;
State m_state;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
class CorrBase
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::CorrBase",
......@@ -532,21 +479,8 @@ class Eiger : public Model
const FrameDim& getRecvFrameDim()
{ return m_geom.m_recv_frame_dim; }
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_cond.wait(); }
void broadcast()
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
int getNbRecvs();
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
CorrBase *createBadRecvFrameCorr();
CorrBase *createChipBorderCorr(ImageType image_type);
CorrBase *createInterModGapCorr();
......@@ -586,14 +520,11 @@ class Eiger : public Model
static const unsigned long BebFpgaReadPtrAddr;
static const unsigned long BebFpgaPtrRange;
Cond m_cond;
BebList m_beb_list;
Geometry m_geom;
CorrList m_corr_list;
RecvList m_recv_list;
ModelReconstruction *m_reconstruction;
FrameType m_nb_frames;
ThreadList m_thread_list;
bool m_fixed_clock_div;
ClockDiv m_clock_div;
};
......
......@@ -199,8 +199,6 @@ class Jungfrau : public Model
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
virtual void updateImageSize();
virtual bool checkSettings(Settings settings);
......@@ -214,57 +212,6 @@ class Jungfrau : public Model
typedef std::vector<Receiver *> RecvList;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Jungfrau::Thread", "SlsDetector");
public:
enum State {
Init, Ready, Running, Stopping, Quitting, End,
};
Thread(Jungfrau *jungfrau, int idx);
virtual ~Thread();
void setCPUAffinity(CPUAffinity aff);
void prepareAcq();
void startAcq()
{ setState(Running); }
void stopAcq()
{
setState(Stopping);
AutoMutex l = lock();
while (m_state != Ready)
wait();
}
protected:
virtual void threadFunction();
private:
friend class Jungfrau;
AutoMutex lock()
{ return m_jungfrau->lock(); }
void wait()
{ m_jungfrau->wait(); }
void broadcast()
{ m_jungfrau->broadcast(); }
void setState(State state)
{
AutoMutex l = lock();
m_state = state;
broadcast();
}
Jungfrau *m_jungfrau;
int m_idx;
State m_state;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
class ImgProcBase
{
DEB_CLASS_NAMESPC(DebModCamera, "Jungfrau::ImgProcBase",
......@@ -593,22 +540,8 @@ class Jungfrau : public Model
FrameDim getModuleFrameDim(int idx, bool raw);
int getModuleDataOffset(int idx, bool raw);
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_cond.wait(); }
void broadcast()
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
int getNbRecvs();
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
Cond m_cond;
AutoPtr<GainPedImgProc> m_gain_ped_img_proc;
AutoPtr<GainADCMapImgProc> m_gain_adc_map_img_proc;
AutoPtr<AveImgProc> m_ave_img_proc;
......@@ -617,8 +550,6 @@ class Jungfrau : public Model
ImgSrc m_img_src;
ModelReconstruction *m_reconstruction;
RecvList m_recv_list;
FrameType m_nb_frames;
ThreadList m_thread_list;
};
std::ostream& operator <<(std::ostream& os, Jungfrau::GainPed::MapType map_type);
......
......@@ -47,8 +47,6 @@ class Model
public:
typedef Defs::Settings Settings;
typedef FrameMap::FinishInfo FinishInfo;
Model(Camera *cam, Type type);
virtual ~Model();
......@@ -86,12 +84,8 @@ class Model
virtual bool isAcqActive();
virtual bool isXferActive() = 0;
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list) = 0;
virtual Reconstruction *getReconstruction();
void processPackets();
protected:
void updateCameraModel();
void updateCameraImageSize();
......@@ -110,8 +104,6 @@ class Model
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
void processFinishInfo(const FinishInfo& finfo);
BufferMgr *getBuffer();
private:
......@@ -125,8 +117,6 @@ class Model
protected:
AutoPtr<sls::Detector> m_det;
FrameType m_next_frame;
FrameType m_last_frame;
};
......
......@@ -103,9 +103,6 @@ class Eiger : public SlsDetector::Model
virtual bool checkSettings(SlsDetector::Defs::Settings settings);
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list);
virtual void prepareAcq();
virtual void startAcq();
virtual void stopAcq();
......
......@@ -201,9 +201,6 @@ class Jungfrau : public SlsDetector::Model
virtual bool checkSettings(SlsDetector::Defs::Settings settings);
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list);
virtual void prepareAcq();
virtual void startAcq();
virtual void stopAcq();
......
......@@ -34,9 +34,6 @@ public:
Model(SlsDetector::Camera *cam, SlsDetector::Type type);
virtual ~Model();
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list) = 0;
virtual void getFrameDim(FrameDim& frame_dim /Out/,
bool raw = false) = 0;
virtual void getAcqFrameDim(FrameDim& frame_dim /Out/,
......
......@@ -1618,8 +1618,6 @@ void GlobalCPUAffinityMgr::setModelAffinity(
if (model_affinity_list == m_curr.model_threads)
return;
m_cam->m_model->setThreadCPUAffinity(model_affinity_list);
CPUAffinity buffer_affinity = CPUAffinityList_all(model_affinity_list);
m_cam->m_buffer.setAcqBufferCPUAffinity(buffer_affinity);
......
......@@ -103,7 +103,8 @@ Camera::AcqThread::ExceptionCleanUp::~ExceptionCleanUp()
}
Camera::AcqThread::AcqThread(Camera *cam)
: m_cam(cam), m_cond(m_cam->m_cond), m_state(m_cam->m_state)
: m_cam(cam), m_cond(m_cam->m_cond), m_state(m_cam->m_state),
m_acq_stopped(false)
{
DEB_CONSTRUCTOR();
}
......@@ -136,10 +137,29 @@ void Camera::AcqThread::stop(AutoMutex& l, bool wait)
m_state = StopReq;
m_cond.broadcast();
{
AutoMutexUnlock u(l);
stopAcq();
}
while (wait && (m_state != Stopped) && (m_state != Idle))
m_cond.wait();
}
inline
Camera::AcqThread::Status Camera::AcqThread::newFrameReady(FrameType frame)
{
DEB_MEMBER_FUNCT();
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frame;
StdBufferCbMgr *cb_mgr = m_cam->m_buffer.getBufferCbMgr(LimaBuffer);
bool cont_acq = cb_mgr->newFrameReady(frame_info);
bool acq_end = (frame == m_cam->m_lima_nb_frames - 1);
cont_acq &= !acq_end;
return Status(cont_acq, acq_end);
}
void Camera::AcqThread::threadFunction()
{
DEB_MEMBER_FUNCT();
......@@ -160,43 +180,39 @@ void Camera::AcqThread::threadFunction()
DEB_TRACE() << DEB_VAR1(m_state);
m_cond.broadcast();
Reconstruction *reconstruct = m_cam->m_model->getReconstruction();
SeqFilter seq_filter;
bool had_frames = false;
bool cont_acq = true;
bool acq_end = false;
do {
while ((m_state != StopReq) && m_frame_queue.empty()) {
if (!m_cond.wait(m_cam->m_new_frame_timeout)) {
FrameType next_frame = 0;
auto get_next_frame = [&]() {
DetFrameImagePackets packets = readRecvPackets(next_frame++);
FrameType frame = packets.first;
reconstruct->addFramePackets(std::move(packets));
return frame;
};
while ((m_state != StopReq) && cont_acq &&
(next_frame < m_cam->m_lima_nb_frames)) {
FrameType frame;
{
AutoMutexUnlock u(l);
try {
m_cam->checkLostPackets();
} catch (Exception& e) {
string name = ("Camera::AcqThread: "
"checkLostPackets");
m_cam->reportException(e, name);
}
}
frame = get_next_frame();
DEB_TRACE() << DEB_VAR2(next_frame, frame);
}
if (!m_frame_queue.empty()) {
FrameType frame = m_frame_queue.front();
m_frame_queue.pop();
DEB_TRACE() << DEB_VAR1(frame);
seq_filter.addVal(frame);
SeqFilter::Range frames = seq_filter.getSeqRange();
if (frames.nb > 0) {
int f = frames.first;
do {
m_cam->m_buffer.waitLimaFrame(f, l);
m_cam->m_buffer.waitLimaFrame(frame, l);
{
AutoMutexUnlock u(l);
DEB_TRACE() << DEB_VAR1(f);
Status status = newFrameReady(f);
Status status = newFrameReady(frame);
cont_acq = status.first;
acq_end = status.second;
had_frames = true;
} while ((++f != frames.end()) && cont_acq);
}
}
} while ((m_state != StopReq) && cont_acq);
AcqState prev_state = m_state;
if (acq_end && m_cam->m_skip_frame_freq) {
......@@ -238,13 +254,47 @@ void Camera::AcqThread::threadFunction()
m_cond.broadcast();
}
void Camera::AcqThread::queueFinishedFrame(FrameType frame)
DetFrameImagePackets Camera::AcqThread::readRecvPackets(FrameType frame)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(frame);
AutoMutex l = m_cam->lock();
m_frame_queue.push(frame);
m_cond.broadcast();
DetFrameImagePackets det_frame_packets{frame, {}};
DetImagePackets& det_packets = det_frame_packets.second;
FramePacketMap::iterator it = m_frame_packet_map.find(frame);
if (it != m_frame_packet_map.end()) {
det_packets = std::move(it->second);
m_frame_packet_map.erase(it);
}
auto stopped = [&]() { return (m_cam->getAcqState() == StopReq); };
int nb_recvs = m_cam->getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
while ((det_packets.find(i) == det_packets.end()) &&
!stopped()) {
Receiver::ImagePackets *image_packets;
Receiver *recv = m_cam->m_recv_list[i];
image_packets = recv->readImagePackets();
if (!image_packets)
break;
typedef DetImagePackets::value_type MapEntry;
FrameType f = image_packets->frame;
if (f == frame) {
det_packets.emplace(MapEntry(i, image_packets));
break;
} else {
DetImagePackets& other = m_frame_packet_map[f];
other.emplace(MapEntry(i, image_packets));
}
}
}
if (stopped()) {
det_frame_packets.second.clear();
m_frame_packet_map.clear();
}
return det_frame_packets;
}
void Camera::AcqThread::cleanUp(AutoMutex& l)
......@@ -292,6 +342,14 @@ void Camera::AcqThread::startAcq()
void Camera::AcqThread::stopAcq()
{
DEB_MEMBER_FUNCT();
{
AutoMutex l(m_cam->lock());
if (m_acq_stopped)
return;
m_acq_stopped = true;
}
sls::Detector *det = m_cam->m_det;
DetStatus det_status = m_cam->getDetStatus();
bool xfer_active = m_cam->m_model->isXferActive();
......@@ -311,18 +369,6 @@ void Camera::AcqThread::stopAcq()
m_cam->m_model->stopAcq();
}
Camera::AcqThread::Status Camera::AcqThread::newFrameReady(FrameType frame)
{
DEB_MEMBER_FUNCT();
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frame;
StdBufferCbMgr *cb_mgr = m_cam->m_buffer.getBufferCbMgr(LimaBuffer);
bool cont_acq = cb_mgr->newFrameReady(frame_info);
bool acq_end = (frame == m_cam->m_lima_nb_frames - 1);
cont_acq &= !acq_end;
return Status(cont_acq, acq_end);
}
Camera::Camera(string config_fname, int det_id)
: m_det_id(det_id),
m_model(NULL),
......@@ -363,7 +409,7 @@ Camera::Camera(string config_fname, int det_id)
EXC_CHECK(m_det->setRxSilentMode(1));
EXC_CHECK(m_det->setRxFrameDiscardPolicy(
slsDetectorDefs::DISCARD_PARTIAL_FRAMES));
setReceiverFifoDepth(4);
setReceiverFifoDepth(16);
sls::Result<int> dr_res;
EXC_CHECK(dr_res = m_det->getDynamicRange());
......@@ -874,6 +920,10 @@ void Camera::prepareAcq()
m_next_ready_ts = Timestamp();
}
RecvList::iterator rit, rend = m_recv_list.end();
for (rit = m_recv_list.begin(); rit != rend; ++rit)
(*rit)->prepareAcq();
m_model->prepareAcq();
m_global_cpu_affinity_mgr.prepareAcq();
m_model->getReconstruction()->prepare();
......@@ -948,54 +998,6 @@ Camera::DetStatus Camera::getDetTrigStatus()
return trig_status;
}
DetFrameImagePackets Camera::readRecvPackets()
{
DEB_MEMBER_FUNCT();
FrameType frame = m_model->m_next_frame++;
DetFrameImagePackets det_frame_packets{frame, {}};
DetImagePackets& det_packets = det_frame_packets.second;
FramePacketMap::iterator it = m_frame_packet_map.find(frame);
if (it != m_frame_packet_map.end()) {
det_packets = std::move(det_packets);
m_frame_packet_map.erase(it);
}
int nb_recvs = getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
while (det_packets.find(i) == det_packets.end()) {
Receiver::ImagePackets *image_packets;
image_packets = m_recv_list[i]->readImagePackets();
if (!image_packets)
break;
typedef DetImagePackets::value_type MapEntry;
FrameType f = image_packets->frame;
if (f == frame) {
det_packets.emplace(MapEntry(i, image_packets));
break;
} else {
DetImagePackets& other = m_frame_packet_map[f];
other.emplace(MapEntry(i, image_packets));
}
}
}
FrameType& m_last_frame = m_model->m_last_frame;
if ((int(m_last_frame) == -1) || (frame > m_last_frame))
m_last_frame = frame;
return det_frame_packets;
}
void Camera::publishFrame(FrameType frame)
{
DEB_MEMBER_FUNCT();
FrameMap::Item *mi = m_frame_map.getItem(0);
Model::FinishInfo finfo = mi->frameFinished(frame, true, true);
m_model->processFinishInfo(finfo);
}
void Camera::assemblePackets(DetFrameImagePackets&& det_frame_packets)
{
DEB_MEMBER_FUNCT();
......@@ -1017,56 +1019,6 @@ void Camera::assemblePackets(DetFrameImagePackets&& det_frame_packets)
}
}
bool Camera::checkLostPackets()
{
DEB_MEMBER_FUNCT();
FrameArray ifa = m_frame_map.getItemFrameArray();
if (ifa != m_prev_ifa) {
m_prev_ifa = ifa;
return false;
}
FrameType last_frame = getLatestFrame(ifa);
if (getOldestFrame(ifa) == last_frame) {
DEB_RETURN() << DEB_VAR1(false);
return false;
}
if (!m_tol_lost_packets) {
ostringstream err_msg;
err_msg << "checkLostPackets: frame_map=" << m_frame_map;
Event::Code err_code = Event::CamOverrun;
Event *event = new Event(Hardware, Event::Error, Event::Camera,
err_code, err_msg.str());
DEB_EVENT(*event) << DEB_VAR1(*event);
reportEvent(event);
DEB_RETURN() << DEB_VAR1(true);
return true;
}
int nb_items = m_model->getNbFrameMapItems();
IntList first_bad(nb_items);