Commit 78d5b937 authored by Alejandro Homs Puron, committed by operator for beamline
Browse files

RecvCPUAffinity: control each thread affinity and Fifo NUMA node masks:

* slsReceiver: fill Fifo with 0xff in Listener only for missing packets
* Distinguish between CPUAffinity getNbSystemCPUs and getNbCPUs
* Use std::bitset<64> internally in CPUAffinity
parent ca97655f
......@@ -57,7 +57,7 @@ target_include_directories(lima${NAME} PUBLIC
target_link_libraries(lima${NAME}
limacore
slsDetectorShared slsReceiverShared zmq)
slsDetectorShared slsReceiverShared zmq numa)
limatools_set_library_soversion(lima${NAME} VERSION)
install(TARGETS lima${NAME} LIBRARY DESTINATION lib)
......
......@@ -38,17 +38,23 @@ class CPUAffinity
{
DEB_CLASS_NAMESPC(DebModCamera, "CPUAffinity", "SlsDetector");
public:
CPUAffinity(uint64_t m = 0) : m_mask(m)
CPUAffinity(uint64_t m = 0) : m_mask(internalMask(m))
{}
static void setUseSudo(bool use_sudo);
static bool getUseSudo();
static void checkSudo(std::string cmd, std::string desc = "");
static int getNbCPUs(bool max_nb = false);
static int getNbSystemCPUs(bool max_nb = false);
// Number of hexadecimal digits needed to print a mask with one bit per
// system CPU. The division rounds up so that a CPU count that is not a
// multiple of 4 (e.g. 6 CPUs -> 2 digits) still gets a digit for its
// partially-filled top nibble.
static int getNbHexDigits(bool max_nb = false)
{ return (getNbSystemCPUs(max_nb) + 3) / 4; }
static uint64_t allCPUs(bool max_nb = false)
{ return (uint64_t(1) << getNbCPUs(max_nb)) - 1; }
{ return (uint64_t(1) << getNbSystemCPUs(max_nb)) - 1; }
// Number of CPUs selected by this affinity: the count of bits set in the
// mask, or the number of system CPUs when no bit is set.
int getNbCPUs() const
{
	if (m_mask.none())
		return getNbSystemCPUs();
	return m_mask.count();
}
void initCPUSet(cpu_set_t& cpu_set) const;
void applyToTask(pid_t task, bool incl_threads = true,
......@@ -57,18 +63,23 @@ class CPUAffinity
void applyToNetDevGroup(StringList dev_list) const;
operator uint64_t() const
{ return m_mask ? m_mask : allCPUs(); }
{ return m_mask.any() ? m_mask.to_ulong() : allCPUs(); }
CPUAffinity& operator =(uint64_t m)
{ m_mask = m; return *this; }
CPUAffinity& operator |=(const CPUAffinity& o);
bool isDefault() const
{ return !m_mask || (m_mask == allCPUs()); }
{ return m_mask.none() || (m_mask.to_ulong() == allCPUs()); }
void getNUMANodeMask(std::vector<unsigned long>& node_mask,
int& max_node);
static std::string getProcDir(bool local_threads);
static std::string getTaskProcDir(pid_t task, bool is_thread);
private:
// Map a caller-supplied mask to its stored form: the allCPUs() mask is
// stored as 0, any other value is kept unchanged.
static uint64_t internalMask(uint64_t m)
{
	if (m == allCPUs())
		return 0;
	return m;
}
void applyWithTaskset(pid_t task, bool incl_threads) const;
void applyWithSetAffinity(pid_t task, bool incl_threads) const;
bool applyWithNetDevFile(const std::string& fname) const;
......@@ -76,14 +87,14 @@ class CPUAffinity
const std::string& queue) const;
static bool UseSudo;
static int findNbCPUs();
static int findMaxNbCPUs();
static int findNbSystemCPUs();
static int findMaxNbSystemCPUs();
static std::string getNetDevSetterSudoDesc();
static const std::string NetDevSetQueueRpsName;
static const StringList NetDevSetQueueRpsSrc;
uint64_t m_mask;
std::bitset<64> m_mask;
};
inline
......@@ -99,6 +110,22 @@ bool operator !=(const CPUAffinity& a, const CPUAffinity& b)
return !(a == b);
}
// Union of two affinities. If either operand is the default affinity the
// result is the default affinity; otherwise the two masks are OR-ed.
inline
CPUAffinity operator |(const CPUAffinity& a, const CPUAffinity& b)
{
	const bool both_restricted = !a.isDefault() && !b.isDefault();
	if (!both_restricted)
		return CPUAffinity();
	const uint64_t merged = uint64_t(a) | uint64_t(b);
	return CPUAffinity(merged);
}
// In-place union: replaces this affinity with (*this | o).
inline
CPUAffinity& CPUAffinity::operator |=(const CPUAffinity& o)
{
	const CPUAffinity merged = *this | o;
	*this = merged;
	return *this;
}
typedef std::vector<CPUAffinity> CPUAffinityList;
struct NetDevGroupCPUAffinity {
StringList name_list;
......@@ -202,19 +229,13 @@ class SystemCPUAffinityMgr
};
struct RecvCPUAffinity {
CPUAffinity listeners;
CPUAffinity writers;
CPUAffinity all() const
{
return listeners | writers;
}
CPUAffinityList listeners;
CPUAffinityList writers;
CPUAffinityList port_threads;
RecvCPUAffinity& operator =(CPUAffinity a)
{
listeners = writers = a;
return *this;
}
RecvCPUAffinity();
CPUAffinity all() const;
RecvCPUAffinity& operator =(CPUAffinity a);
};
inline
......@@ -235,6 +256,7 @@ struct GlobalCPUAffinity {
CPUAffinity lima;
CPUAffinity other;
NetDevGroupCPUAffinityList netdev;
CPUAffinity all() const;
};
typedef std::map<PixelDepth, GlobalCPUAffinity> PixelDepthCPUAffinityMap;
......@@ -335,6 +357,7 @@ class GlobalCPUAffinityMgr
};
std::ostream& operator <<(std::ostream& os, const CPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const CPUAffinityList& a);
std::ostream& operator <<(std::ostream& os, const RecvCPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const GlobalCPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const PixelDepthCPUAffinityMap& m);
......
......@@ -28,6 +28,7 @@
using namespace std;
typedef set<int> CPP_SeqType;
typedef CPP_SeqType::value_type CPP_SeqValType;
%End
%ConvertToTypeCode
......@@ -40,7 +41,7 @@ typedef set<int> CPP_SeqType;
*sipCppPtr = new CPP_SeqType();
for (int i = 0; i < PyList_Size(sipPy); ++i) {
int val = PyInt_AS_LONG(PyList_GET_ITEM(sipPy, i));
CPP_SeqValType val = PyInt_AS_LONG(PyList_GET_ITEM(sipPy, i));
(*sipCppPtr)->insert(val);
}
return sipGetState(sipTransferObj);
......@@ -57,3 +58,41 @@ typedef set<int> CPP_SeqType;
%End
};
%MappedType std::vector<unsigned long>
{
%TypeHeaderCode
#include <vector>
using namespace std;
typedef vector<unsigned long> CPP_SeqType;
typedef CPP_SeqType::value_type CPP_SeqValType;
%End

%ConvertToTypeCode
	// Check-only pass: accept a list whose items are all Python longs
	if (sipIsErr == NULL) {
		bool ok = PyList_Check(sipPy);
		for (SIP_SSIZE_T i = 0; ok && (i < PyList_Size(sipPy)); ++i)
			ok = PyLong_Check(PyList_GET_ITEM(sipPy, i));
		return ok;
	}

	// Conversion pass: the element type is unsigned, so use the unsigned
	// conversion (the signed one fails on values above LONG_MAX) and
	// report any Python error instead of storing a bogus value
	CPP_SeqType *cpp_seq = new CPP_SeqType();
	for (SIP_SSIZE_T i = 0; i < PyList_Size(sipPy); ++i) {
		CPP_SeqValType val =
			PyLong_AsUnsignedLong(PyList_GET_ITEM(sipPy, i));
		if (PyErr_Occurred()) {
			*sipIsErr = 1;
			delete cpp_seq;
			return 0;
		}
		cpp_seq->push_back(val);
	}
	*sipCppPtr = cpp_seq;
	return sipGetState(sipTransferObj);
%End

%ConvertFromTypeCode
	PyObject* sip_list = PyList_New(sipCpp->size());
	if (sip_list == NULL)
		return NULL;

	CPP_SeqType::iterator it = sipCpp->begin();
	for (SIP_SSIZE_T i = 0; it != sipCpp->end(); ++it, ++i) {
		// Unsigned conversion keeps masks with the top bit set positive
		PyObject *value = PyLong_FromUnsignedLong(*it);
		if (value == NULL) {
			// Never store NULL in the list: drop it and propagate
			Py_DECREF(sip_list);
			return NULL;
		}
		PyList_SET_ITEM(sip_list, i, value);
	}
	return sip_list;
%End
};
......@@ -21,6 +21,79 @@
//###########################################################################
// namespace SlsDetector
// {
// typedef std::vector<CPUAffinity> CPUAffinityList;
// };
// Maps SlsDetector::CPUAffinityList (std::vector<CPUAffinity>) to and from
// a Python list of SlsDetector.CPUAffinity objects
%MappedType SlsDetector::CPUAffinityList
{
%TypeHeaderCode
#include <vector>
#include "SlsDetectorCPUAffinity.h"
#include "sipAPIlimaslsdetector.h"
using namespace std;
using namespace lima;
using namespace lima::SlsDetector;
typedef vector<CPUAffinity> CPP_SeqType;
typedef CPP_SeqType::value_type CPP_SeqValType;
#define SIP_ValueType sipType_SlsDetector_CPUAffinity
%End
%ConvertToTypeCode
// When sipIsErr is NULL only report whether sipPy is a list whose items
// can all be converted to CPUAffinity (None is rejected)
if (sipIsErr == NULL) {
bool ok = PyList_Check(sipPy);
for (int i = 0; ok && (i < PyList_Size(sipPy)); ++i) {
PyObject *value = PyList_GET_ITEM(sipPy, i);
ok = sipCanConvertToType(value, SIP_ValueType,
SIP_NOT_NONE);
}
return ok;
}
// Otherwise build a new C++ vector holding a copy of each converted item
CPP_SeqType *cpp_seq = new CPP_SeqType();
for (int i = 0; i < PyList_Size(sipPy); ++i) {
PyObject *value = PyList_GET_ITEM(sipPy, i);
int state;
void *p = sipConvertToType(value, SIP_ValueType, NULL,
SIP_NOT_NONE, &state, sipIsErr);
CPP_SeqValType *cpp_value = static_cast<CPP_SeqValType *>(p);
// On a conversion error release the item, free the partial vector and
// return 0
if (*sipIsErr) {
sipReleaseType(cpp_value, SIP_ValueType, state);
delete cpp_seq;
return 0;
}
// The vector stores its own copy, so the converted item is released
// right after
cpp_seq->push_back(*cpp_value);
sipReleaseType(cpp_value, SIP_ValueType, state);
}
*sipCppPtr = cpp_seq;
return sipGetState(sipTransferObj);
%End
%ConvertFromTypeCode
// Build a Python list of the same size, wrapping a heap copy of each item
PyObject* sip_list = PyList_New(sipCpp->size());
if (sip_list == NULL)
return NULL;
CPP_SeqType::iterator it = sipCpp->begin();
for (int i = 0; it != sipCpp->end(); ++it, ++i) {
CPP_SeqValType *new_val = new CPP_SeqValType(*it);
PyObject *value = sipConvertFromNewType(new_val, SIP_ValueType,
NULL);
// If wrapping failed, free the copy and the partially filled list
if (value == NULL) {
delete new_val;
Py_DECREF(sip_list);
return NULL;
}
PyList_SET_ITEM(sip_list, i, value);
}
return sip_list;
%End
};
// namespace SlsDetector
// {
// typedef std::vector<SlsDetector::NetDevGroupCPUAffinity>
......@@ -38,8 +111,8 @@ using namespace std;
using namespace lima;
using namespace lima::SlsDetector;
typedef NetDevGroupCPUAffinity CPP_ValueType;
typedef vector<NetDevGroupCPUAffinity> CPP_SeqType;
typedef CPP_SeqType::value_type CPP_SeqValType;
#define SIP_ValueType sipType_SlsDetector_NetDevGroupCPUAffinity
%End
......@@ -60,7 +133,7 @@ typedef vector<NetDevGroupCPUAffinity> CPP_SeqType;
int state;
void *p = sipConvertToType(value, SIP_ValueType, NULL,
SIP_NOT_NONE, &state, sipIsErr);
CPP_ValueType *cpp_value = static_cast<CPP_ValueType *>(p);
CPP_SeqValType *cpp_value = static_cast<CPP_SeqValType *>(p);
if (*sipIsErr) {
sipReleaseType(cpp_value, SIP_ValueType, state);
delete cpp_seq;
......@@ -80,7 +153,7 @@ typedef vector<NetDevGroupCPUAffinity> CPP_SeqType;
return NULL;
CPP_SeqType::iterator it = sipCpp->begin();
for (int i = 0; it != sipCpp->end(); ++it, ++i) {
CPP_ValueType *new_val = new CPP_ValueType(*it);
CPP_SeqValType *new_val = new CPP_SeqValType(*it);
PyObject *value = sipConvertFromNewType(new_val, SIP_ValueType,
NULL);
if (value == NULL) {
......@@ -111,9 +184,9 @@ using namespace std;
using namespace lima;
using namespace lima::SlsDetector;
typedef PixelDepth CPP_KeyType;
typedef GlobalCPUAffinity CPP_ValueType;
typedef PixelDepthCPUAffinityMap CPP_MapType;
typedef CPP_MapType::key_type CPP_MapKeyType;
typedef CPP_MapType::mapped_type CPP_MappedType;
#define SIP_ValueType sipType_SlsDetector_GlobalCPUAffinity
%End
......@@ -134,11 +207,11 @@ typedef PixelDepthCPUAffinityMap CPP_MapType;
PyObject *key, *value;
SIP_SSIZE_T pos = 0;
while (PyDict_Next(sipPy, &pos, &key, &value)) {
CPP_KeyType cpp_key = CPP_KeyType(PyInt_AS_LONG(key));
CPP_MapKeyType cpp_key = CPP_MapKeyType(PyInt_AS_LONG(key));
int state;
void *p = sipConvertToType(value, SIP_ValueType, NULL,
SIP_NOT_NONE, &state, sipIsErr);
CPP_ValueType *cpp_value = static_cast<CPP_ValueType *>(p);
CPP_MappedType *cpp_value = static_cast<CPP_MappedType *>(p);
if (*sipIsErr) {
sipReleaseType(cpp_value, SIP_ValueType, state);
delete cpp_map;
......@@ -163,7 +236,7 @@ typedef PixelDepthCPUAffinityMap CPP_MapType;
PyObject *key = PyInt_FromLong(it->first);
if (key == NULL)
goto error;
CPP_ValueType *new_val = new CPP_ValueType(it->second);
CPP_MappedType *new_val = new CPP_MappedType(it->second);
PyObject *value = sipConvertFromNewType(new_val, SIP_ValueType,
NULL);
bool ok = (value != NULL);
......@@ -201,9 +274,12 @@ public:
static bool getUseSudo();
static void checkSudo(std::string cmd, std::string desc = "");
static int getNbCPUs(bool max_nb = false);
static int getNbSystemCPUs(bool max_nb = false);
static int getNbHexDigits(bool max_nb = false);
static unsigned long allCPUs(bool max_nb = false);
int getNbCPUs() const;
//void initCPUSet(cpu_set_t& cpu_set) const;
void applyToTask(int task, bool incl_threads = true,
bool use_taskset = true) const;
......@@ -211,14 +287,19 @@ public:
operator unsigned long() const;
//CPUAffinity& operator =(unsigned long m);
SlsDetector::CPUAffinity& operator |=(const SlsDetector::CPUAffinity& o);
bool isDefault() const;
void getNUMANodeMask(std::vector<unsigned long>& node_mask /Out/,
int& max_node /Out/);
static std::string getProcDir(bool local_threads);
static std::string getTaskProcDir(int task, bool is_thread);
};
// typedef std::vector<CPUAffinity> CPUAffinityList;
struct NetDevGroupCPUAffinity {
std::vector<std::string> name_list;
SlsDetector::CPUAffinity processing;
......@@ -253,9 +334,11 @@ public:
};
struct RecvCPUAffinity {
SlsDetector::CPUAffinity listeners;
SlsDetector::CPUAffinity writers;
SlsDetector::CPUAffinityList listeners;
SlsDetector::CPUAffinityList writers;
SlsDetector::CPUAffinityList port_threads;
RecvCPUAffinity();
SlsDetector::CPUAffinity all() const;
// RecvCPUAffinity& operator =(CPUAffinity a);
};
......
Subproject commit ea08164d4f92d02fbce9be757ce6884c095feaa3
Subproject commit f3b604c944bedf3ef3b044e68606e0cfd951e1a7
......@@ -33,6 +33,8 @@
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <numa.h>
#include <iomanip>
using namespace std;
using namespace lima;
......@@ -88,7 +90,7 @@ void CPUAffinity::checkSudo(string cmd, string desc)
<< "See output for details";
}
int CPUAffinity::findNbCPUs()
int CPUAffinity::findNbSystemCPUs()
{
DEB_STATIC_FUNCT();
int nb_cpus = 0;
......@@ -107,7 +109,7 @@ int CPUAffinity::findNbCPUs()
return nb_cpus;
}
int CPUAffinity::findMaxNbCPUs()
int CPUAffinity::findMaxNbSystemCPUs()
{
DEB_STATIC_FUNCT();
NumericGlob proc_glob("/proc/sys/kernel/sched_domain/cpu");
......@@ -116,12 +118,12 @@ int CPUAffinity::findMaxNbCPUs()
return max_nb_cpus;
}
int CPUAffinity::getNbCPUs(bool max_nb)
int CPUAffinity::getNbSystemCPUs(bool max_nb)
{
static int nb_cpus = 0;
EXEC_ONCE(nb_cpus = findNbCPUs());
EXEC_ONCE(nb_cpus = findNbSystemCPUs());
static int max_nb_cpus = 0;
EXEC_ONCE(max_nb_cpus = findMaxNbCPUs());
EXEC_ONCE(max_nb_cpus = findMaxNbSystemCPUs());
int cpus = (max_nb && max_nb_cpus) ? max_nb_cpus : nb_cpus;
return cpus;
}
......@@ -188,9 +190,7 @@ void CPUAffinity::applyWithTaskset(pid_t task, bool incl_threads) const
os << "sudo -n ";
}
const char *all_tasks_opt = incl_threads ? "-a " : "";
os << "taskset " << all_tasks_opt
<< "-p " << hex << showbase << mask << " "
<< dec << noshowbase << task;
os << "taskset " << all_tasks_opt << "-p " << *this << " " << task;
if (!DEB_CHECK_ANY(DebTypeTrace))
os << " > /dev/null 2>&1";
DEB_TRACE() << "executing: '" << os.str() << "'";
......@@ -286,7 +286,7 @@ bool CPUAffinity::applyWithNetDevFile(const string& fname) const
DEB_MEMBER_FUNCT();
ostringstream os;
os << noshowbase << hex << m_mask;
os << hex << m_mask.to_ulong();
DEB_TRACE() << "writing " << os.str() << " to " << fname;
ofstream rps_file(fname.c_str());
if (rps_file)
......@@ -313,7 +313,7 @@ bool CPUAffinity::applyWithNetDevSetter(const string& dev,
os << "sudo -n ";
}
os << NetDevSetQueueRpsName << " " << dev << " " << queue << " "
<< showbase << hex << m_mask;
<< hex << "0x" << m_mask.to_ulong();
if (!DEB_CHECK_ANY(DebTypeTrace) && false)
os << " > /dev/null 2>&1";
DEB_TRACE() << "executing: '" << os.str() << "'";
......@@ -411,6 +411,45 @@ static const char *CPUAffinityNetDevSetQueueRpsSrcCList[] = {
const StringList CPUAffinity::NetDevSetQueueRpsSrc(
C_LIST_ITERS(CPUAffinityNetDevSetQueueRpsSrcCList));
// Compute the bit mask of the NUMA nodes that host the CPUs selected by
// this affinity.
//
//   node_mask  [out] re-sized to hold one bit per NUMA node; bit n is set
//              when at least one selected CPU belongs to node n
//   max_node   [out] set to the number of NUMA nodes plus one
//
// CPUs for which libnuma reports no valid node are skipped with a warning
// instead of being used as an index into node_mask.
void CPUAffinity::getNUMANodeMask(vector<unsigned long>& node_mask,
				  int& max_node)
{
	DEB_MEMBER_FUNCT();

	typedef vector<unsigned long> Array;

	int nb_nodes = numa_max_node() + 1;
	const int item_bits = sizeof(Array::value_type) * 8;
	// Number of array items needed for nb_nodes bits, rounded up
	int nb_items = (nb_nodes + item_bits - 1) / item_bits;
	DEB_PARAM() << DEB_VAR2(*this, nb_items);

	max_node = nb_nodes + 1;
	node_mask.assign(nb_items, 0);

	uint64_t mask = *this;
	for (unsigned int cpu = 0; cpu < sizeof(mask) * 8; ++cpu) {
		if (!((mask >> cpu) & 1))
			continue;
		// numa_node_of_cpu returns -1 on error: keep the result signed
		// and range-check it before indexing node_mask
		int node = numa_node_of_cpu(cpu);
		if ((node < 0) || (node >= nb_nodes)) {
			DEB_WARNING() << "Invalid NUMA node for CPU: "
				      << DEB_VAR2(cpu, node);
			continue;
		}
		// Unsigned literal: shifting into the top bit of a signed
		// long is not portable
		node_mask[node / item_bits] |= 1UL << (node % item_bits);
	}

	if (DEB_CHECK_ANY(DebTypeAlways)) {
		ostringstream os;
		os << hex << setfill('0') << "0x";
		// The most-significant item only carries the remaining node bits
		const int top_bits = nb_nodes - (nb_items - 1) * item_bits;
		bool first = true;
		Array::reverse_iterator it, end = node_mask.rend();
		for (it = node_mask.rbegin(); it != end; ++it, first = false) {
			// setw only applies to the next insertion, so it must
			// directly precede the value, not the separator
			const int digits = ((first ? top_bits : item_bits) + 3) / 4;
			os << (!first ? "," : "") << setw(digits) << *it;
		}
		DEB_ALWAYS() << "node_mask=" << os.str() << ", "
			     << DEB_VAR1(max_node);
	}
}
SystemCPUAffinityMgr::WatchDog::WatchDog()
{
DEB_CONSTRUCTOR();
......@@ -774,6 +813,44 @@ void SystemCPUAffinityMgr::setNetDevCPUAffinity(
checkWatchDogStop();
}
// Each affinity list starts with a single default-constructed CPUAffinity
RecvCPUAffinity::RecvCPUAffinity()
: listeners(1), writers(1), port_threads(1)
{
}
// OR together every affinity in the list. An empty list specifies no
// restriction, so it yields the default affinity instead of reading a
// non-existent first element.
static CPUAffinity combineRecvAffinityList(const CPUAffinityList& l)
{
	if (l.empty())
		return CPUAffinity();
	CPUAffinityList::const_iterator it = l.begin(), end = l.end();
	CPUAffinity all = *it;
	for (++it; it != end; ++it)
		all |= *it;
	return all;
}

// Union of the listeners, writers and port_threads affinities
CPUAffinity RecvCPUAffinity::all() const
{
	return (combineRecvAffinityList(listeners) |
		combineRecvAffinityList(writers) |
		combineRecvAffinityList(port_threads));
}
// Reset the three affinity lists to a single entry each, all equal to a
RecvCPUAffinity& RecvCPUAffinity::operator =(CPUAffinity a)
{
	const CPUAffinityList single(1, a);
	listeners = single;
	writers = single;
	port_threads = single;
	return *this;
}
// Union of the receiver, lima and other affinities plus the processing
// affinity of every netdev group
CPUAffinity GlobalCPUAffinity::all() const
{
	CPUAffinity result = recv.all();
	result |= lima;
	result |= other;
	NetDevGroupCPUAffinityList::const_iterator dev = netdev.begin();
	for (; dev != netdev.end(); ++dev)
		result |= dev->processing;
	return result;
}
GlobalCPUAffinityMgr::
ProcessingFinishedEvent::ProcessingFinishedEvent(GlobalCPUAffinityMgr *mgr)
: m_mgr(mgr), m_cb(this), m_ct(NULL)
......@@ -937,11 +1014,9 @@ void GlobalCPUAffinityMgr::applyAndSet(const GlobalCPUAffinity& o)
if (!m_cam)
THROW_HW_ERROR(InvalidValue) << "apply without camera";
CPUAffinity all_system = o.recv.all() | o.lima | o.other;
cpu_set_t all_cpu_set;
all_system.initCPUSet(all_cpu_set);
if (CPU_COUNT(&all_cpu_set) <= CPUAffinity::getNbCPUs() / 2)
THROW_HW_ERROR(Error) << "Hyper-threading is activated!";
CPUAffinity all_system = o.all();
if (all_system.getNbCPUs() == CPUAffinity::getNbSystemCPUs() / 2)
DEB_WARNING() << "Hyper-threading seems activated!";
setLimaAffinity(o.lima);
setRecvAffinity(o.recv);
......@@ -1117,13 +1192,26 @@ void GlobalCPUAffinityMgr::cleanUp()
ostream& lima::SlsDetector::operator <<(ostream& os, const CPUAffinity& a)
{
return os << hex << showbase << uint64_t(a) << dec << noshowbase;
return os << hex << "0x" << setw(CPUAffinity::getNbHexDigits())
<< setfill('0') << uint64_t(a) << dec << setw(0)
<< setfill(' ');
}
// Print the list as "<a0, a1, ...>", each item through the CPUAffinity
// stream operator
ostream&
lima::SlsDetector::operator <<(ostream& os, const CPUAffinityList& l)
{
	os << "<";
	const char *sep = "";
	for (CPUAffinityList::const_iterator it = l.begin(); it != l.end(); ++it) {
		os << sep << *it;
		sep = ", ";
	}
	os << ">";
	return os;
}
ostream& lima::SlsDetector::operator <<(ostream& os, const RecvCPUAffinity& a)
{
os << "<";
os << "listeners=" << a.listeners << ", writers=" << a.writers;
os << "listeners=" << a.listeners << ", writers=" << a.writers << ", "
<< "port_threads=" << a.port_threads;
return os << ">";
}
......
......@@ -299,7 +299,7 @@ Camera::Camera(string config_fname)
{
DEB_CONSTRUCTOR();
CPUAffinity::getNbCPUs();
CPUAffinity::getNbSystemCPUs();
m_input_data = new AppInputData(config_fname);
......@@ -623,28 +623,65 @@ void Camera::setRecvCPUAffinity(const RecvCPUAffinity& recv_affinity)
{
DEB_MEMBER_FUNCT();
CPUAffinity listeners_affinity = recv_affinity.listeners;
CPUAffinity writers_affinity = recv_affinity.writers;
cpu_set_t list_cpu_set;
listeners_affinity.initCPUSet(list_cpu_set);
cpu_set_t writ_cpu_set;
writers_affinity.initCPUSet(writ_cpu_set);
CPUAffinityList::const_iterator lit = recv_affinity.listeners.begin();
CPUAffinityList::const_iterator wit = recv_affinity.writers.begin();
RecvList::iterator it, end = m_recv_list.end();
for (it = m_recv_list.begin(); it != end; ++it) {
Receiver *recv = *it;
DEB_TRACE() << "setting recv " << recv->m_idx << " "
<< "listeners CPU mask to " << listeners_affinity;
DEB_TRACE() << "setting recv " << recv->m_idx << " "
<< "writers CPU mask to " << writers_affinity;
slsReceiverUsers