
Commit e612787c authored by Alejandro Homs Puron

Remove Model processing threads, move packet RX to Camera::AcqThread

parent 83021557
Pipeline #45770 failed in 6 minutes and 52 seconds
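
Editor's note: the commit replaces the per-model processing threads (Eiger::Thread, Jungfrau::Thread) with a single flow in Camera::AcqThread, which now reads the image packets from every Receiver itself, hands each complete frame to the Reconstruction, and then publishes it to the Lima buffer. The following standalone sketch only illustrates that pull-and-publish pattern; PacketSource and Reconstruction are simplified stand-ins, not the plugin classes.

    // Illustrative only: one acquisition thread drives N packet sources, frame by frame.
    #include <cstddef>
    #include <cstdio>
    #include <vector>

    using FrameType = unsigned long;

    // Stand-in for Receiver::readImagePackets(): returns the next packet block,
    // tagged with its frame number.
    struct PacketSource {
        FrameType next = 0;
        FrameType read() { return next++; }
    };

    // Stand-in for the Reconstruction stage fed through addFramePackets().
    struct Reconstruction {
        void addFramePackets(FrameType frame, std::size_t nb_blocks)
        { std::printf("frame %lu: %zu blocks queued for assembly\n", frame, nb_blocks); }
    };

    int main()
    {
        const FrameType nb_frames = 4;
        std::vector<PacketSource> recvs(2);
        Reconstruction reconstruct;

        for (FrameType frame = 0; frame < nb_frames; ++frame) {
            std::size_t nb_blocks = 0;
            for (auto& r : recvs)              // one packet block per receiver
                if (r.read() == frame)         // blocking read in the real plugin
                    ++nb_blocks;
            reconstruct.addFramePackets(frame, nb_blocks);
            std::printf("frame %lu ready\n", frame);   // newFrameReady() analog
        }
    }
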
@@ -186,7 +186,6 @@ private:
public:
AcqThread(Camera *cam);
void queueFinishedFrame(FrameType frame);
virtual void start(AutoMutex& l);
void stop(AutoMutex& l, bool wait);
@@ -208,6 +207,7 @@ private:
AutoMutex& m_lock;
};
DetFrameImagePackets readRecvPackets(FrameType frame);
Status newFrameReady(FrameType frame);
void startAcq();
void stopAcq();
@@ -216,7 +216,8 @@ private:
Camera *m_cam;
Cond& m_cond;
AcqState& m_state;
FrameQueue m_frame_queue;
FramePacketMap m_frame_packet_map;
bool m_acq_stopped;
};
friend class BufferMgr;
@@ -248,8 +249,6 @@ private:
void removeSharedMem();
void createReceivers();
DetFrameImagePackets readRecvPackets();
void publishFrame(FrameType frame);
void assemblePackets(DetFrameImagePackets&& det_frame_packets);
bool checkLostPackets();
@@ -317,7 +316,6 @@ private:
PixelDepthCPUAffinityMap m_cpu_affinity_map;
GlobalCPUAffinityMgr m_global_cpu_affinity_mgr;
AutoPtr<AcqThread> m_acq_thread;
FramePacketMap m_frame_packet_map;
};
} // namespace SlsDetector
......
@@ -268,8 +268,6 @@ class Eiger : public Model
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
virtual void updateImageSize();
virtual bool checkSettings(Settings settings);
@@ -290,57 +288,6 @@ class Eiger : public Model
typedef std::vector<Receiver *> RecvList;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Thread", "SlsDetector");
public:
enum State {
Init, Ready, Running, Stopping, Quitting, End,
};
Thread(Eiger *eiger, int idx);
virtual ~Thread();
void setCPUAffinity(CPUAffinity aff);
void prepareAcq();
void startAcq()
{ setState(Running); }
void stopAcq()
{
setState(Stopping);
AutoMutex l = lock();
while (m_state != Ready)
wait();
}
protected:
virtual void threadFunction();
private:
friend class Eiger;
AutoMutex lock()
{ return m_eiger->lock(); }
void wait()
{ m_eiger->wait(); }
void broadcast()
{ m_eiger->broadcast(); }
void setState(State state)
{
AutoMutex l = lock();
m_state = state;
broadcast();
}
Eiger *m_eiger;
int m_idx;
State m_state;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
class CorrBase
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::CorrBase",
@@ -532,21 +479,8 @@ class Eiger : public Model
const FrameDim& getRecvFrameDim()
{ return m_geom.m_recv_frame_dim; }
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_cond.wait(); }
void broadcast()
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
int getNbRecvs();
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
CorrBase *createBadRecvFrameCorr();
CorrBase *createChipBorderCorr(ImageType image_type);
CorrBase *createInterModGapCorr();
@@ -586,14 +520,11 @@ class Eiger : public Model
static const unsigned long BebFpgaReadPtrAddr;
static const unsigned long BebFpgaPtrRange;
Cond m_cond;
BebList m_beb_list;
Geometry m_geom;
CorrList m_corr_list;
RecvList m_recv_list;
ModelReconstruction *m_reconstruction;
FrameType m_nb_frames;
ThreadList m_thread_list;
bool m_fixed_clock_div;
ClockDiv m_clock_div;
};
......
@@ -199,8 +199,6 @@ class Jungfrau : public Model
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
virtual void updateImageSize();
virtual bool checkSettings(Settings settings);
@@ -214,57 +212,6 @@ class Jungfrau : public Model
typedef std::vector<Receiver *> RecvList;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Jungfrau::Thread", "SlsDetector");
public:
enum State {
Init, Ready, Running, Stopping, Quitting, End,
};
Thread(Jungfrau *jungfrau, int idx);
virtual ~Thread();
void setCPUAffinity(CPUAffinity aff);
void prepareAcq();
void startAcq()
{ setState(Running); }
void stopAcq()
{
setState(Stopping);
AutoMutex l = lock();
while (m_state != Ready)
wait();
}
protected:
virtual void threadFunction();
private:
friend class Jungfrau;
AutoMutex lock()
{ return m_jungfrau->lock(); }
void wait()
{ m_jungfrau->wait(); }
void broadcast()
{ m_jungfrau->broadcast(); }
void setState(State state)
{
AutoMutex l = lock();
m_state = state;
broadcast();
}
Jungfrau *m_jungfrau;
int m_idx;
State m_state;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
class ImgProcBase
{
DEB_CLASS_NAMESPC(DebModCamera, "Jungfrau::ImgProcBase",
@@ -593,22 +540,8 @@ class Jungfrau : public Model
FrameDim getModuleFrameDim(int idx, bool raw);
int getModuleDataOffset(int idx, bool raw);
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_cond.wait(); }
void broadcast()
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
int getNbRecvs();
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
Cond m_cond;
AutoPtr<GainPedImgProc> m_gain_ped_img_proc;
AutoPtr<GainADCMapImgProc> m_gain_adc_map_img_proc;
AutoPtr<AveImgProc> m_ave_img_proc;
@@ -617,8 +550,6 @@ class Jungfrau : public Model
ImgSrc m_img_src;
ModelReconstruction *m_reconstruction;
RecvList m_recv_list;
FrameType m_nb_frames;
ThreadList m_thread_list;
};
std::ostream& operator <<(std::ostream& os, Jungfrau::GainPed::MapType map_type);
......
@@ -47,8 +47,6 @@ class Model
public:
typedef Defs::Settings Settings;
typedef FrameMap::FinishInfo FinishInfo;
Model(Camera *cam, Type type);
virtual ~Model();
@@ -86,12 +84,8 @@ class Model
virtual bool isAcqActive();
virtual bool isXferActive() = 0;
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list) = 0;
virtual Reconstruction *getReconstruction();
void processPackets();
protected:
void updateCameraModel();
void updateCameraImageSize();
@@ -110,8 +104,6 @@ class Model
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
void processFinishInfo(const FinishInfo& finfo);
BufferMgr *getBuffer();
private:
@@ -125,8 +117,6 @@ class Model
protected:
AutoPtr<sls::Detector> m_det;
FrameType m_next_frame;
FrameType m_last_frame;
};
......
@@ -103,9 +103,6 @@ class Eiger : public SlsDetector::Model
virtual bool checkSettings(SlsDetector::Defs::Settings settings);
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list);
virtual void prepareAcq();
virtual void startAcq();
virtual void stopAcq();
......
@@ -201,9 +201,6 @@ class Jungfrau : public SlsDetector::Model
virtual bool checkSettings(SlsDetector::Defs::Settings settings);
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list);
virtual void prepareAcq();
virtual void startAcq();
virtual void stopAcq();
......
@@ -34,9 +34,6 @@ public:
Model(SlsDetector::Camera *cam, SlsDetector::Type type);
virtual ~Model();
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list) = 0;
virtual void getFrameDim(FrameDim& frame_dim /Out/,
bool raw = false) = 0;
virtual void getAcqFrameDim(FrameDim& frame_dim /Out/,
......
@@ -1618,8 +1618,6 @@ void GlobalCPUAffinityMgr::setModelAffinity(
if (model_affinity_list == m_curr.model_threads)
return;
m_cam->m_model->setThreadCPUAffinity(model_affinity_list);
CPUAffinity buffer_affinity = CPUAffinityList_all(model_affinity_list);
m_cam->m_buffer.setAcqBufferCPUAffinity(buffer_affinity);
......
@@ -103,7 +103,8 @@ Camera::AcqThread::ExceptionCleanUp::~ExceptionCleanUp()
}
Camera::AcqThread::AcqThread(Camera *cam)
: m_cam(cam), m_cond(m_cam->m_cond), m_state(m_cam->m_state)
: m_cam(cam), m_cond(m_cam->m_cond), m_state(m_cam->m_state),
m_acq_stopped(false)
{
DEB_CONSTRUCTOR();
}
@@ -136,10 +137,29 @@ void Camera::AcqThread::stop(AutoMutex& l, bool wait)
m_state = StopReq;
m_cond.broadcast();
{
AutoMutexUnlock u(l);
stopAcq();
}
while (wait && (m_state != Stopped) && (m_state != Idle))
m_cond.wait();
}
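
Editor's note: the stop() sequence above follows a common pattern: mark the stop request under the lock, release the lock around the blocking stopAcq() call, then re-take it and wait for the thread to reach Stopped or Idle. A minimal standalone version of that pattern, written with std::mutex and std::condition_variable instead of the plugin's AutoMutex/Cond (StopExample and its worker are illustrative only), could look like this:

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    enum State { Idle, Running, StopReq, Stopped };

    struct StopExample {
        std::mutex m;
        std::condition_variable cv;
        State state = Running;

        // Stand-in for the blocking hardware calls (stopDetector/stopReceiver);
        // it must run without holding the lock.
        void stopAcqBlocking() {}

        void stop(bool wait_done)
        {
            std::unique_lock<std::mutex> l(m);
            state = StopReq;
            cv.notify_all();
            l.unlock();              // the AutoMutexUnlock u(l) step
            stopAcqBlocking();
            l.lock();
            while (wait_done && (state != Stopped) && (state != Idle))
                cv.wait(l);
        }

        // Worker side: acknowledge the stop request and report Stopped.
        void worker()
        {
            std::unique_lock<std::mutex> l(m);
            cv.wait(l, [this] { return state == StopReq; });
            state = Stopped;
            cv.notify_all();
        }
    };

    int main()
    {
        StopExample ex;
        std::thread t(&StopExample::worker, &ex);
        ex.stop(true);
        t.join();
    }
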
inline
Camera::AcqThread::Status Camera::AcqThread::newFrameReady(FrameType frame)
{
DEB_MEMBER_FUNCT();
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frame;
StdBufferCbMgr *cb_mgr = m_cam->m_buffer.getBufferCbMgr(LimaBuffer);
bool cont_acq = cb_mgr->newFrameReady(frame_info);
bool acq_end = (frame == m_cam->m_lima_nb_frames - 1);
cont_acq &= !acq_end;
return Status(cont_acq, acq_end);
}
void Camera::AcqThread::threadFunction()
{
DEB_MEMBER_FUNCT();
@@ -160,43 +180,39 @@ void Camera::AcqThread::threadFunction()
DEB_TRACE() << DEB_VAR1(m_state);
m_cond.broadcast();
Reconstruction *reconstruct = m_cam->m_model->getReconstruction();
SeqFilter seq_filter;
bool had_frames = false;
bool cont_acq = true;
bool acq_end = false;
do {
while ((m_state != StopReq) && m_frame_queue.empty()) {
if (!m_cond.wait(m_cam->m_new_frame_timeout)) {
AutoMutexUnlock u(l);
try {
m_cam->checkLostPackets();
} catch (Exception& e) {
string name = ("Camera::AcqThread: "
"checkLostPackets");
m_cam->reportException(e, name);
}
}
FrameType next_frame = 0;
auto get_next_frame = [&]() {
DetFrameImagePackets packets = readRecvPackets(next_frame++);
FrameType frame = packets.first;
reconstruct->addFramePackets(std::move(packets));
return frame;
};
while ((m_state != StopReq) && cont_acq &&
(next_frame < m_cam->m_lima_nb_frames)) {
FrameType frame;
{
AutoMutexUnlock u(l);
frame = get_next_frame();
DEB_TRACE() << DEB_VAR2(next_frame, frame);
}
if (!m_frame_queue.empty()) {
FrameType frame = m_frame_queue.front();
m_frame_queue.pop();
DEB_TRACE() << DEB_VAR1(frame);
seq_filter.addVal(frame);
SeqFilter::Range frames = seq_filter.getSeqRange();
if (frames.nb > 0) {
int f = frames.first;
do {
m_cam->m_buffer.waitLimaFrame(f, l);
AutoMutexUnlock u(l);
DEB_TRACE() << DEB_VAR1(f);
Status status = newFrameReady(f);
cont_acq = status.first;
acq_end = status.second;
had_frames = true;
} while ((++f != frames.end()) && cont_acq);
}
m_cam->m_buffer.waitLimaFrame(frame, l);
{
AutoMutexUnlock u(l);
Status status = newFrameReady(frame);
cont_acq = status.first;
acq_end = status.second;
had_frames = true;
}
} while ((m_state != StopReq) && cont_acq);
}
AcqState prev_state = m_state;
if (acq_end && m_cam->m_skip_frame_freq) {
@@ -238,13 +254,47 @@ void Camera::AcqThread::threadFunction()
m_cond.broadcast();
}
void Camera::AcqThread::queueFinishedFrame(FrameType frame)
DetFrameImagePackets Camera::AcqThread::readRecvPackets(FrameType frame)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(frame);
AutoMutex l = m_cam->lock();
m_frame_queue.push(frame);
m_cond.broadcast();
DetFrameImagePackets det_frame_packets{frame, {}};
DetImagePackets& det_packets = det_frame_packets.second;
FramePacketMap::iterator it = m_frame_packet_map.find(frame);
if (it != m_frame_packet_map.end()) {
det_packets = std::move(it->second);
m_frame_packet_map.erase(it);
}
auto stopped = [&]() { return (m_cam->getAcqState() == StopReq); };
int nb_recvs = m_cam->getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
while ((det_packets.find(i) == det_packets.end()) &&
!stopped()) {
Receiver::ImagePackets *image_packets;
Receiver *recv = m_cam->m_recv_list[i];
image_packets = recv->readImagePackets();
if (!image_packets)
break;
typedef DetImagePackets::value_type MapEntry;
FrameType f = image_packets->frame;
if (f == frame) {
det_packets.emplace(MapEntry(i, image_packets));
break;
} else {
DetImagePackets& other = m_frame_packet_map[f];
other.emplace(MapEntry(i, image_packets));
}
}
}
if (stopped()) {
det_frame_packets.second.clear();
m_frame_packet_map.clear();
}
return det_frame_packets;
}
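
Editor's note: readRecvPackets() above demultiplexes packet blocks by frame: blocks matching the requested frame are collected per receiver, while blocks belonging to other frames are stashed in m_frame_packet_map and consumed on a later call. A reduced standalone sketch of the same bookkeeping, using simplified stand-in types (this Receiver just replays a queue of frame numbers, not the plugin class), is:

    #include <cstdio>
    #include <deque>
    #include <map>
    #include <vector>

    using FrameType = unsigned long;

    // Stand-in receiver: replays a fixed queue of frame numbers.
    struct Receiver {
        std::deque<FrameType> pending;
        bool read(FrameType& f)
        {
            if (pending.empty())
                return false;
            f = pending.front();
            pending.pop_front();
            return true;
        }
    };

    using DetImagePackets = std::map<int, FrameType>;              // recv index -> block
    using FramePacketMap = std::map<FrameType, DetImagePackets>;   // frame -> blocks

    DetImagePackets readRecvPackets(FrameType frame, std::vector<Receiver>& recvs,
                                    FramePacketMap& stash)
    {
        DetImagePackets det_packets;
        auto it = stash.find(frame);
        if (it != stash.end()) {                // reuse blocks stashed earlier
            det_packets = std::move(it->second);
            stash.erase(it);
        }
        for (int i = 0; i < int(recvs.size()); ++i) {
            while (det_packets.find(i) == det_packets.end()) {
                FrameType f;
                if (!recvs[i].read(f))
                    break;
                if (f == frame)
                    det_packets.emplace(i, f);
                else
                    stash[f].emplace(i, f);     // out-of-order block: keep for later
            }
        }
        return det_packets;
    }

    int main()
    {
        std::vector<Receiver> recvs(2);
        recvs[0].pending = {1, 0};      // receiver 0 delivers frame 1 first
        recvs[1].pending = {0, 1};
        FramePacketMap stash;
        for (FrameType frame = 0; frame < 2; ++frame)
            std::printf("frame %lu: %zu blocks\n", frame,
                        readRecvPackets(frame, recvs, stash).size());
    }
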
void Camera::AcqThread::cleanUp(AutoMutex& l)
@@ -292,12 +342,20 @@ void Camera::AcqThread::startAcq()
void Camera::AcqThread::stopAcq()
{
DEB_MEMBER_FUNCT();
{
AutoMutex l(m_cam->lock());
if (m_acq_stopped)
return;
m_acq_stopped = true;
}
sls::Detector *det = m_cam->m_det;
DetStatus det_status = m_cam->getDetStatus();
bool xfer_active = m_cam->m_model->isXferActive();
DEB_ALWAYS() << DEB_VAR2(det_status, xfer_active);
if ((det_status == Defs::Running) || xfer_active) {
DEB_TRACE() << "calling stopDetector";
DEB_ALWAYS() << "calling stopDetector";
det->stopDetector();
Timestamp t0 = Timestamp::now();
while (m_cam->m_model->isAcqActive())
@@ -305,24 +363,12 @@ void Camera::AcqThread::stopAcq()
double milli_sec = (Timestamp::now() - t0) * 1e3;
DEB_TRACE() << "Abort -> Idle: " << DEB_VAR1(milli_sec);
}
DEB_TRACE() << "calling stopReceiver";
DEB_ALWAYS() << "calling stopReceiver";
det->stopReceiver();
DEB_TRACE() << "calling Model::stopAcq";
m_cam->m_model->stopAcq();
}
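
Editor's note: the new m_acq_stopped flag makes stopAcq() one-shot, presumably so the detector/receiver shutdown is not executed twice when the call is reached from more than one path. A tiny standalone illustration of that guard, with std::mutex standing in for the camera lock (AcqStopper is a made-up name):

    #include <cstdio>
    #include <mutex>

    struct AcqStopper {
        std::mutex m;
        bool acq_stopped = false;

        void stopAcq()
        {
            {
                std::lock_guard<std::mutex> l(m);
                if (acq_stopped)        // later callers: nothing left to do
                    return;
                acq_stopped = true;
            }
            // The heavy, non-idempotent work runs outside the lock:
            std::printf("stopDetector / stopReceiver called once\n");
        }
    };

    int main()
    {
        AcqStopper s;
        s.stopAcq();   // performs the stop
        s.stopAcq();   // no-op
    }
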
Camera::AcqThread::Status Camera::AcqThread::newFrameReady(FrameType frame)
{
DEB_MEMBER_FUNCT();
HwFrameInfoType frame_info;
frame_info.acq_frame_nb = frame;
StdBufferCbMgr *cb_mgr = m_cam->m_buffer.getBufferCbMgr(LimaBuffer);
bool cont_acq = cb_mgr->newFrameReady(frame_info);
bool acq_end = (frame == m_cam->m_lima_nb_frames - 1);
cont_acq &= !acq_end;
return Status(cont_acq, acq_end);
}
Camera::Camera(string config_fname, int det_id)
: m_det_id(det_id),
m_model(NULL),
@@ -363,7 +409,7 @@ Camera::Camera(string config_fname, int det_id)
EXC_CHECK(m_det->setRxSilentMode(1));
EXC_CHECK(m_det->setRxFrameDiscardPolicy(
slsDetectorDefs::DISCARD_PARTIAL_FRAMES));
setReceiverFifoDepth(4);
setReceiverFifoDepth(16);
sls::Result<int> dr_res;
EXC_CHECK(dr_res = m_det->getDynamicRange());
@@ -874,6 +920,10 @@ void Camera::prepareAcq()
m_next_ready_ts = Timestamp();
}
RecvList::iterator rit, rend = m_recv_list.end();
for (rit = m_recv_list.begin(); rit != rend; ++rit)
(*rit)->prepareAcq();
m_model->prepareAcq();
m_global_cpu_affinity_mgr.prepareAcq();
m_model->getReconstruction()->prepare();
@@ -948,54 +998,6 @@ Camera::DetStatus Camera::getDetTrigStatus()
return trig_status;
}
DetFrameImagePackets Camera::readRecvPackets()
{
DEB_MEMBER_FUNCT();
FrameType frame = m_model->m_next_frame++;
DetFrameImagePackets det_frame_packets{frame, {}};
DetImagePackets& det_packets = det_frame_packets.second;
FramePacketMap::iterator it = m_frame_packet_map.find(frame);
if (it != m_frame_packet_map.end()) {
det_packets = std::move(det_packets);
m_frame_packet_map.erase(it);
}
int nb_recvs = getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
while (det_packets.find(i) == det_packets.end()) {
Receiver::ImagePackets *image_packets;
image_packets = m_recv_list[i]->readImagePackets();
if (!image_packets)
break;
typedef DetImagePackets::value_type MapEntry;
FrameType f = image_packets->frame;
if (f == frame) {
det_packets.emplace(MapEntry(i, image_packets));
break;
} else {
DetImagePackets& other = m_frame_packet_map[f];
other.emplace(MapEntry(i, image_packets));
}
}
}
FrameType& m_last_frame = m_model->m_last_frame;
if ((int(m_last_frame) == -1) || (frame > m_last_frame))
m_last_frame = frame;
return det_frame_packets;
}
void Camera::publishFrame(FrameType frame)
{
DEB_MEMBER_FUNCT();
FrameMap::Item *mi = m_frame_map.getItem(0);
Model::FinishInfo finfo = mi->frameFinished(frame, true, true);
m_model->processFinishInfo(finfo);
}
void Camera::assemblePackets(DetFrameImagePackets&& det_frame_packets)
{
DEB_MEMBER_FUNCT();
@@ -1017,56 +1019,6 @@ void Camera::assemblePackets(DetFrameImagePackets&& det_frame_packets)
}
}
bool Camera::checkLostPackets()
{
DEB_MEMBER_FUNCT();
FrameArray ifa = m_frame_map.getItemFrameArray();
if (ifa != m_prev_ifa) {
m_prev_ifa = ifa;
return false;
}
FrameType last_frame = getLatestFrame(ifa);
if (getOldestFrame(ifa) == last_frame) {
DEB_RETURN() << DEB_VAR1(false);
return false;
}
if (!m_tol_lost_packets) {
ostringstream err_msg;
err_msg << "checkLostPackets: frame_map=" << m_frame_map;
Event::Code err_code = Event::CamOverrun;
Event *event = new Event(Hardware, Event::Error, Event::Camera,
err_code, err_msg.str());
DEB_EVENT(*event) << DEB_VAR1(*event);
reportEvent(event);
DEB_RETURN() << DEB_VAR1(true);
return true;
}
int nb_items = m_model->getNbFrameMapItems();
IntList first_bad(nb_items);
if (DEB_CHECK_ANY(DebTypeWarning)) {
for (int i = 0; i < nb_items; ++i) {
FrameMap::Item *item = m_frame_map.getItem(i);
first_bad[i] = item->getNbBadFrames();
}
}