Commit 908a668d authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Move Eiger threads from Recv to Model level:

* Each thread process a whole frame
* FrameMap has only one item
parent 17779892
......@@ -1676,8 +1676,8 @@ Add LimaCCDs and SlsDetector class devices.
+----------------------------------------------------------+------------------------------------------------------+
| id10/slsdetector/eiger500k->apply_corrections | 0 |
+----------------------------------------------------------+------------------------------------------------------+
| id10/slsdetector/eiger500k->pixel_depth_cpu_affinity_map | | { 4: ((((CPU( 6), CPU( 7)), (CPU(18), CPU(19))), |
| | | ((CPU( 9), CPU(10)), (CPU(21), CPU(22)))), |
| id10/slsdetector/eiger500k->pixel_depth_cpu_affinity_map | | { 4: (((CPU( 6), CPU( 7)), (CPU( 9), CPU(10))), |
| | | (CPU(18), CPU(19), CPU(21), CPU(22)), |
| | | CPU(*chain(range(0, 6), range(12, 18))), |
| | | CPU(0), |
| | | (('eth0,eth1,eth2,eth4,eth6,eth7,eth8,eth9', |
......@@ -1695,7 +1695,7 @@ Add LimaCCDs and SlsDetector class devices.
the following tasks:
* Receivers' packet stream threads (passive mode equivalent to listeners): 2
* Eiger processing threads: 2 per receiver, configurable
* Eiger processing threads: 4, configurable
* Lima processing threads
* Other OS processes
* Net-dev group packet dispatching for Rx queues: irq & processing
......
......@@ -21,11 +21,10 @@ threshold_energy No 0 Initial detector threshold energy (eV)
tolerate_lost_packets No True Initial tolerance to lost packets
pixel_depth_cpu_affinity_map No [] Default PixelDepthCPUAffinityMap as Python string(s) defining a dict:
{<pixel_depth>: <global_affinity>}, being global_affinity a tuple:
(<recv_list>, <lima>, <other>, <netdev_grp_list>), where recv_list
is a list of tupples in the form: (<listeners>, <port_threads>),
where listeners and port_threads are tuples of affinities,
lima and and other are affinities, and netdev_grp_list is a list of
tuples in the form:
(<recv_list>, <model_threads>, <lima>, <other>, <netdev_grp_list>),
where recv_list is a list of tuples of listeners affinities,
model_threads a tuple of affinities, lima and other are affinities,
and netdev_grp_list is a list of tuples in the form:
(<comma_separated_netdev_name_list>, <rx_queue_affinity_map>), the
latter in the form of: {<queue>: (<irq>, <processing>)}.
Each affinity can be expressed by one of the functions: Mask(mask)
......
......@@ -484,7 +484,6 @@ class SystemCPUAffinityMgr
struct RecvCPUAffinity {
CPUAffinityList listeners;
CPUAffinityList recv_threads;
RecvCPUAffinity();
CPUAffinity all() const;
......@@ -492,8 +491,6 @@ struct RecvCPUAffinity {
const CPUAffinityList& Listeners() const
{ return listeners; }
const CPUAffinityList& RecvThreads() const
{ return recv_threads; }
typedef const CPUAffinityList& (RecvCPUAffinity::*Selector)() const;
};
......@@ -501,16 +498,14 @@ struct RecvCPUAffinity {
inline CPUAffinity RecvCPUAffinity::all() const
{
return (CPUAffinityList_all(listeners) |
CPUAffinityList_all(recv_threads));
return CPUAffinityList_all(listeners);
}
inline
bool operator ==(const RecvCPUAffinity& a, const RecvCPUAffinity& b)
{
return ((a.listeners == b.listeners) &&
(a.recv_threads == b.recv_threads));
return (a.listeners == b.listeners);
}
inline
......@@ -544,10 +539,12 @@ inline CPUAffinity RecvCPUAffinityList_all(const RecvCPUAffinityList& l,
struct GlobalCPUAffinity {
RecvCPUAffinityList recv;
CPUAffinityList model_threads;
CPUAffinity lima;
CPUAffinity other;
NetDevGroupCPUAffinityList netdev;
GlobalCPUAffinity();
CPUAffinity all() const;
void updateRecvAffinity(CPUAffinity a);
};
......@@ -634,6 +631,7 @@ class GlobalCPUAffinityMgr
void setLimaAffinity(CPUAffinity lima_affinity);
void setRecvAffinity(const RecvCPUAffinityList& recv_affinity_list);
void setModelAffinity(const CPUAffinityList& model_affinity_list);
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
......
......@@ -277,13 +277,12 @@ class Eiger : public Model
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
virtual void updateImageSize();
virtual bool checkSettings(Settings settings);
virtual int getNbRecvs();
virtual Model::Recv *getRecv(int recv_idx);
virtual void prepareAcq();
virtual void startAcq();
virtual void stopAcq();
......@@ -292,7 +291,7 @@ class Eiger : public Model
friend class Correction;
friend class CorrBase;
class Recv : public Model::Recv
class Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv", "SlsDetector");
public:
......@@ -303,105 +302,67 @@ class Eiger : public Model
virtual ~Recv();
void prepareAcq();
void startAcq();
void stopAcq();
virtual int getNbProcessingThreads();
virtual void setNbProcessingThreads(int nb_proc_threads);
virtual void setCPUAffinity(const RecvCPUAffinity&
recv_affinity);
void updateFrameMapItem(FrameMap::Item *item)
{ m_frame_map_item = item; }
bool processOneFrame(FrameType frame, char *bptr);
void processBadFrame(FrameType frame, char *bptr);
private:
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv::Thread",
"SlsDetector");
public:
enum State {
Init, Ready, Running, Stop, End,
};
Thread(Recv *recv, int idx);
virtual ~Thread();
Eiger *m_eiger;
int m_idx;
Geometry::Recv *m_geom;
Receiver *m_recv;
int m_data_offset;
};
void setCPUAffinity(CPUAffinity aff);
typedef std::vector<AutoPtr<Recv> > RecvList;
void prepareAcq();
void startAcq()
{ setState(Running); }
void stopAcq()
{ setState(Ready); }
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Thread", "SlsDetector");
public:
enum State {
Init, Ready, Running, Stop, End,
};
protected:
virtual void threadFunction();
Thread(Eiger *eiger, int idx);
virtual ~Thread();
private:
friend class Recv;
void setCPUAffinity(CPUAffinity aff);
AutoMutex lock()
{ return m_recv->lock(); }
void wait()
{ m_recv->wait(); }
void broadcast()
{ m_recv->broadcast(); }
void prepareAcq();
void setState(State state)
{
AutoMutex l = lock();
m_state = state;
broadcast();
}
void startAcq()
{ setState(Running); }
void stopAcq()
{ setState(Ready); }
Recv *m_recv;
int m_idx;
State m_state;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
protected:
virtual void threadFunction();
private:
friend class Eiger;
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
{ return m_eiger->lock(); }
void wait()
{ m_cond.wait(); }
{ m_eiger->wait(); }
void broadcast()
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
{ m_eiger->broadcast(); }
bool checkForRecvState(Thread& t)
void setState(State state)
{
while ((t.m_state == Thread::Ready) ||
((t.m_state == Thread::Running) &&
allFramesAcquired()))
wait();
return (t.m_state == Thread::Running);
AutoMutex l = lock();
m_state = state;
broadcast();
}
void processOneFrame(Thread& t, AutoMutex& l);
Eiger *m_eiger;
int m_idx;
Cond m_cond;
Geometry::Recv *m_geom;
Receiver *m_recv;
int m_data_offset;
FrameType m_nb_frames;
FrameType m_next_frame;
FrameType m_last_frame;
SortedIntList m_in_process;
FrameMap::Item *m_frame_map_item;
ThreadList m_thread_list;
State m_state;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
typedef std::vector<AutoPtr<Thread> > ThreadList;
class CorrBase
{
......@@ -447,8 +408,7 @@ class Eiger : public Model
};
Camera *m_cam;
int m_nb_recvs;
std::vector<BadFrameData> m_bfd_list;
BadFrameData m_bfd;
};
class InterModGapCorr : public CorrBase
......@@ -579,6 +539,31 @@ class Eiger : public Model
const FrameDim& getRecvFrameDim()
{ return m_geom.m_recv_frame_dim; }
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_cond.wait(); }
void broadcast()
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
bool checkForRecvState(Thread& t)
{
while ((t.m_state == Thread::Ready) ||
((t.m_state == Thread::Running) && allFramesAcquired()))
wait();
return (t.m_state == Thread::Running);
}
int getNbRecvs();
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
void processOneFrame(Thread& t, AutoMutex& l);
CorrBase *createBadRecvFrameCorr();
CorrBase *createChipBorderCorr(ImageType image_type);
CorrBase *createInterModGapCorr();
......@@ -618,9 +603,16 @@ class Eiger : public Model
static const unsigned long BebFpgaReadPtrAddr;
static const unsigned long BebFpgaPtrRange;
Cond m_cond;
Geometry m_geom;
CorrList m_corr_list;
RecvList m_recv_list;
FrameType m_nb_frames;
FrameType m_next_frame;
FrameType m_last_frame;
SortedIntList m_in_process;
FrameMap::Item *m_frame_map_item;
ThreadList m_thread_list;
bool m_fixed_clock_div;
ClockDiv m_clock_div;
};
......
......@@ -47,19 +47,6 @@ class Model
typedef FrameMap::FinishInfo FinishInfo;
class Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Model::Recv", "SlsDetector");
public:
virtual ~Recv();
virtual int getNbProcessingThreads() = 0;
virtual void setNbProcessingThreads(int nb_proc_threads) = 0;
virtual void setCPUAffinity(const RecvCPUAffinity&
recv_affinity) = 0;
};
Model(Camera *cam, Type type);
virtual ~Model();
......@@ -95,6 +82,8 @@ class Model
virtual bool isXferActive() = 0;
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list) = 0;
protected:
void updateCameraModel();
void updateTimeRanges();
......@@ -108,9 +97,6 @@ class Model
virtual bool checkSettings(Settings settings) = 0;
virtual int getNbRecvs() = 0;
virtual Recv *getRecv(int recv_idx) = 0;
virtual void prepareAcq() = 0;
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
......
......@@ -322,7 +322,6 @@ public:
struct RecvCPUAffinity {
SlsDetector::CPUAffinityList listeners;
SlsDetector::CPUAffinityList recv_threads;
RecvCPUAffinity();
SlsDetector::CPUAffinity all() const;
......@@ -336,9 +335,12 @@ SlsDetector::CPUAffinity RecvCPUAffinityList_all(
struct GlobalCPUAffinity {
SlsDetector::RecvCPUAffinityList recv;
SlsDetector::CPUAffinityList model_threads;
SlsDetector::CPUAffinity lima;
SlsDetector::CPUAffinity other;
SlsDetector::NetDevGroupCPUAffinityList netdev;
GlobalCPUAffinity();
SlsDetector::CPUAffinity all() const;
};
......
......@@ -107,8 +107,8 @@ class Eiger : public SlsDetector::Model
virtual bool checkSettings(SlsDetector::Defs::Settings settings);
virtual int getNbRecvs();
virtual SlsDetector::Model::Recv *getRecv(int recv_idx);
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list);
virtual void prepareAcq();
virtual void startAcq();
......
......@@ -31,21 +31,13 @@ class Model
{
public:
class Recv
{
public:
virtual ~Recv();
virtual int getNbProcessingThreads() = 0;
virtual void setNbProcessingThreads(int nb_proc_threads) = 0;
virtual void setCPUAffinity(const SlsDetector::RecvCPUAffinity&
recv_affinity) = 0;
};
Model(SlsDetector::Camera *cam, SlsDetector::Type type);
virtual ~Model();
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list) = 0;
virtual void getFrameDim(FrameDim& frame_dim /Out/,
bool raw = false) = 0;
......@@ -87,9 +79,6 @@ protected:
virtual
bool checkSettings(SlsDetector::Defs::Settings settings) = 0;
virtual int getNbRecvs() = 0;
virtual SlsDetector::Model::Recv *getRecv(int recv_idx) = 0;
virtual void prepareAcq() = 0;
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
......
......@@ -1331,17 +1331,21 @@ void SystemCPUAffinityMgr::setNetDevCPUAffinity(
}
RecvCPUAffinity::RecvCPUAffinity()
: listeners(1), recv_threads(1)
: listeners(1)
{
}
RecvCPUAffinity& RecvCPUAffinity::operator =(CPUAffinity a)
{
listeners.assign(1, a);
recv_threads.assign(1, a);
return *this;
}
GlobalCPUAffinity::GlobalCPUAffinity()
: model_threads(1)
{
}
CPUAffinity GlobalCPUAffinity::all() const
{
return (RecvCPUAffinityList_all(recv) |
......@@ -1524,6 +1528,7 @@ void GlobalCPUAffinityMgr::applyAndSet(const GlobalCPUAffinity& o)
setLimaAffinity(o.lima);
setRecvAffinity(o.recv);
setModelAffinity(o.model_threads);
if (!m_system_mgr)
m_system_mgr = new SystemCPUAffinityMgr();
......@@ -1565,14 +1570,19 @@ void GlobalCPUAffinityMgr::setRecvAffinity(
return;
m_cam->setRecvCPUAffinity(recv_affinity_list);
m_curr.recv = recv_affinity_list;
}
const RecvCPUAffinityList& l = recv_affinity_list;
RecvCPUAffinity::Selector s = &RecvCPUAffinity::RecvThreads;
CPUAffinity buffer_affinity = RecvCPUAffinityList_all(l, s);
void GlobalCPUAffinityMgr::setModelAffinity(
const CPUAffinityList& model_affinity_list)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(model_affinity_list);
m_cam->m_model->setThreadCPUAffinity(model_affinity_list);
CPUAffinity buffer_affinity = CPUAffinityList_all(model_affinity_list);
DEB_ALWAYS() << DEB_VAR1(buffer_affinity);
m_cam->m_buffer_ctrl_obj->setCPUAffinityMask(buffer_affinity);
m_curr.recv = recv_affinity_list;
}
void GlobalCPUAffinityMgr::updateRecvRestart()
......@@ -1665,6 +1675,7 @@ void GlobalCPUAffinityMgr::limaFinished()
<< m_set.lima;
setLimaAffinity(m_set.lima);
setRecvAffinity(m_set.recv);
setModelAffinity(m_set.model_threads);
}
m_state = Ready;
......@@ -1747,7 +1758,7 @@ lima::SlsDetector::operator <<(ostream& os, const NetDevGroupCPUAffinity& a)
ostream& lima::SlsDetector::operator <<(ostream& os, const RecvCPUAffinity& a)
{
os << "<";
os << "listeners=" << a.listeners << ", recv_threads=" << a.recv_threads;
os << "listeners=" << a.listeners;
return os << ">";
}
......
......@@ -211,8 +211,10 @@ void Camera::AcqThread::threadFunction()
DEB_ALWAYS() << DEB_VAR1(stats);
FrameMap& m = m_cam->m_frame_map;
XYStat::LinRegress delay_stat = m.calcDelayStat();
DEB_ALWAYS() << DEB_VAR1(delay_stat);
if (m.getNbItems() > 1) {
XYStat::LinRegress delay_stat = m.calcDelayStat();
DEB_ALWAYS() << DEB_VAR1(delay_stat);
}
if (had_frames) {
affinity_mgr.recvFinished();
......@@ -673,22 +675,12 @@ void Camera::updateCPUAffinity(bool recv_restarted)
void Camera::setRecvCPUAffinity(const RecvCPUAffinityList& recv_affinity_list)
{
DEB_MEMBER_FUNCT();
unsigned int nb_recv = m_model->getNbRecvs();
unsigned int nb_aff = recv_affinity_list.size();
DEB_PARAM() << DEB_VAR2(nb_recv, nb_aff);
if (nb_aff != nb_recv)
THROW_HW_ERROR(InvalidValue) << "invalid affinity list size: "
<< DEB_VAR2(nb_recv, nb_aff);
DEB_PARAM() << DEB_VAR1(nb_aff);
RecvCPUAffinityList::const_iterator ait = recv_affinity_list.begin();
RecvList::iterator rit = m_recv_list.begin();
for (unsigned int i = 0; i < nb_recv; ++i, ++ait, ++rit) {
Model::Recv *recv = m_model->getRecv(i);
const RecvCPUAffinity& aff = *ait;
recv->setNbProcessingThreads(aff.recv_threads.size());
recv->setCPUAffinity(aff);
(*rit)->setCPUAffinity(aff);
}
for (unsigned int i = 0; i < nb_aff; ++i, ++ait, ++rit)
(*rit)->setCPUAffinity(*ait);
}
void Camera::clearAllBuffers()
......@@ -1167,8 +1159,8 @@ void Camera::getSortedBadFrameList(IntList first_idx, IntList last_idx,
{
bool all = first_idx.empty();
IntList bfl;
int nb_recvs = getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
int nb_items = m_frame_map.getNbItems();
for (int i = 0; i < nb_items; ++i) {
FrameMap::Item *item = m_frame_map.getItem(i);
int first = all ? 0 : first_idx[i];
int last = all ? item->getNbBadFrames() : last_idx[i];
......
......@@ -91,8 +91,6 @@ Eiger::BadRecvFrameCorr::BadRecvFrameCorr(Eiger *eiger)
DEB_CONSTRUCTOR();
m_cam = m_eiger->getCamera();
m_nb_recvs = m_eiger->getNbRecvs();
m_bfd_list.resize(m_nb_recvs);
}
void Eiger::BadRecvFrameCorr::prepareAcq()
......@@ -100,9 +98,7 @@ void Eiger::BadRecvFrameCorr::prepareAcq()
DEB_MEMBER_FUNCT();
CorrBase::prepareAcq();
for (int i = 0; i < m_nb_recvs; ++i)
m_bfd_list[i].reset();
m_bfd.reset();
}
void Eiger::BadRecvFrameCorr::correctFrame(FrameType frame, void *ptr)
......@@ -110,27 +106,26 @@ void Eiger::BadRecvFrameCorr::correctFrame(FrameType frame, void *ptr)
DEB_MEMBER_FUNCT();
char *bptr = (char *) ptr;
for (int i = 0; i < m_nb_recvs; ++i) {
BadFrameData& bfd = m_bfd_list[i];
IntList& bfl = bfd.bad_frame_list;
int& last_idx = bfd.last_idx;
if (bfl.empty()) {
int bad_frames = m_cam->getNbBadFrames(i);
if (bad_frames == last_idx)
continue;
m_cam->getBadFrameList(i, last_idx, bad_frames, bfl);
}
IntList::iterator end = bfl.end();
if (find(bfl.begin(), end, frame) != end) {
Eiger::Geometry *geom = m_eiger->getGeometry();
IntList& bfl = m_bfd.bad_frame_list;
int& last_idx = m_bfd.last_idx;
if (bfl.empty()) {
int bad_frames = m_cam->getNbBadFrames(0);
if (bad_frames == last_idx)
return;
m_cam->getBadFrameList(0, last_idx, bad_frames, bfl);
}
IntList::iterator end = bfl.end();
if (find(bfl.begin(), end, frame) != end) {
Eiger::Geometry *geom = m_eiger->getGeometry();
for (int i = 0; i < geom->getNbRecvs(); ++i) {
Eiger::Geometry::Recv *recv = geom->getRecv(i);
recv->fillBadFrame(frame, bptr);
}
if (*(end - 1) > int(frame))
continue;
last_idx += bfl.size();
bfl.clear();
}
if (*(end - 1) > int(frame))
return;
last_idx += bfl.size();
bfl.clear();
}
Eiger::InterModGapCorr::InterModGapCorr(Eiger *eiger)
......@@ -604,65 +599,6 @@ FrameDim Eiger::Geometry::getRecvFrameDim(bool raw)
return frame_dim;
}
Eiger::Recv::Thread::Thread(Recv *recv, int idx)
: m_recv(recv), m_idx(idx)