Commit a6115388 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Revert "Move Eiger threads from Recv to Model level"

Lower reliability, probably due to different data streams' speeds
parent 5fb037cb
......@@ -1676,8 +1676,8 @@ Add LimaCCDs and SlsDetector class devices.
+----------------------------------------------------------+------------------------------------------------------+
| id10/slsdetector/eiger500k->apply_corrections | 0 |
+----------------------------------------------------------+------------------------------------------------------+
| id10/slsdetector/eiger500k->pixel_depth_cpu_affinity_map | | { 4: (((CPU( 6), CPU( 7)), (CPU( 9), CPU(10))), |
| | | (CPU(18), CPU(19), CPU(21), CPU(22)), |
| id10/slsdetector/eiger500k->pixel_depth_cpu_affinity_map | | { 4: ((((CPU( 6), CPU( 7)), (CPU(18), CPU(19))), |
| | | ((CPU( 9), CPU(10)), (CPU(21), CPU(22)))), |
| | | CPU(*chain(range(0, 6), range(12, 18))), |
| | | CPU(0), |
| | | (('eth0,eth1,eth2,eth4,eth6,eth7,eth8,eth9', |
......@@ -1695,7 +1695,7 @@ Add LimaCCDs and SlsDetector class devices.
the following tasks:
* Receivers' packet stream threads (passive mode equivalent to listeners): 2
* Eiger processing threads: 4, configurable
* Eiger processing threads: 2 per receiver, configurable
* Lima processing threads
* Other OS processes
* Net-dev group packet dispatching for Rx queues: irq & processing
......
......@@ -21,10 +21,11 @@ threshold_energy No 0 Initial detector threshold energy (eV)
tolerate_lost_packets No True Initial tolerance to lost packets
pixel_depth_cpu_affinity_map No [] Default PixelDepthCPUAffinityMap as Python string(s) defining a dict:
{<pixel_depth>: <global_affinity>}, being global_affinity a tuple:
(<recv_list>, <model_threads>, <lima>, <other>, <netdev_grp_list>),
where recv_list is a list of tuples of listeners affinities,
model_threads a tuple of affinities, lima and other are affinities,
and netdev_grp_list is a list of tuples in the form:
(<recv_list>, <lima>, <other>, <netdev_grp_list>), where recv_list
is a list of tupples in the form: (<listeners>, <port_threads>),
where listeners and port_threads are tuples of affinities,
lima and and other are affinities, and netdev_grp_list is a list of
tuples in the form:
(<comma_separated_netdev_name_list>, <rx_queue_affinity_map>), the
latter in the form of: {<queue>: (<irq>, <processing>)}.
Each affinity can be expressed by one of the functions: Mask(mask)
......
......@@ -484,6 +484,7 @@ class SystemCPUAffinityMgr
struct RecvCPUAffinity {
CPUAffinityList listeners;
CPUAffinityList recv_threads;
RecvCPUAffinity();
CPUAffinity all() const;
......@@ -491,6 +492,8 @@ struct RecvCPUAffinity {
const CPUAffinityList& Listeners() const
{ return listeners; }
const CPUAffinityList& RecvThreads() const
{ return recv_threads; }
typedef const CPUAffinityList& (RecvCPUAffinity::*Selector)() const;
};
......@@ -498,14 +501,16 @@ struct RecvCPUAffinity {
inline CPUAffinity RecvCPUAffinity::all() const
{
return CPUAffinityList_all(listeners);
return (CPUAffinityList_all(listeners) |
CPUAffinityList_all(recv_threads));
}
inline
bool operator ==(const RecvCPUAffinity& a, const RecvCPUAffinity& b)
{
return (a.listeners == b.listeners);
return ((a.listeners == b.listeners) &&
(a.recv_threads == b.recv_threads));
}
inline
......@@ -539,12 +544,10 @@ inline CPUAffinity RecvCPUAffinityList_all(const RecvCPUAffinityList& l,
struct GlobalCPUAffinity {
RecvCPUAffinityList recv;
CPUAffinityList model_threads;
CPUAffinity lima;
CPUAffinity other;
NetDevGroupCPUAffinityList netdev;
GlobalCPUAffinity();
CPUAffinity all() const;
void updateRecvAffinity(CPUAffinity a);
};
......@@ -631,7 +634,6 @@ class GlobalCPUAffinityMgr
void setLimaAffinity(CPUAffinity lima_affinity);
void setRecvAffinity(const RecvCPUAffinityList& recv_affinity_list);
void setModelAffinity(const CPUAffinityList& model_affinity_list);
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
......
......@@ -268,12 +268,13 @@ class Eiger : public Model
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
virtual void updateImageSize();
virtual bool checkSettings(Settings settings);
virtual int getNbRecvs();
virtual Model::Recv *getRecv(int recv_idx);
virtual void prepareAcq();
virtual void startAcq();
virtual void stopAcq();
......@@ -282,7 +283,7 @@ class Eiger : public Model
friend class Correction;
friend class CorrBase;
class Recv
class Recv : public Model::Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv", "SlsDetector");
public:
......@@ -293,67 +294,105 @@ class Eiger : public Model
virtual ~Recv();
void prepareAcq();
void startAcq();
void stopAcq();
virtual int getNbProcessingThreads();
virtual void setNbProcessingThreads(int nb_proc_threads);
virtual void setCPUAffinity(const RecvCPUAffinity&
recv_affinity);
void updateFrameMapItem(FrameMap::Item *item)
{ m_frame_map_item = item; }
bool processOneFrame(FrameType frame, char *bptr);
void processBadFrame(FrameType frame, char *bptr);
private:
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv::Thread",
"SlsDetector");
public:
enum State {
Init, Ready, Running, Stop, End,
};
Eiger *m_eiger;
int m_idx;
Geometry::Recv *m_geom;
Receiver *m_recv;
int m_data_offset;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
Thread(Recv *recv, int idx);
virtual ~Thread();
void setCPUAffinity(CPUAffinity aff);
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Thread", "SlsDetector");
public:
enum State {
Init, Ready, Running, Stop, End,
};
void prepareAcq();
Thread(Eiger *eiger, int idx);
virtual ~Thread();
void startAcq()
{ setState(Running); }
void stopAcq()
{ setState(Ready); }
void setCPUAffinity(CPUAffinity aff);
protected:
virtual void threadFunction();
void prepareAcq();
private:
friend class Recv;
void startAcq()
{ setState(Running); }
void stopAcq()
{ setState(Ready); }
AutoMutex lock()
{ return m_recv->lock(); }
void wait()
{ m_recv->wait(); }
void broadcast()
{ m_recv->broadcast(); }
protected:
virtual void threadFunction();
void setState(State state)
{
AutoMutex l = lock();
m_state = state;
broadcast();
}
private:
friend class Eiger;
Recv *m_recv;
int m_idx;
State m_state;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
AutoMutex lock()
{ return m_eiger->lock(); }
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_eiger->wait(); }
{ m_cond.wait(); }
void broadcast()
{ m_eiger->broadcast(); }
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
void setState(State state)
bool checkForRecvState(Thread& t)
{
AutoMutex l = lock();
m_state = state;
broadcast();
while ((t.m_state == Thread::Ready) ||
((t.m_state == Thread::Running) &&
allFramesAcquired()))
wait();
return (t.m_state == Thread::Running);
}
void processOneFrame(Thread& t, AutoMutex& l);
Eiger *m_eiger;
int m_idx;
State m_state;
Cond m_cond;
Geometry::Recv *m_geom;
Receiver *m_recv;
int m_data_offset;
FrameType m_nb_frames;
FrameType m_next_frame;
FrameType m_last_frame;
SortedIntList m_in_process;
FrameMap::Item *m_frame_map_item;
ThreadList m_thread_list;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
typedef std::vector<AutoPtr<Recv> > RecvList;
class CorrBase
{
......@@ -399,7 +438,8 @@ class Eiger : public Model
};
Camera *m_cam;
BadFrameData m_bfd;
int m_nb_recvs;
std::vector<BadFrameData> m_bfd_list;
};
class InterModGapCorr : public CorrBase
......@@ -530,31 +570,6 @@ class Eiger : public Model
const FrameDim& getRecvFrameDim()
{ return m_geom.m_recv_frame_dim; }
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_cond.wait(); }
void broadcast()
{ m_cond.broadcast(); }
bool allFramesAcquired()
{ return m_next_frame == m_nb_frames; }
bool checkForRecvState(Thread& t)
{
while ((t.m_state == Thread::Ready) ||
((t.m_state == Thread::Running) && allFramesAcquired()))
wait();
return (t.m_state == Thread::Running);
}
int getNbRecvs();
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
void processOneFrame(Thread& t, AutoMutex& l);
CorrBase *createBadRecvFrameCorr();
CorrBase *createChipBorderCorr(ImageType image_type);
CorrBase *createInterModGapCorr();
......@@ -590,16 +605,9 @@ class Eiger : public Model
static const LinScale ChipXfer2Buff;
static const LinScale ChipRealReadout;
Cond m_cond;
Geometry m_geom;
CorrList m_corr_list;
RecvList m_recv_list;
FrameType m_nb_frames;
FrameType m_next_frame;
FrameType m_last_frame;
SortedIntList m_in_process;
FrameMap::Item *m_frame_map_item;
ThreadList m_thread_list;
bool m_fixed_clock_div;
ClockDiv m_clock_div;
};
......
......@@ -47,6 +47,19 @@ class Model
typedef FrameMap::FinishInfo FinishInfo;
class Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Model::Recv", "SlsDetector");
public:
virtual ~Recv();
virtual int getNbProcessingThreads() = 0;
virtual void setNbProcessingThreads(int nb_proc_threads) = 0;
virtual void setCPUAffinity(const RecvCPUAffinity&
recv_affinity) = 0;
};
Model(Camera *cam, Type type);
virtual ~Model();
......@@ -80,8 +93,6 @@ class Model
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr) = 0;
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list) = 0;
protected:
void updateCameraModel();
void updateTimeRanges();
......@@ -95,6 +106,9 @@ class Model
virtual bool checkSettings(Settings settings) = 0;
virtual int getNbRecvs() = 0;
virtual Recv *getRecv(int recv_idx) = 0;
virtual void prepareAcq() = 0;
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
......
......@@ -322,6 +322,7 @@ public:
struct RecvCPUAffinity {
SlsDetector::CPUAffinityList listeners;
SlsDetector::CPUAffinityList recv_threads;
RecvCPUAffinity();
SlsDetector::CPUAffinity all() const;
......@@ -335,12 +336,9 @@ SlsDetector::CPUAffinity RecvCPUAffinityList_all(
struct GlobalCPUAffinity {
SlsDetector::RecvCPUAffinityList recv;
SlsDetector::CPUAffinityList model_threads;
SlsDetector::CPUAffinity lima;
SlsDetector::CPUAffinity other;
SlsDetector::NetDevGroupCPUAffinityList netdev;
GlobalCPUAffinity();
SlsDetector::CPUAffinity all() const;
};
......
......@@ -100,8 +100,8 @@ class Eiger : public SlsDetector::Model
virtual bool checkSettings(SlsDetector::Defs::Settings settings);
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list);
virtual int getNbRecvs();
virtual SlsDetector::Model::Recv *getRecv(int recv_idx);
virtual void prepareAcq();
virtual void startAcq();
......
......@@ -31,12 +31,20 @@ class Model
{
public:
Model(SlsDetector::Camera *cam, SlsDetector::Type type);
virtual ~Model();
class Recv
{
public:
virtual ~Recv();
virtual int getNbProcessingThreads() = 0;
virtual void setNbProcessingThreads(int nb_proc_threads) = 0;
virtual void setCPUAffinity(const SlsDetector::RecvCPUAffinity&
recv_affinity) = 0;
};
virtual void setThreadCPUAffinity(const SlsDetector::CPUAffinityList&
det_thread_aff_list) = 0;
Model(SlsDetector::Camera *cam, SlsDetector::Type type);
virtual ~Model();
virtual void getFrameDim(FrameDim& frame_dim /Out/,
bool raw = false) = 0;
......@@ -77,6 +85,9 @@ protected:
virtual
bool checkSettings(SlsDetector::Defs::Settings settings) = 0;
virtual int getNbRecvs() = 0;
virtual SlsDetector::Model::Recv *getRecv(int recv_idx) = 0;
virtual void prepareAcq() = 0;
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
......
......@@ -1331,21 +1331,17 @@ void SystemCPUAffinityMgr::setNetDevCPUAffinity(
}
RecvCPUAffinity::RecvCPUAffinity()
: listeners(1)
: listeners(1), recv_threads(1)
{
}
RecvCPUAffinity& RecvCPUAffinity::operator =(CPUAffinity a)
{
listeners.assign(1, a);
recv_threads.assign(1, a);
return *this;
}
GlobalCPUAffinity::GlobalCPUAffinity()
: model_threads(1)
{
}
CPUAffinity GlobalCPUAffinity::all() const
{
return (RecvCPUAffinityList_all(recv) |
......@@ -1528,7 +1524,6 @@ void GlobalCPUAffinityMgr::applyAndSet(const GlobalCPUAffinity& o)
setLimaAffinity(o.lima);
setRecvAffinity(o.recv);
setModelAffinity(o.model_threads);
if (!m_system_mgr)
m_system_mgr = new SystemCPUAffinityMgr();
......@@ -1570,19 +1565,14 @@ void GlobalCPUAffinityMgr::setRecvAffinity(
return;
m_cam->setRecvCPUAffinity(recv_affinity_list);
m_curr.recv = recv_affinity_list;
}
void GlobalCPUAffinityMgr::setModelAffinity(
const CPUAffinityList& model_affinity_list)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(model_affinity_list);
m_cam->m_model->setThreadCPUAffinity(model_affinity_list);
CPUAffinity buffer_affinity = CPUAffinityList_all(model_affinity_list);
const RecvCPUAffinityList& l = recv_affinity_list;
RecvCPUAffinity::Selector s = &RecvCPUAffinity::RecvThreads;
CPUAffinity buffer_affinity = RecvCPUAffinityList_all(l, s);
DEB_ALWAYS() << DEB_VAR1(buffer_affinity);
m_cam->m_buffer_ctrl_obj->setCPUAffinityMask(buffer_affinity);
m_curr.recv = recv_affinity_list;
}
void GlobalCPUAffinityMgr::updateRecvRestart()
......@@ -1675,7 +1665,6 @@ void GlobalCPUAffinityMgr::limaFinished()
<< m_set.lima;
setLimaAffinity(m_set.lima);
setRecvAffinity(m_set.recv);
setModelAffinity(m_set.model_threads);
}
m_state = Ready;
......@@ -1758,7 +1747,7 @@ lima::SlsDetector::operator <<(ostream& os, const NetDevGroupCPUAffinity& a)
ostream& lima::SlsDetector::operator <<(ostream& os, const RecvCPUAffinity& a)
{
os << "<";
os << "listeners=" << a.listeners;
os << "listeners=" << a.listeners << ", recv_threads=" << a.recv_threads;
return os << ">";
}
......
......@@ -655,12 +655,22 @@ void Camera::updateCPUAffinity(bool recv_restarted)
void Camera::setRecvCPUAffinity(const RecvCPUAffinityList& recv_affinity_list)
{
DEB_MEMBER_FUNCT();
unsigned int nb_recv = m_model->getNbRecvs();
unsigned int nb_aff = recv_affinity_list.size();
DEB_PARAM() << DEB_VAR1(nb_aff);
DEB_PARAM() << DEB_VAR2(nb_recv, nb_aff);
if (nb_aff != nb_recv)
THROW_HW_ERROR(InvalidValue) << "invalid affinity list size: "
<< DEB_VAR2(nb_recv, nb_aff);
RecvCPUAffinityList::const_iterator ait = recv_affinity_list.begin();
RecvList::iterator rit = m_recv_list.begin();
for (int i = 0; i < nb_aff; ++i, ++ait, ++rit)
(*rit)->setCPUAffinity(*ait);
for (int i = 0; i < nb_recv; ++i, ++ait, ++rit) {
Model::Recv *recv = m_model->getRecv(i);
const RecvCPUAffinity& aff = *ait;
recv->setNbProcessingThreads(aff.recv_threads.size());
recv->setCPUAffinity(aff);
(*rit)->setCPUAffinity(aff);
}
}
void Camera::clearAllBuffers()
......@@ -1139,8 +1149,8 @@ void Camera::getSortedBadFrameList(IntList first_idx, IntList last_idx,
{
bool all = first_idx.empty();
IntList bfl;
int nb_items = m_frame_map.getNbItems();
for (int i = 0; i < nb_items; ++i) {
int nb_recvs = getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
FrameMap::Item *item = m_frame_map.getItem(i);
int first = all ? 0 : first_idx[i];
int last = all ? item->getNbBadFrames() : last_idx[i];
......
......@@ -87,6 +87,8 @@ Eiger::BadRecvFrameCorr::BadRecvFrameCorr(Eiger *eiger)
DEB_CONSTRUCTOR();
m_cam = m_eiger->getCamera();
m_nb_recvs = m_eiger->getNbRecvs();
m_bfd_list.resize(m_nb_recvs);
}
void Eiger::BadRecvFrameCorr::prepareAcq()
......@@ -94,7 +96,9 @@ void Eiger::BadRecvFrameCorr::prepareAcq()
DEB_MEMBER_FUNCT();
CorrBase::prepareAcq();
m_bfd.reset();
for (int i = 0; i < m_nb_recvs; ++i)
m_bfd_list[i].reset();
}
void Eiger::BadRecvFrameCorr::correctFrame(FrameType frame, void *ptr)
......@@ -102,26 +106,27 @@ void Eiger::BadRecvFrameCorr::correctFrame(FrameType frame, void *ptr)
DEB_MEMBER_FUNCT();
char *bptr = (char *) ptr;
IntList& bfl = m_bfd.bad_frame_list;
int& last_idx = m_bfd.last_idx;
if (bfl.empty()) {
int bad_frames = m_cam->getNbBadFrames(0);
if (bad_frames == last_idx)
return;
m_cam->getBadFrameList(0, last_idx, bad_frames, bfl);
}
IntList::iterator end = bfl.end();
if (find(bfl.begin(), end, frame) != end) {
Eiger::Geometry *geom = m_eiger->getGeometry();
for (int i = 0; i < geom->getNbRecvs(); ++i) {
for (int i = 0; i < m_nb_recvs; ++i) {
BadFrameData& bfd = m_bfd_list[i];
IntList& bfl = bfd.bad_frame_list;
int& last_idx = bfd.last_idx;
if (bfl.empty()) {
int bad_frames = m_cam->getNbBadFrames(i);
if (bad_frames == last_idx)
continue;
m_cam->getBadFrameList(i, last_idx, bad_frames, bfl);
}
IntList::iterator end = bfl.end();
if (find(bfl.begin(), end, frame) != end) {
Eiger::Geometry *geom = m_eiger->getGeometry();
Eiger::Geometry::Recv *recv = geom->getRecv(i);
recv->fillBadFrame(frame, bptr);
}
if (*(end - 1) > int(frame))
continue;
last_idx += bfl.size();
bfl.clear();
}
if (*(end - 1) > int(frame))
return;
last_idx += bfl.size();
bfl.clear();
}
Eiger::InterModGapCorr::InterModGapCorr(Eiger *eiger)
......@@ -595,6 +600,65 @@ FrameDim Eiger::Geometry::getRecvFrameDim(bool raw)
return frame_dim;
}
Eiger::Recv::Thread::Thread(Recv *recv, int idx)
: m_recv(recv), m_idx(idx)
{
DEB_MEMBER_FUNCT();
AutoMutex l = lock();
m_state = Init;
start();
struct sched_param param;
param.sched_priority = 50;
int ret = pthread_setschedparam(m_thread, SCHED_RR, &param);
if (ret != 0)
DEB_ERROR() << "Could not set real-time priority!!";