Commit fb340515 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Add NetDevRxQueueAffinity with independent per-queue irq & processing:

* Move NetDev management to NetDevRxQueueMgr
* Separate one RecvCPUAffinity per recv, move setter code to Receiver
* Use Pipe dupInto/restoreDup for redirecting I/O in SystemCmd
parent 0b9c0908
......@@ -38,8 +38,7 @@ class SystemCmd
{
DEB_CLASS_NAMESPC(DebModCamera, "SystemCmd", "SlsDetector");
public:
SystemCmd(std::string cmd, std::string desc = "",
bool try_sudo = true, bool can_hide_out = true);
SystemCmd(std::string cmd, std::string desc = "", bool try_sudo = true);
SystemCmd(const SystemCmd& o);
static void setUseSudo(bool use_sudo);
......@@ -48,18 +47,27 @@ class SystemCmd
std::ostream& args()
{ return m_args; }
void setPipes(Pipe *stdin, Pipe *stdout, Pipe *stderr);
int execute();
private:
void checkSudo();
void preparePipes();
void restorePipes();
bool sameOutErr()
{ return (m_stderr == m_stdout); }
static bool UseSudo;
std::string m_cmd;
std::string m_desc;
bool m_try_sudo;
bool m_can_hide_out;
std::ostringstream m_args;
Pipe *m_stdin;
Pipe *m_stdout;
Pipe *m_stderr;
};
class CPUAffinity
......@@ -83,8 +91,6 @@ class CPUAffinity
void initCPUSet(cpu_set_t& cpu_set) const;
void applyToTask(pid_t task, bool incl_threads = true,
bool use_taskset = true) const;
void applyToNetDev(std::string dev) const;
void applyToNetDevGroup(StringList dev_list) const;
uint64_t getMask() const
{ return m_mask.any() ? m_mask.to_ulong() : allCPUs(); }
......@@ -112,16 +118,9 @@ class CPUAffinity
void applyWithTaskset(pid_t task, bool incl_threads) const;
void applyWithSetAffinity(pid_t task, bool incl_threads) const;
bool applyWithNetDevFile(const std::string& fname) const;
bool applyWithNetDevSetter(const std::string& dev,
const std::string& queue) const;
static int findNbSystemCPUs();
static int findMaxNbSystemCPUs();
static std::string getNetDevSetterSudoDesc();
static const std::string NetDevSetQueueRpsName;
static const StringList NetDevSetQueueRpsSrc;
std::bitset<64> m_mask;
};
......@@ -155,21 +154,103 @@ CPUAffinity& CPUAffinity::operator |=(const CPUAffinity& o)
typedef std::vector<CPUAffinity> CPUAffinityList;
inline CPUAffinity CPUAffinityList_all(const CPUAffinityList& l)
{
CPUAffinity all;
if (l.size() > 0) {
CPUAffinityList::const_iterator it, end = l.end();
for (all = *(it = l.begin())++; it != end; ++it)
all |= *it;
}
return all;
}
struct NetDevRxQueueCPUAffinity {
CPUAffinity irq;
CPUAffinity processing;
bool isDefault() const
{ return irq.isDefault() && processing.isDefault(); }
CPUAffinity all() const
{ return irq | processing; }
};
inline
bool operator ==(const NetDevRxQueueCPUAffinity& a,
const NetDevRxQueueCPUAffinity& b)
{
return ((a.irq == b.irq) && (a.processing == b.processing));
}
typedef std::map<int, NetDevRxQueueCPUAffinity> NetDevRxQueueAffinityMap;
class NetDevRxQueueMgr
{
DEB_CLASS_NAMESPC(DebModCamera, "NetDevRxQueueMgr", "SlsDetector");
public:
NetDevRxQueueMgr(std::string dev = "");
void setDev(std::string dev);
void apply(int queue, const NetDevRxQueueCPUAffinity& queue_affinity);
IntList getRxQueueList();
private:
void checkDev();
void applyProcessing(int queue, CPUAffinity a);
bool applyProcessingWithFile(const std::string& fname, CPUAffinity a);
bool applyProcessingWithSetter(const std::string& queue, CPUAffinity a);
void applyIrq(int queue, CPUAffinity a);
static std::string getSetterSudoDesc();
std::string m_dev;
static const std::string SetRpsName;
static const StringList SetRpsSrc;
};
struct NetDevGroupCPUAffinity {
StringList name_list;
CPUAffinity processing;
NetDevRxQueueAffinityMap queue_affinity;
bool isDefault() const;
CPUAffinity all() const;
};
inline bool NetDevGroupCPUAffinity::isDefault() const
{
if (queue_affinity.empty())
return true;
else if (queue_affinity.size() != 1)
return false;
NetDevRxQueueAffinityMap::const_iterator it = queue_affinity.begin();
return (it->first == -1) && (it->second.isDefault());
}
inline CPUAffinity NetDevGroupCPUAffinity::all() const
{
if (isDefault())
return CPUAffinity();
CPUAffinityList all_queues;
NetDevRxQueueAffinityMap::const_iterator it, end = queue_affinity.end();
for (it = queue_affinity.begin(); it != end; ++it)
all_queues.push_back(it->second.all());
return CPUAffinityList_all(all_queues);
}
inline
bool operator ==(const NetDevGroupCPUAffinity& a,
bool operator ==(const NetDevGroupCPUAffinity& a,
const NetDevGroupCPUAffinity& b)
{
return ((a.name_list == b.name_list) && (a.processing == b.processing));
return ((a.name_list == b.name_list) &&
(a.queue_affinity == b.queue_affinity));
}
inline
bool operator !=(const NetDevGroupCPUAffinity& a,
bool operator !=(const NetDevGroupCPUAffinity& a,
const NetDevGroupCPUAffinity& b)
{
return !(a == b);
......@@ -216,28 +297,53 @@ class SystemCPUAffinityMgr
Init, SetProcAffinity, SetNetDevAffinity, CleanUp, Ok,
};
enum {
StringLen=128,
AffinityMapLen=128,
};
typedef uint64_t Arg;
typedef char String[StringLen];
struct Packet {
typedef char String[128];
Cmd cmd;
Arg arg;
String str;
union Union {
uint64_t proc_affinity;
struct NetDevAffinity {
String name_list;
unsigned int queue_affinity_len;
struct QueueAffinity {
int queue;
uint64_t irq;
uint64_t processing;
} queue_affinity[AffinityMapLen];
} netdev_affinity;
} u;
Packet(Cmd c=Init) : cmd(c)
{ memset(&u, 0, sizeof(u)); }
};
typedef NetDevRxQueueAffinityMap NetDevAffinityMap;
typedef Packet::Union::NetDevAffinity PacketNetDevAffinity;
typedef PacketNetDevAffinity::QueueAffinity
PacketNetDevQueueAffinity;
typedef std::map<std::string, NetDevRxQueueMgr> NetDevMgrMap;
static void sigTermHandler(int signo);
static std::string concatStringList(StringList list);
static StringList splitStringList(std::string str);
void childFunction();
void procAffinitySetter(CPUAffinity cpu_affinity);
NetDevGroupCPUAffinity netDevAffinityEncode(
const Packet& packet);
void netDevAffinitySetter(
NetDevGroupCPUAffinity netdev_affinity);
const NetDevGroupCPUAffinity& netdev_affinity);
ProcList getOtherProcList(CPUAffinity cpu_affinity);
void sendChildCmd(Cmd cmd, Arg arg = 0, std::string str = "");
void sendChildCmd(const Packet& packet);
Packet readParentCmd();
void ackParentCmd();
......@@ -246,7 +352,7 @@ class SystemCPUAffinityMgr
pid_t m_lima_pid;
pid_t m_child_pid;
CPUAffinity m_other;
StringList m_netdev_list;
NetDevMgrMap m_netdev_mgr_map;
};
void checkWatchDogStart();
......@@ -267,6 +373,13 @@ struct RecvCPUAffinity {
RecvCPUAffinity& operator =(CPUAffinity a);
};
inline CPUAffinity RecvCPUAffinity::all() const
{
return (CPUAffinityList_all(listeners) | CPUAffinityList_all(writers) |
CPUAffinityList_all(port_threads));
}
inline
bool operator ==(const RecvCPUAffinity& a, const RecvCPUAffinity& b)
{
......@@ -279,13 +392,16 @@ bool operator !=(const RecvCPUAffinity& a, const RecvCPUAffinity& b)
return !(a == b);
}
typedef std::vector<RecvCPUAffinity> RecvCPUAffinityList;
struct GlobalCPUAffinity {
RecvCPUAffinity recv;
RecvCPUAffinityList recv;
CPUAffinity lima;
CPUAffinity other;
NetDevGroupCPUAffinityList netdev;
CPUAffinity all() const;
void updateRecvAffinity(CPUAffinity a);
};
typedef std::map<PixelDepth, GlobalCPUAffinity> PixelDepthCPUAffinityMap;
......@@ -369,7 +485,7 @@ class GlobalCPUAffinityMgr
};
void setLimaAffinity(CPUAffinity lima_affinity);
void setRecvAffinity(const RecvCPUAffinity& recv_affinity);
void setRecvAffinity(const RecvCPUAffinityList& recv_affinity_list);
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
......@@ -386,8 +502,11 @@ class GlobalCPUAffinityMgr
};
std::ostream& operator <<(std::ostream& os, const CPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const CPUAffinityList& a);
std::ostream& operator <<(std::ostream& os, const CPUAffinityList& l);
std::ostream& operator <<(std::ostream& os, const NetDevRxQueueCPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const NetDevGroupCPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const RecvCPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const RecvCPUAffinityList& l);
std::ostream& operator <<(std::ostream& os, const GlobalCPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const PixelDepthCPUAffinityMap& m);
......
......@@ -228,7 +228,7 @@ private:
void updateImageSize();
void updateTimeRanges();
void updateCPUAffinity(bool recv_restarted);
void setRecvCPUAffinity(const RecvCPUAffinity& recv_affinity);
void setRecvCPUAffinity(const RecvCPUAffinityList& recv_affinity_list);
static int64_t NSec(double x)
{ return int64_t(x * 1e9); }
......
......@@ -261,6 +261,7 @@ typedef std::set<int> SortedIntList;
typedef std::vector<FrameType> FrameArray;
typedef IntList ProcList;
std::ostream& operator <<(std::ostream& os, const StringList& l);
std::ostream& operator <<(std::ostream& os, const SortedIntList& l);
std::ostream& operator <<(std::ostream& os, const FrameArray& a);
......@@ -324,6 +325,7 @@ class NumericGlob
{ return m_glob.getNbEntries(); }
IntStringList getIntPathList() const;
IntStringList getIntSubPathList(int idx) const;
private:
int m_nb_idx;
......
......@@ -24,7 +24,7 @@
#define __SLS_DETECTOR_RECEIVER_H
#include "SlsDetectorModel.h"
#include "SlsDetectorCPUAffinity.h"
#include "slsReceiverUsers.h"
namespace lima
......@@ -48,6 +48,8 @@ public:
void prepareAcq();
void setCPUAffinity(const RecvCPUAffinity& recv_affinity);
private:
friend class Camera;
......@@ -163,6 +165,11 @@ private:
void portCallback(FrameType frame, int port, char *dptr,
uint32_t dsize);
void getNodeMaskList(const CPUAffinityList& listener,
const CPUAffinityList& writer,
slsReceiverUsers::NodeMaskList& fifo_node_mask,
int& max_node);
Camera *m_cam;
int m_idx;
int m_rx_port;
......
......@@ -51,6 +51,36 @@ using namespace lima::SlsDetector;
};
// namespace SlsDetector
// {
// typedef std::vector<RecvCPUAffinity> RecvCPUAffinityList;
// };
%MappedType SlsDetector::RecvCPUAffinityList
{
%TypeHeaderCode
#include "SlsDetectorCPUAffinity.h"
#include "sipAPIlimaslsdetector.h"
#include "SlsDetectorSip.h"
using namespace lima::SlsDetector;
%End
%ConvertToTypeCode
typedef SipSequence<RecvCPUAffinityList> Seq;
Seq seq(sipType_SlsDetector_RecvCPUAffinity);
return seq.convertToTypeCode(sipPy, sipCppPtr, sipIsErr,
sipTransferObj);
%End
%ConvertFromTypeCode
typedef SipSequence<RecvCPUAffinityList> Seq;
Seq seq(sipType_SlsDetector_RecvCPUAffinity);
return seq.convertFromTypeCode(sipCpp);
%End
};
// namespace SlsDetector
// {
// typedef std::vector<SlsDetector::NetDevGroupCPUAffinity>
......@@ -82,6 +112,36 @@ using namespace lima::SlsDetector;
};
// namespace SlsDetector
// {
// typedef std::map<int, NetDevRxQueueCPUAffinity> NetDevRxQueueAffinityMap;
// };
%MappedType SlsDetector::NetDevRxQueueAffinityMap
{
%TypeHeaderCode
#include "SlsDetectorCPUAffinity.h"
#include "sipAPIlimaslsdetector.h"
#include "SlsDetectorSip.h"
using namespace lima::SlsDetector;
%End
%ConvertToTypeCode
typedef SipMap<NetDevRxQueueAffinityMap> Map;
Map map(NULL, sipType_SlsDetector_NetDevRxQueueCPUAffinity);
return map.convertToTypeCode(sipPy, sipCppPtr, sipIsErr,
sipTransferObj);
%End
%ConvertFromTypeCode
typedef SipMap<NetDevRxQueueAffinityMap> Map;
Map map(NULL, sipType_SlsDetector_NetDevRxQueueCPUAffinity);
return map.convertFromTypeCode(sipCpp);
%End
};
// namespace SlsDetector
// {
// typedef std::map<PixelDepth, GlobalCPUAffinity> PixelDepthCPUAffinityMap;
......@@ -118,6 +178,7 @@ SipTypeIntImpl(PixelDepth);
%End
};
namespace SlsDetector
{
......@@ -128,8 +189,7 @@ namespace SlsDetector
class SystemCmd
{
public:
SystemCmd(std::string cmd, std::string desc = "",
bool try_sudo = true, bool can_hide_out = true);
SystemCmd(std::string cmd, std::string desc = "", bool try_sudo = true);
SystemCmd(const SlsDetector::SystemCmd& o);
static void setUseSudo(bool use_sudo);
......@@ -159,7 +219,6 @@ public:
//void initCPUSet(cpu_set_t& cpu_set) const;
void applyToTask(int task, bool incl_threads = true,
bool use_taskset = true) const;
void applyToNetDev(std::string dev) const;
unsigned long getMask() const;
unsigned long getZeroDefaultMask() const;
......@@ -179,9 +238,34 @@ public:
// typedef std::vector<CPUAffinity> CPUAffinityList;
struct NetDevRxQueueCPUAffinity {
SlsDetector::CPUAffinity irq;
SlsDetector::CPUAffinity processing;
bool isDefault() const;
SlsDetector::CPUAffinity all() const;
};
// typedef std::map<int, NetDevRxQueueCPUAffinity> NetDevRxQueueAffinityMap;
class NetDevRxQueueMgr
{
public:
NetDevRxQueueMgr(std::string dev = "");
void setDev(std::string dev);
void apply(int queue,
const SlsDetector::NetDevRxQueueCPUAffinity& queue_affinity);
std::vector<int> getRxQueueList();
};
struct NetDevGroupCPUAffinity {
std::vector<std::string> name_list;
SlsDetector::CPUAffinity processing;
SlsDetector::NetDevRxQueueAffinityMap queue_affinity;
bool isDefault() const;
SlsDetector::CPUAffinity all() const;
};
// typedef std::vector<SlsDetector::NetDevGroupCPUAffinity>
......@@ -222,11 +306,14 @@ struct RecvCPUAffinity {
// RecvCPUAffinity& operator =(CPUAffinity a);
};
// typedef std::vector<RecvCPUAffinity> RecvCPUAffinityList;
struct GlobalCPUAffinity {
SlsDetector::RecvCPUAffinity recv;
SlsDetector::RecvCPUAffinityList recv;
SlsDetector::CPUAffinity lima;
SlsDetector::CPUAffinity other;
SlsDetector::NetDevGroupCPUAffinityList netdev;
SlsDetector::CPUAffinity all() const;
};
// typedef std::map<SlsDetector::PixelDepth, SlsDetector::GlobalCPUAffinity>
......
This diff is collapsed.
......@@ -619,69 +619,21 @@ void Camera::updateCPUAffinity(bool recv_restarted)
m_global_cpu_affinity_mgr.applyAndSet(global_affinity);
}
void Camera::setRecvCPUAffinity(const RecvCPUAffinity& recv_affinity)
void Camera::setRecvCPUAffinity(const RecvCPUAffinityList& recv_affinity_list)
{
DEB_MEMBER_FUNCT();
unsigned int nb_recv = m_recv_list.size();
unsigned int nb_aff = recv_affinity_list.size();
DEB_PARAM() << DEB_VAR2(nb_recv, nb_aff);
if (nb_aff != nb_recv)
THROW_HW_ERROR(InvalidValue) << "invalid affinity list size: "
<< DEB_VAR2(nb_recv, nb_aff);
CPUAffinityList::const_iterator lit = recv_affinity.listeners.begin();
CPUAffinityList::const_iterator wit = recv_affinity.writers.begin();
RecvCPUAffinityList::const_iterator ait = recv_affinity_list.begin();
RecvList::iterator it, end = m_recv_list.end();
for (it = m_recv_list.begin(); it != end; ++it) {
for (it = m_recv_list.begin(); it != end; ++it, ++ait) {
Receiver *recv = *it;
slsReceiverUsers::CPUMaskList list_cpu_mask;
slsReceiverUsers::CPUMaskList writ_cpu_mask;
slsReceiverUsers::NodeMaskList fifo_node_mask;
int max_node;
string deb_head;
if (DEB_CHECK_ANY(DebTypeTrace) ||
DEB_CHECK_ANY(DebTypeWarning)) {
ostringstream os;
os << "setting recv " << recv->m_idx << " ";
deb_head = os.str();
}
for (int i = 0; i < m_recv_nb_ports; ++i, ++lit, ++wit) {
cpu_set_t cpu_set;
const CPUAffinity& listener = *lit;
DEB_TRACE() << deb_head << "listener " << i << " "
<< "CPU mask to " << listener;
listener.initCPUSet(cpu_set);
list_cpu_mask.push_back(cpu_set);
const CPUAffinity& writer = *wit;
DEB_TRACE() << deb_head << "writer " << i << " "
<< "CPU mask to " << writer;
writer.initCPUSet(cpu_set);
writ_cpu_mask.push_back(cpu_set);
CPUAffinity both = listener | writer;
vector<unsigned long> mlist;
both.getNUMANodeMask(mlist, max_node);
if (mlist.size() != 1)
THROW_HW_ERROR(Error) << DEB_VAR1(mlist.size());
unsigned long& nmask = mlist[0];
DEB_TRACE() << deb_head << "Fifo " << i << " "
<< "NUMA node mask mask to "
<< DEB_HEX(nmask);
int c = bitset<64>(nmask).count();
if (c != 1)
DEB_WARNING() << deb_head << "Fifo " << i << " "
<< "NUMA node mask has "
<< c << " nodes";
fifo_node_mask.push_back(nmask);
}
slsReceiverUsers *recv_users = recv->m_recv;
recv_users->setThreadCPUAffinity(list_cpu_mask, writ_cpu_mask);
recv_users->setFifoNodeAffinity(fifo_node_mask, max_node);
}
CPUAffinityList::const_iterator tit;
tit = recv_affinity.port_threads.begin();
RecvPortList port_list = getRecvPortList();
RecvPortList::iterator pit, pend = port_list.end();
for (pit = port_list.begin(); pit != pend; ++pit, ++tit) {
pid_t tid = (*pit)->getThreadID();
(*tit).applyToTask(tid, false);
recv->setCPUAffinity(*ait);
}
}
......
......@@ -303,6 +303,9 @@ StringList Glob::split(string path)
StringList Glob::getSubPathList(int idx) const
{
if (idx == -1)
return getPathList();
StringList sub_path_list;
StringList::const_iterator it, end = m_found_list.end();
for (it = m_found_list.begin(); it != end; ++it)
......@@ -330,20 +333,26 @@ NumericGlob::NumericGlob(string pattern_prefix, string pattern_suffix)
}
NumericGlob::IntStringList NumericGlob::getIntPathList() const
{
DEB_MEMBER_FUNCT();
return getIntSubPathList(-1);
}
NumericGlob::IntStringList NumericGlob::getIntSubPathList(int idx) const
{
DEB_MEMBER_FUNCT();
IntStringList list;
StringList path_list = m_glob.getPathList();
StringList::const_iterator pit = path_list.begin();
StringList ret_path_list = m_glob.getSubPathList(idx);
StringList::const_iterator rit = ret_path_list.begin();
StringList sub_path_list = m_glob.getSubPathList(m_nb_idx);
StringList::const_iterator it, end = sub_path_list.end();
for (it = sub_path_list.begin(); it != end; ++it, ++pit) {
for (it = sub_path_list.begin(); it != end; ++it, ++rit) {
size_t l = (*it).size() - m_prefix_len - m_suffix_len;
string s = (*it).substr(m_prefix_len, l);
int nb;
istringstream(s) >> nb;
DEB_TRACE() << DEB_VAR2(nb, *pit);