Commit b4f40882 authored by Alejandro Homs Puron
Browse files

CPUAffinity: support more than 64 CPUs

parent a4865445
......@@ -136,20 +136,27 @@ class SystemCmdPipe
SystemCmd m_cmd;
};
class CPUAffinity
{
DEB_CLASS_NAMESPC(DebModCamera, "CPUAffinity", "SlsDetector");
public:
CPUAffinity(uint64_t m = 0) : m_mask(internalMask(m))
{}
static constexpr int MaxNbCPUs = NumaSoftBufferAllocMgr::MaxNbCPUs;
static constexpr int NbULongBits = sizeof(unsigned long) * 8;
static constexpr int NbULongs = MaxNbCPUs / NbULongBits;
typedef std::bitset<MaxNbCPUs> Mask;
typedef unsigned long ULongArray[NbULongs];
CPUAffinity() {}
CPUAffinity(const Mask& m) : m_mask(m) {}
static int getNbSystemCPUs(bool max_nb = false);
static int getNbHexDigits(bool max_nb = false)
{ return getNbSystemCPUs(max_nb) / 4; }
static uint64_t allCPUs(bool max_nb = false)
{ return (uint64_t(1) << getNbSystemCPUs(max_nb)) - 1; }
static const Mask& allCPUs(bool max_nb = false);
int getNbCPUs() const
{ return m_mask.any() ? m_mask.count() : getNbSystemCPUs(); }
......@@ -158,19 +165,16 @@ class CPUAffinity
void applyToTask(pid_t task, bool incl_threads = true,
bool use_taskset = true) const;
uint64_t getMask() const
{ return m_mask.any() ? m_mask.to_ulong() : allCPUs(); }
uint64_t getZeroDefaultMask() const
{ return m_mask.to_ulong(); }
const Mask &getMask() const
{ return m_mask.any() ? m_mask : allCPUs(); }
operator uint64_t() const
{ return getMask(); }
const Mask &getZeroDefaultMask() const
{ return m_mask; }
CPUAffinity& operator |=(const CPUAffinity& o);
bool isDefault() const
{ return m_mask.none() || (m_mask.to_ulong() == allCPUs()); }
{ return m_mask.none() || (m_mask == allCPUs()); }
void getNUMANodeMask(std::vector<unsigned long>& node_mask,
int& max_node);
......@@ -178,8 +182,27 @@ class CPUAffinity
static std::string getProcDir(bool local_threads);
static std::string getTaskProcDir(pid_t task, bool is_thread);
static void maskToULongArray(const Mask& mask, ULongArray& array);
static Mask maskFromULongArray(const ULongArray& array);
static std::string maskToString(const Mask& mask, int base = 16,
bool comma_sep = false);
static Mask maskFromString(std::string aff_str, int base = 16);
void toULongArray(ULongArray& array) const
{ maskToULongArray(getMask(), array); }
static CPUAffinity fromULongArray(const ULongArray& array)
{ return maskFromULongArray(array); }
std::string toString(int base = 16, bool comma_sep = false) const
{ return maskToString(getMask(), base, comma_sep); }
static CPUAffinity fromString(std::string aff_str, int base = 16)
{ return maskFromString(aff_str, base); }
private:
static uint64_t internalMask(uint64_t m)
static Mask internalMask(Mask m)
{ return (m != allCPUs()) ? m : 0; }
void applyWithTaskset(pid_t task, bool incl_threads) const;
......@@ -188,13 +211,13 @@ class CPUAffinity
static int findNbSystemCPUs();
static int findMaxNbSystemCPUs();
std::bitset<64> m_mask;
Mask m_mask;
};
inline
bool operator ==(const CPUAffinity& a, const CPUAffinity& b)
{
uint64_t mask = CPUAffinity::allCPUs();
CPUAffinity::Mask mask = CPUAffinity::allCPUs();
return (a.getMask() & mask) == (b.getMask() & mask);
}
......@@ -215,7 +238,8 @@ CPUAffinity operator |(const CPUAffinity& a, const CPUAffinity& b)
inline
CPUAffinity& CPUAffinity::operator |=(const CPUAffinity& o)
{
return *this = *this | o;
m_mask |= o.m_mask;
return *this;
}
typedef std::vector<CPUAffinity> CPUAffinityList;
......@@ -397,9 +421,9 @@ class SystemCPUAffinityMgr
~SystemCPUAffinityMgr();
static ProcList getProcList(Filter filter = All,
CPUAffinity cpu_affinity = 0);
CPUAffinity cpu_affinity = {});
static ProcList getThreadList(Filter filter = All,
CPUAffinity cpu_affinity = 0);
CPUAffinity cpu_affinity = {});
void setOtherCPUAffinity(CPUAffinity cpu_affinity);
void setNetDevCPUAffinity(
......@@ -430,17 +454,19 @@ class SystemCPUAffinityMgr
typedef uint64_t Arg;
typedef char String[StringLen];
typedef CPUAffinity::ULongArray Mask;
struct Packet {
Cmd cmd;
union Union {
uint64_t proc_affinity;
Mask proc_affinity;
struct NetDevAffinity {
String name_list;
unsigned int queue_affinity_len;
struct QueueAffinity {
int queue;
uint64_t irq;
uint64_t processing;
Mask irq;
Mask processing;
} queue_affinity[AffinityMapLen];
} netdev_affinity;
} u;
......@@ -465,7 +491,7 @@ class SystemCPUAffinityMgr
void childFunction();
void procAffinitySetter(CPUAffinity cpu_affinity);
NetDevGroupCPUAffinity netDevAffinityEncode(
NetDevGroupCPUAffinity netDevAffinityDecode(
const Packet& packet);
void netDevAffinitySetter(
const NetDevGroupCPUAffinity& netdev_affinity);
......
......@@ -209,12 +209,60 @@ public:
class CPUAffinity
{
%TypeCode
typedef ::SlsDetector::CPUAffinity::Mask CPUMask;
// Converts a CPU mask into a Python integer object by parsing the
// bitset's binary text representation.
PyObject *maskToPyLong(const CPUMask& mask)
{
	const std::string bits = mask.to_string();
	PyObject *py_long = PyLong_FromString(bits.c_str(), NULL, 2);
	return py_long;
}
// Parses the text held by a bytes object into a CPU mask.
// The text is read as hexadecimal when it is longer than two bytes and
// starts with "0x"; otherwise it is read as binary digits.
// Throws a LIMA InvalidValue exception if the buffer cannot be obtained.
CPUMask pyBytesToCPUMask(PyObject *py_bytes)
{
	char *text = PyBytes_AsString(py_bytes);
	if (text == NULL)
		throw LIMA_HW_EXC(InvalidValue, "Invalid PyBytes");

	const Py_ssize_t nb_bytes = PyBytes_Size(py_bytes);
	const bool has_hex_prefix = (nb_bytes > 2) &&
				    (strncmp(text, "0x", 2) == 0);
	return ::SlsDetector::CPUAffinity::maskFromString(
					text, has_hex_prefix ? 16 : 2);
}
%End
public:
CPUAffinity(unsigned long m = 0);
CPUAffinity();
// Builds a CPUAffinity from a bytes object holding the mask as text
// (binary digits, or hexadecimal when prefixed with "0x").
CPUAffinity(SIP_PYOBJECT mask);
%MethodCode
	// The GIL is deliberately NOT released here: PyBytes_Check() and the
	// PyBytes_AsString()/PyBytes_Size() calls made by pyBytesToCPUMask()
	// are Python C-API calls, which must only run while the GIL is held.
	try
	{
		if (!PyBytes_Check(a0))
			throw LIMA_HW_EXC(InvalidValue, "CPUAffinity accepts "
						"PyBytes objects only");
		CPUMask mask = pyBytesToCPUMask(a0);
		sipCpp = new ::SlsDetector::CPUAffinity(mask);
	}
	catch (...)
	{
		// any C++ failure is reported to Python as an unknown exception
		sipRaiseUnknownException();
		return NULL;
	}
%End
static int getNbSystemCPUs(bool max_nb = false);
static int getNbHexDigits(bool max_nb = false);
static unsigned long allCPUs(bool max_nb = false);
// Returns the mask of all the system CPUs, converted to a Python integer.
static SIP_PYOBJECT allCPUs(bool max_nb = false);
%MethodCode
CPUMask mask;
// the C++ call runs with the GIL released; its result is copied locally
Py_BEGIN_ALLOW_THREADS
mask = ::SlsDetector::CPUAffinity::allCPUs(a0);
Py_END_ALLOW_THREADS
// the Python object is created only once the GIL is held again
sipRes = maskToPyLong(mask);
%End
int getNbCPUs() const;
......@@ -222,10 +270,23 @@ public:
void applyToTask(int task, bool incl_threads = true,
bool use_taskset = true) const;
unsigned long getMask() const;
unsigned long getZeroDefaultMask() const;
// Returns the result of the C++ getMask(), converted to a Python integer.
SIP_PYOBJECT getMask() const;
%MethodCode
CPUMask mask;
// the C++ call runs with the GIL released; its result is copied locally
Py_BEGIN_ALLOW_THREADS
mask = sipCpp->getMask();
Py_END_ALLOW_THREADS
// the Python object is created only once the GIL is held again
sipRes = maskToPyLong(mask);
%End
operator unsigned long() const;
// Returns the result of the C++ getZeroDefaultMask(), converted to a
// Python integer.
SIP_PYOBJECT getZeroDefaultMask() const;
%MethodCode
CPUMask mask;
// the C++ call runs with the GIL released; its result is copied locally
Py_BEGIN_ALLOW_THREADS
mask = sipCpp->getZeroDefaultMask();
Py_END_ALLOW_THREADS
// the Python object is created only once the GIL is held again
sipRes = maskToPyLong(mask);
%End
SlsDetector::CPUAffinity& operator |=(const SlsDetector::CPUAffinity& o);
......@@ -310,11 +371,11 @@ public:
static std::vector<int> getProcList(
SlsDetector::SystemCPUAffinityMgr::Filter filter
= SlsDetector::SystemCPUAffinityMgr::All,
SlsDetector::CPUAffinity cpu_affinity = 0);
SlsDetector::CPUAffinity cpu_affinity = CPUAffinity());
static std::vector<int> getThreadList(
SlsDetector::SystemCPUAffinityMgr::Filter filter
= SlsDetector::SystemCPUAffinityMgr::All,
SlsDetector::CPUAffinity cpu_affinity = 0);
SlsDetector::CPUAffinity cpu_affinity = CPUAffinity());
void setOtherCPUAffinity(
SlsDetector::CPUAffinity affinity);
......
......@@ -79,7 +79,7 @@ void BufferMgr::setAcqBufferCPUAffinity(CPUAffinity buffer_affinity)
DEB_ALWAYS() << DEB_VAR1(buffer_affinity);
BufferCtrlObj *buffer = getBufferCtrlObj();
if (buffer)
buffer->setCPUAffinityMask(buffer_affinity);
buffer->setCPUAffinityMask(buffer_affinity.getMask());
m_buffer_affinity = buffer_affinity;
}
......
......@@ -330,10 +330,28 @@ int CPUAffinity::getNbSystemCPUs(bool max_nb)
EXEC_ONCE(nb_cpus = findNbSystemCPUs());
static int max_nb_cpus = 0;
EXEC_ONCE(max_nb_cpus = findMaxNbSystemCPUs());
if (max_nb_cpus > MaxNbCPUs)
throw LIMA_HW_EXC(Error, "Too many CPUS: ")
<< max_nb_cpus << ", MaxNbCPUs=" << MaxNbCPUs;
int cpus = (max_nb && max_nb_cpus) ? max_nb_cpus : nb_cpus;
return cpus;
}
// Returns a reference to a function-local static mask with one bit set
// per CPU reported by getNbSystemCPUs(max_nb). A separate mask is kept
// for each value of max_nb; each one is filled through EXEC_ONCE.
const CPUAffinity::Mask &CPUAffinity::allCPUs(bool max_nb)
{
	if (!max_nb) {
		static Mask mask;
		EXEC_ONCE(for (int cpu = 0; cpu < getNbSystemCPUs(false); ++cpu)
				  mask.set(cpu));
		return mask;
	}

	static Mask max_nb_mask;
	EXEC_ONCE(for (int cpu = 0; cpu < getNbSystemCPUs(true); ++cpu)
			  max_nb_mask.set(cpu));
	return max_nb_mask;
}
std::string CPUAffinity::getProcDir(bool local_threads)
{
DEB_STATIC_FUNCT();
......@@ -361,9 +379,9 @@ std::string CPUAffinity::getTaskProcDir(pid_t task, bool is_thread)
void CPUAffinity::initCPUSet(cpu_set_t& cpu_set) const
{
CPU_ZERO(&cpu_set);
uint64_t mask = getMask();
for (unsigned int i = 0; i < sizeof(mask) * 8; ++i) {
if ((mask >> i) & 1)
Mask mask = getMask();
for (unsigned int i = 0; i < MaxNbCPUs; ++i) {
if (mask.test(i))
CPU_SET(i, &cpu_set);
}
}
......@@ -452,9 +470,9 @@ void CPUAffinity::getNUMANodeMask(vector<unsigned long>& node_mask,
node_mask.assign(nb_items, 0);
uint64_t mask = getMask();
for (unsigned int i = 0; i < sizeof(mask) * 8; ++i) {
if ((mask >> i) & 1) {
Mask mask = getMask();
for (unsigned int i = 0; i < MaxNbCPUs; ++i) {
if (mask.test(i)) {
unsigned int n = numa_node_of_cpu(i);
Array::reference v = node_mask[n / item_bits];
v |= 1L << (n % item_bits);
......@@ -473,6 +491,95 @@ void CPUAffinity::getNUMANodeMask(vector<unsigned long>& node_mask,
}
}
void CPUAffinity::maskToULongArray(const Mask& mask, ULongArray& array)
{
constexpr Mask ULongMask(std::numeric_limits<unsigned long>::max());
for (int i = 0; i < NbULongs; ++i)
array[i] = ((mask >> (i * NbULongBits)) & ULongMask).to_ulong();
}
// Rebuilds a mask from NbULongs words of NbULongBits bits each;
// array[0] is the least-significant word.
CPUAffinity::Mask CPUAffinity::maskFromULongArray(const ULongArray& array)
{
	Mask result;
	// start from the most-significant word and shift it up as the
	// lower words are appended
	for (int word = NbULongs - 1; word >= 0; --word) {
		result <<= NbULongBits;
		result |= Mask(array[word]);
	}
	return result;
}
// Formats the mask as text.
//   base 2:  the std::bitset binary representation (comma_sep is ignored)
//   base 16: zero-padded groups of 8 hex digits (32 bits each), most
//            significant group first, joined by ',' when comma_sep is set
// Any other base raises a LIMA InvalidValue exception.
string CPUAffinity::maskToString(const Mask& mask, int base, bool comma_sep)
{
	if (base == 2)
		return mask.to_string();
	if (base != 16)
		throw LIMA_HW_EXC(InvalidValue, "Invalid base: ") << base;

	ULongArray words;
	maskToULongArray(mask, words);

	constexpr int GroupBits = 32;
	constexpr int GroupsPerULong = NbULongBits / GroupBits;

	ostringstream out;
	out << hex << setfill('0');
	bool first_group = true;
	for (int w = NbULongs - 1; w >= 0; --w) {
		const unsigned long word = words[w];
		for (int g = GroupsPerULong - 1; g >= 0; --g) {
			const uint32_t group = (word >> (g * GroupBits)) & 0xffffffff;
			if (comma_sep && !first_group)
				out << ",";
			out << setw(GroupBits / 4) << group;
			first_group = false;
		}
	}
	return out.str();
}
// Parses the text representation of a CPU mask.
//   aff_str: digits in the given base, most significant first; ',' group
//            separators are ignored. With base 16 a leading "0x" is
//            accepted. A trailing 'L' is dropped (presumably the Python 2
//            long suffix - TODO confirm).
//   base:    2 or 16
// Returns the decoded mask; an empty string yields an empty mask.
// Raises a LIMA InvalidValue exception on an unsupported base, on any
// character that is not a digit in that base, or when the string holds
// more digits than NbULongs unsigned longs can store.
CPUAffinity::Mask CPUAffinity::maskFromString(string aff_str, int base)
{
	if ((base != 2) && (base != 16))
		throw LIMA_HW_EXC(InvalidValue, "Invalid base: ") << base;

	if ((base == 16) && (aff_str.find("0x") == 0))
		aff_str.erase(0, 2);
	size_t len = aff_str.size();
	if (len && (aff_str[len - 1] == 'L'))
		aff_str.erase(len - 1);

	const int bits_per_char = (base == 16) ? 4 : 1;
	const int max_bits = NbULongs * NbULongBits;

	ULongArray array = {};
	int bit = 0;
	// walk from the least-significant (last) digit towards the first,
	// decoding each digit explicitly so that signs, blanks or nested
	// prefixes are rejected with the same exception as other bad input
	string::const_reverse_iterator it, end = aff_str.rend();
	for (it = aff_str.rbegin(); it != end; ++it) {
		const char c = *it;
		if (c == ',')
			continue;

		int digit = base;	// stays out of range unless decoded
		if ((c >= '0') && (c <= '9'))
			digit = c - '0';
		else if ((c >= 'a') && (c <= 'f'))
			digit = c - 'a' + 10;
		else if ((c >= 'A') && (c <= 'F'))
			digit = c - 'A' + 10;

		if ((digit >= base) || (bit >= max_bits))
			throw LIMA_HW_EXC(InvalidValue, "Invalid mask: ")
				<< aff_str;

		// NbULongBits is a multiple of bits_per_char, so a digit
		// never straddles two array elements
		array[bit / NbULongBits] |=
			static_cast<unsigned long>(digit) << (bit % NbULongBits);
		bit += bits_per_char;
	}

	return maskFromULongArray(array);
}
bool IrqMgr::m_irqbalance_stopped = false;
StringList IrqMgr::m_dev_list;
......@@ -531,15 +638,15 @@ IntList IrqMgr::getIrqList()
ifstream proc_ints("/proc/interrupts");
ostringstream os;
int nb_cpus = CPUAffinity::getNbSystemCPUs();
string info_token_re = "([^ \t]+)[ \t]*";
os << "[ \t]*"
<< "(?P<irq>[0-9]+):[ \t]+"
<< "(?P<counts>(([0-9]+)[ \t]+){" << nb_cpus << "})"
<< "(?P<type>[-A-Za-z0-9_]+)[ \t]+"
<< "(?P<name>[-A-Za-z0-9_]+)";
<< "(?P<info>(" << info_token_re << "){2,4})";
DEB_TRACE() << DEB_VAR1(os.str());
RegEx int_re(os.str());
while (proc_ints) {
char buffer[1024];
char buffer[4096];
proc_ints.getline(buffer, sizeof(buffer));
string s = buffer;
DEB_TRACE() << DEB_VAR1(s);
......@@ -548,8 +655,14 @@ IntList IrqMgr::getIrqList()
continue;
int irq;
istringstream(match["irq"]) >> irq;
DEB_TRACE() << "found match: " << irq << ": "
<< string(match["name"]);
if (DEB_CHECK_ANY(DebTypeTrace)) {
string info = match["info"];
SimpleRegEx info_re(info_token_re);
RegEx::MatchListType match_list;
info_re.multiSearch(info, match_list);
string name = match_list.back()[1];
DEB_TRACE() << "found match: " << irq << ": " << name;
}
IntStringList::iterator it, end = dev_irqs.end();
bool ok = false;
for (it = dev_irqs.begin(); !ok && (it != end); ++it)
......@@ -720,7 +833,8 @@ void NetDevRxQueueMgr::apply(Task task, int queue, CPUAffinity a)
ok = applyWithSetter(task, *it, a);
if (ok)
return;
DEB_ERROR() << "Could not use setter";
DEB_ERROR() << "Could not use setter: "
<< DEB_VAR4(m_dev, task, queue, a);
did_error_setter[task] = true;
}
}
......@@ -806,12 +920,11 @@ bool NetDevRxQueueMgr::applyWithFile(const string& fname, CPUAffinity a)
{
DEB_MEMBER_FUNCT();
ostringstream os;
os << hex << a.getZeroDefaultMask();
DEB_TRACE() << "writing " << os.str() << " to " << fname;
string s = a.toString(16, true);
DEB_TRACE() << "writing " << s << " to " << fname;
ofstream aff_file(fname.c_str());
if (aff_file)
aff_file << os.str();
aff_file << s;
if (aff_file)
aff_file.close();
bool file_ok(aff_file);
......@@ -828,8 +941,9 @@ bool NetDevRxQueueMgr::applyWithSetter(Task task, const string& irq_queue,
static string desc = getSetterSudoDesc();
SystemCmd setter(AffinitySetterName, desc);
ConstStr task_opt = (task == Irq) ? "-i" : "-r";
const CPUAffinity::Mask& mask = a.getZeroDefaultMask();
setter.args() << task_opt << " " << m_dev << " " << irq_queue << " "
<< hex << "0x" << a.getZeroDefaultMask();
<< CPUAffinity::maskToString(mask, 16, true);
bool setter_ok = (setter.execute() == 0);
DEB_RETURN() << DEB_VAR1(setter_ok);
return setter_ok;
......@@ -880,9 +994,8 @@ static const char *NetDevRxQueueMgrAffinitySetterSrcCList[] = {
"",
"int main(int argc, char *argv[])",
"{",
" char *dev, *irq_queue, *p, fname[256], buffer[128];",
" char *dev, *irq_queue, *p, fname[256];",
" int irq, rps, fd, len, ret;",
" long aff;",
"",
" if (argc != 5)",
" exit(1);",
......@@ -896,11 +1009,6 @@ static const char *NetDevRxQueueMgrAffinitySetterSrcCList[] = {
" dev = argv[2];",
" irq_queue = argv[3];",
"",
" errno = 0;",
" aff = strtol(argv[4], &p, 0);",
" if (errno || *p)",
" exit(3);",
"",
" len = sizeof(fname);",
" if (irq)",
" ret = snprintf(fname, len, \"/proc/irq/%s/smp_affinity\",",
......@@ -909,23 +1017,18 @@ static const char *NetDevRxQueueMgrAffinitySetterSrcCList[] = {
" ret = snprintf(fname, len, \"/sys/class/net/%s/queues/%s/rps_cpus\",",
" dev, irq_queue);",
" if ((ret < 0) || (ret == len))",
" exit(4);",
"",
" len = sizeof(buffer);",
" ret = snprintf(buffer, len, \"%016lx\", aff);",
" if ((ret < 0) || (ret == len))",
" exit(5);",
" exit(3);",
"",
" fd = open(fname, O_WRONLY);",
" if (fd < 0)",
" exit(6);",
" exit(4);",
"",
" for (p = buffer; *p; p += ret)",
" for (p = argv[4]; *p; p += ret)",
" if ((ret = write(fd, p, strlen(p))) < 0)",
" exit(7);",
" exit(5);",
"",
" if (close(fd) < 0)",
" exit(8);",
" exit(6);",
" return 0;",
"}",
};
......@@ -946,7 +1049,7 @@ IntList NetDevRxQueueMgr::getRxQueueList()
StringList out, err;
ethtool.execute(out, err);
RegEx re("^[ \t]*rx_queue_(?P<queue>[0-9]+)_packets:[ \t]+"
RegEx re("^[ \t]*rx(_queue_)?(?P<queue>[0-9]+)_packets:[ \t]+"
"(?P<packets>[0-9]+)\n$");
StringList::iterator it, end = out.end();
for (it = out.begin(); it != end; ++it) {
......@@ -1075,10 +1178,14 @@ void SystemCPUAffinityMgr::WatchDog::childFunction()
do {
Packet packet = readParentCmd();
if (packet.cmd == SetProcAffinity) {
procAffinitySetter(packet.u.proc_affinity);
typedef CPUAffinity::ULongArray ULongArray;
ULongArray& array = packet.u.proc_affinity;
CPUAffinity proc_affinity =
CPUAffinity::fromULongArray(array);
procAffinitySetter(proc_affinity);
} else if (packet.cmd == SetNetDevAffinity) {
NetDevGroupCPUAffinity netdev_affinity =
netDevAffinityEncode(packet);
netDevAffinityDecode(packet);
netDevAffinitySetter(netdev_affinity);
} else if (packet.cmd == CleanUp) {
cleanup_req = true;
......@@ -1103,7 +1210,7 @@ void
SystemCPUAffinityMgr::WatchDog::procAffinitySetter(CPUAffinity cpu_affinity)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(cpu_affinity);
DEB_PARAM() << DEB_VAR2(cpu_affinity, m_other);
static bool aff_set = false;
if (aff_set && (cpu_affinity == m_other))
......@@ -1154,7 +1261,7 @@ StringList SystemCPUAffinityMgr::WatchDog::splitStringList(string str)
}
NetDevGroupCPUAffinity
SystemCPUAffinityMgr::WatchDog::netDevAffinityEncode(const Packet& packet)
SystemCPUAffinityMgr::WatchDog::netDevAffinityDecode(const Packet& packet)
{
DEB_MEMBER_FUNCT();
......@@ -1166,8 +1273,8 @@ SystemCPUAffinityMgr::WatchDog::netDevAffinityEncode(const Packet& packet)
const PacketNetDevQueueAffinity *a = packet_affinity.queue_affinity;
for (unsigned int i = 0; i < nb_queues; ++i, ++a) {
NetDevRxQueueCPUAffinity qa;
qa.irq = a->irq;
qa.processing = a->processing;
qa.irq = CPUAffinity::fromULongArray(a->irq);
qa.processing = CPUAffinity::fromULongArray(a->processing);
NetDevRxQueueAffinityMap::value_type v(a->queue, qa);
queue_affinity.insert(v);
}
......@@ -1222,7 +1329,7 @@ SystemCPUAffinityMgr::WatchDog::setOtherCPUAffinity(CPUAffinity cpu_affinity)
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(cpu_affinity);
Packet packet(SetProcAffinity);
packet.u.proc_affinity = cpu_affinity;
cpu_affinity.toULongArray(packet.u.proc_affinity);
sendChildCmd(packet);
}
......@@ -1246,8 +1353,8 @@ void SystemCPUAffinityMgr::WatchDog::setNetDevCPUAffinity(
PacketNetDevQueueAffinity *a = ndga.queue_affinity;
for (it = queue_affinity.begin(); it != end; ++it, ++a) {
a->queue = it->first;
a->irq = it->second.irq;
a->processing = it->second.processing;
it->second.irq.toULongArray(a->irq);
it->second.processing.toULongArray(a->processing);
}
sendChildCmd(packet);
}
......@@ -1295,9 +1402,10 @@ SystemCPUAffinityMgr::getProcList(Filter filter, CPUAffinity cpu_affinity)
<