//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2011
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################

#ifndef __SLS_DETECTOR_CPU_AFFINITY_H
#define __SLS_DETECTOR_CPU_AFFINITY_H

#include "SlsDetectorDefs.h"

#include "lima/CtControl.h"
#include "lima/SimplePipe.h"

#include <numeric>

namespace lima 
{

namespace SlsDetector
{

class SystemCmd
{
	DEB_CLASS_NAMESPC(DebModCamera, "SystemCmd", "SlsDetector");
 public:
43
	SystemCmd(std::string cmd, std::string desc = "", bool try_sudo = true);
44
45
46
47
48
49
50
51
	SystemCmd(const SystemCmd& o);

	static void setUseSudo(bool use_sudo);
	static bool getUseSudo();

	std::ostream& args()
	{ return m_args; }

52
53
	void setPipes(Pipe *stdin, Pipe *stdout, Pipe *stderr);

54
55
56
57
58
	int execute();

 private:
	void checkSudo();

59
60
61
62
63
	void preparePipes();
	void restorePipes();
	bool sameOutErr()
	{ return (m_stderr == m_stdout); }

64
65
66
67
68
69
	static bool UseSudo;

	std::string m_cmd;
	std::string m_desc;
	bool m_try_sudo;
	std::ostringstream m_args;
70
71
72
	Pipe *m_stdin;
	Pipe *m_stdout;
	Pipe *m_stderr;
73
74
};

// Couples a SystemCmd with optional pipes on its standard streams and a
// child pid, exposing a start()/wait() pair plus execute() shortcuts that
// call both in sequence.
class SystemCmdPipe
{
	DEB_CLASS_NAMESPC(DebModCamera, "SystemCmdPipe", "SlsDetector");
 public:
	// Index of a standard stream in the internal pipe list.
	enum PipeIdx {
		StdIn, StdOut, StdErr,
	};

	// Whether a pipe is created for a given stream.
	enum PipeType {
		None, DoPipe,
	};

	SystemCmdPipe(std::string cmd, std::string desc = "",
		      bool try_sudo = true);
	~SystemCmdPipe();

	// Argument stream of the wrapped SystemCmd.
	std::ostream& args()
	{ return m_cmd.args(); }

	void start();
	int wait();
	// Variant receiving two string lists; presumably filled with the
	// captured stdout / stderr lines - TODO confirm in the implementation.
	int wait(StringList& out, StringList& err);

	// Convenience: start() followed by the matching wait().
	int execute()
	{ start(); return wait(); }
	int execute(StringList& out, StringList& err)
	{ start(); return wait(out, err); }

	void setPipe(PipeIdx idx, PipeType type);
	Pipe& getPipe(PipeIdx idx);

 private:
	enum {
		NbPipes = StdErr + 1,
	};

	// A pipe slot: the Pipe object only exists when type != None.
	struct PipeData {
		PipeType type;
		AutoPtr<Pipe> ptr;

		PipeData(PipeType t = None) : type(t)
		{
			if (*this)
				ptr = new Pipe();
		}

		// True when this slot holds a pipe.
		operator bool() const
		{ return (type != None); }

		// Closes the given end of the pipe; no-op for an empty slot.
		void close(Pipe::EndFd end)
		{
			if (*this)
				ptr->close(end);
		}
	};

	typedef std::vector<PipeData> PipeList;

	PipeList m_pipe_list;
	pid_t m_child_pid;
	SystemCmd m_cmd;
};

class CPUAffinity 
{
	DEB_CLASS_NAMESPC(DebModCamera, "CPUAffinity", "SlsDetector");
 public:
144
145
146
147
148
149
150
151
152
	static constexpr int MaxNbCPUs = NumaSoftBufferAllocMgr::MaxNbCPUs;
	static constexpr int NbULongBits = sizeof(unsigned long) * 8;
	static constexpr int NbULongs = MaxNbCPUs / NbULongBits;

	typedef std::bitset<MaxNbCPUs> Mask;
	typedef unsigned long ULongArray[NbULongs];

	CPUAffinity() {}
	CPUAffinity(const Mask& m) : m_mask(m) {}
153

154
155
156
157
	static int getNbSystemCPUs(bool max_nb = false);

	static int getNbHexDigits(bool max_nb = false)
	{ return getNbSystemCPUs(max_nb) / 4; }
158

159
	static const Mask& allCPUs(bool max_nb = false);
160
161
162

	int getNbCPUs() const
	{ return m_mask.any() ? m_mask.count() : getNbSystemCPUs(); }
163
164
165
166
167

	void initCPUSet(cpu_set_t& cpu_set) const;
	void applyToTask(pid_t task, bool incl_threads = true,
			 bool use_taskset = true) const;

168
169
	const Mask &getMask() const
	{ return m_mask.any() ? m_mask : allCPUs(); }
170

171
172
	const Mask &getZeroDefaultMask() const
	{ return m_mask; }
173

174
	CPUAffinity& operator |=(const CPUAffinity& o);
175
176

	bool isDefault() const
177
	{ return m_mask.none() || (m_mask == allCPUs()); }
178
179
180

	void getNUMANodeMask(std::vector<unsigned long>& node_mask,
			     int& max_node);
181
182
183
184

	static std::string getProcDir(bool local_threads);
	static std::string getTaskProcDir(pid_t task, bool is_thread);

185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
	static void maskToULongArray(const Mask& mask, ULongArray& array);
	static Mask maskFromULongArray(const ULongArray& array);

	static std::string maskToString(const Mask& mask, int base = 16,
					bool comma_sep = false);
	static Mask maskFromString(std::string aff_str, int base = 16);

	void toULongArray(ULongArray& array) const
	{ maskToULongArray(getMask(), array); }

	static CPUAffinity fromULongArray(const ULongArray& array)
	{ return maskFromULongArray(array); }

	std::string toString(int base = 16, bool comma_sep = false) const
	{ return maskToString(getMask(), base, comma_sep); }

	static CPUAffinity fromString(std::string aff_str, int base = 16)
	{ return maskFromString(aff_str, base);	}

204
 private:
205
	static Mask internalMask(Mask m)
206
207
	{ return (m != allCPUs()) ? m : 0; }

208
209
210
	void applyWithTaskset(pid_t task, bool incl_threads) const;
	void applyWithSetAffinity(pid_t task, bool incl_threads) const;

211
212
	static int findNbSystemCPUs();
	static int findMaxNbSystemCPUs();
213

214
	Mask m_mask;
215
216
217
218
219
};

// Two affinities are equal when their effective masks (getMask()) match on
// the bits covered by CPUAffinity::allCPUs().
inline
bool operator ==(const CPUAffinity& a, const CPUAffinity& b)
{
	// allCPUs() returns a reference; bind to it instead of copying.
	const CPUAffinity::Mask& all = CPUAffinity::allCPUs();
	return (a.getMask() & all) == (b.getMask() & all);
}

// Negation of operator == for CPUAffinity.
inline
bool operator !=(const CPUAffinity& a, const CPUAffinity& b)
{
	const bool same = (a == b);
	return !same;
}

// Union of two affinities. If either operand is the default (all CPUs), the
// result is the default affinity; otherwise the effective masks are OR-ed.
inline
CPUAffinity operator |(const CPUAffinity& a, const CPUAffinity& b)
{
	if (a.isDefault() || b.isDefault())
		return CPUAffinity();
	return CPUAffinity(a.getMask() | b.getMask());
}

// In-place OR of the raw stored masks.
// NOTE(review): unlike the binary operator |, a default (all-zero) operand
// is not treated as "all CPUs" here - OR-ing with it leaves the other mask
// unchanged. Confirm that this asymmetry is intended.
inline
CPUAffinity& CPUAffinity::operator |=(const CPUAffinity& o)
{
	m_mask |= o.m_mask;
	return *this;
}

// Sequence of CPU affinities.
typedef std::vector<CPUAffinity> CPUAffinityList;

// Combines every element of the list with operator |. An empty list yields
// a default-constructed CPUAffinity.
inline CPUAffinity CPUAffinityList_all(const CPUAffinityList& l)
{
	CPUAffinity all;
	if (!l.empty())
		// Seed with the first element and fold the remaining ones.
		all = std::accumulate(std::next(l.begin()), l.end(), l.front(),
				      std::bit_or<CPUAffinity>());
	return all;
}

// Manager of the IRQs associated with a network device, identified by name.
// Only the interface is visible here; the behaviour of each method is
// defined in the corresponding source file.
class IrqMgr
{
	DEB_CLASS_NAMESPC(DebModCamera, "IrqMgr", "SlsDetector");
 public:
	IrqMgr(std::string net_dev = "");
	~IrqMgr();

	// Selects the network device this manager works on.
	void setDev(std::string net_dev);

	// List of IRQ numbers (presumably those of the current device -
	// TODO confirm).
	IntList getIrqList();
	void updateRxQueueIrqAffinity(bool default_affinity);

 private:
	static bool isManaged(std::string net_dev);

	// Helpers around the system "irqbalance" facility; the stopped state
	// is tracked class-wide in m_irqbalance_stopped.
	static void stopIrqBalance();
	static void restoreIrqBalance();

	static bool getIrqBalanceActive();
	static void setIrqBalanceActive(bool act);

	std::string m_net_dev;

	// State shared by all IrqMgr instances.
	static bool m_irqbalance_stopped;
	static StringList m_dev_list;
};

// CPU affinities attached to one network-device Rx queue: one for the IRQ
// and one for the processing task.
struct NetDevRxQueueCPUAffinity {
	CPUAffinity irq;
	CPUAffinity processing;

	// True only when both affinities are the default one.
	bool isDefault() const
	{
		if (!irq.isDefault())
			return false;
		return processing.isDefault();
	}

	// Union (operator |) of the IRQ and processing affinities.
	CPUAffinity all() const
	{
		CPUAffinity combined = irq | processing;
		return combined;
	}
};

// Member-wise comparison: IRQ affinity first, then processing affinity.
inline
bool operator ==(const NetDevRxQueueCPUAffinity& a,
		 const NetDevRxQueueCPUAffinity& b)
{
	if (!(a.irq == b.irq))
		return false;
	return (a.processing == b.processing);
}

// Rx queue affinities keyed by an int queue identifier.
typedef std::map<int, NetDevRxQueueCPUAffinity> NetDevRxQueueAffinityMap;

inline
bool NetDevRxQueueAffinityMap_isDefault(const NetDevRxQueueAffinityMap& m)
{
	if (m.empty())
		return true;
	else if (m.size() != 1)
		return false;
	NetDevRxQueueAffinityMap::const_iterator it = m.begin();
	return (it->first == -1) && (it->second.isDefault());
}

class NetDevRxQueueMgr
{
	DEB_CLASS_NAMESPC(DebModCamera, "NetDevRxQueueMgr", "SlsDetector");
 public:
317
318
319
	typedef NetDevRxQueueCPUAffinity Affinity;
	typedef NetDevRxQueueAffinityMap AffinityMap;

320
	NetDevRxQueueMgr(std::string dev = "");
321
322
	~NetDevRxQueueMgr();

323
324
	void setDev(std::string dev);

325
326
	void apply(int queue, const Affinity& queue_affinity);
	void apply(const AffinityMap& affinity_map);
327
328
329
330
331
332

	IntList getRxQueueList();

 private:
	void checkDev();

333
334
335
	enum Task {
		Irq, Processing, NbTasks,
	};
336

337
338
339
340
341
342
343
344
345
346
347
348
349
	struct FileSetterData {
		StringList file;
		StringList setter;
	};
			
	void apply(Task task, int queue, CPUAffinity a);
	void getIrqFileSetterData(int queue,
				  FileSetterData& file_setter);
	void getProcessingFileSetterData(int queue,
					 FileSetterData& file_setter);
	bool applyWithFile(const std::string& fname, CPUAffinity a);
	bool applyWithSetter(Task task, const std::string& irq_queue,
			     CPUAffinity a);
350
351
352
353

	static std::string getSetterSudoDesc();

	std::string m_dev;
354
355
	IrqMgr m_irq_mgr;
	AffinityMap m_aff_map;
356

357
358
	static const std::string AffinitySetterName;
	static const StringList AffinitySetterSrc;
359
};
struct NetDevGroupCPUAffinity {
	StringList name_list;
363
364
365
366
	NetDevRxQueueAffinityMap queue_affinity;

	bool isDefault() const;
	CPUAffinity all() const;
367
368
};

inline bool NetDevGroupCPUAffinity::isDefault() const
{
371
	return NetDevRxQueueAffinityMap_isDefault(queue_affinity);
372
373
374
375
376
377
378
379
380
381
382
383
384
}

// Union of the affinities of every queue in the group; a default group
// yields a default-constructed CPUAffinity.
inline CPUAffinity NetDevGroupCPUAffinity::all() const
{
	if (isDefault())
		return CPUAffinity();

	CPUAffinityList per_queue;
	for (const auto& entry : queue_affinity)
		per_queue.push_back(entry.second.all());
	return CPUAffinityList_all(per_queue);
}

inline 
386
bool operator ==(const NetDevGroupCPUAffinity& a,
387
388
		 const NetDevGroupCPUAffinity& b)
{
389
390
	return ((a.name_list == b.name_list) &&
		(a.queue_affinity == b.queue_affinity));
391
392
393
}

inline 
394
bool operator !=(const NetDevGroupCPUAffinity& a,
395
396
397
398
399
400
401
		 const NetDevGroupCPUAffinity& b)
{
	return !(a == b);
}

// Sequence of network-device group affinities.
typedef std::vector<NetDevGroupCPUAffinity> NetDevGroupCPUAffinityList;

// Union of the all() affinity of every group in the list.
inline CPUAffinity NetDevGroupCPUAffinityList_all(
					const NetDevGroupCPUAffinityList& l)
{
	CPUAffinityList per_group;
	for (const NetDevGroupCPUAffinity& group : l)
		per_group.push_back(group.all());
	return CPUAffinityList_all(per_group);
}
class SystemCPUAffinityMgr
413
{
414
	DEB_CLASS_NAMESPC(DebModCamera, "SystemCPUAffinityMgr", "SlsDetector");
415
416
417
418
419
 public:
	enum Filter {
		All, MatchAffinity, NoMatchAffinity, ThisProc=0x10,
	};

420
421
	SystemCPUAffinityMgr();
	~SystemCPUAffinityMgr();
422
423

	static ProcList getProcList(Filter filter = All, 
424
				    CPUAffinity cpu_affinity = {});
425
	static ProcList getThreadList(Filter filter = All, 
426
				      CPUAffinity cpu_affinity = {});
427
428

	void setOtherCPUAffinity(CPUAffinity cpu_affinity);
429
430
	void setNetDevCPUAffinity(
			const NetDevGroupCPUAffinityList& netdev_list);
431
432
433
434
435

 private:
	class WatchDog
	{
		DEB_CLASS_NAMESPC(DebModCamera, "WatchDog", 
436
				  "SlsDetector::SystemCPUAffinityMgr");
437
438
439
440
441
442
	public:
		WatchDog();
		~WatchDog();

		bool childEnded();
		void setOtherCPUAffinity(CPUAffinity cpu_affinity);
443
		void setNetDevCPUAffinity(NetDevGroupCPUAffinity netdev_affinity);
444
445
446

	private:
		enum Cmd {
447
			Init, SetProcAffinity, SetNetDevAffinity, CleanUp, Ok,
448
449
		};

450
451
452
453
		enum {
			StringLen=128,
			AffinityMapLen=128,
		};
454
		typedef uint64_t Arg;
455
		typedef char String[StringLen];
456

457
458
		typedef CPUAffinity::ULongArray Mask;

459
460
		struct Packet {
			Cmd cmd;
461
			union Union {
462
				Mask proc_affinity;
463
464
465
466
467
				struct NetDevAffinity {
					String name_list;
					unsigned int queue_affinity_len;
					struct QueueAffinity {
						int queue;
468
469
						Mask irq;
						Mask processing;
470
471
472
473
					} queue_affinity[AffinityMapLen];
				} netdev_affinity;
			} u;

Alejandro Homs Puron's avatar
Alejandro Homs Puron committed
474
475
476
477
478
			Packet(Cmd c=Init)
			{
				memset(this, 0, sizeof(Packet));
				cmd = c;
			}
479
		};
480
481
482
483
484
485
486
		typedef NetDevRxQueueAffinityMap NetDevAffinityMap;
		typedef Packet::Union::NetDevAffinity PacketNetDevAffinity;
		typedef PacketNetDevAffinity::QueueAffinity
						PacketNetDevQueueAffinity;

		typedef std::map<std::string, NetDevRxQueueMgr> NetDevMgrMap;

487
		static void sigTermHandler(int signo);
488
489
		static std::string concatStringList(StringList list);
		static StringList splitStringList(std::string str);
490
491

		void childFunction();
492
		void procAffinitySetter(CPUAffinity cpu_affinity);
493

494
		NetDevGroupCPUAffinity netDevAffinityDecode(
495
							const Packet& packet);
496
		void netDevAffinitySetter(
497
				const NetDevGroupCPUAffinity& netdev_affinity);
498
499
500

		ProcList getOtherProcList(CPUAffinity cpu_affinity);

501
		void sendChildCmd(const Packet& packet);
502
503
504
505
506
507
508
		Packet readParentCmd();
		void ackParentCmd();

		Pipe m_cmd_pipe;
		Pipe m_res_pipe;
		pid_t m_lima_pid;
		pid_t m_child_pid;
509
		CPUAffinity m_other;
510
		NetDevMgrMap m_netdev_mgr_map;
511
512
	};

513
514
515
	void checkWatchDogStart();
	void checkWatchDogStop();

516
	AutoPtr<WatchDog> m_watchdog;
517
518
	CPUAffinity m_other;
	NetDevGroupCPUAffinityList m_netdev;
519
520
};

struct RecvCPUAffinity {
522
	CPUAffinityList listeners;
523

524
525
526
	RecvCPUAffinity();
	CPUAffinity all() const;
	RecvCPUAffinity& operator =(CPUAffinity a);
527

528
529
	const CPUAffinityList& Listeners() const
	{ return listeners; }
530
531

	typedef const CPUAffinityList& (RecvCPUAffinity::*Selector)() const;
532
533
};

// Union of the listener affinities, computed with CPUAffinityList_all().
inline CPUAffinity RecvCPUAffinity::all() const
{
	return CPUAffinityList_all(listeners);
}


// Receivers are equal when their listener lists compare equal.
inline
bool operator ==(const RecvCPUAffinity& a, const RecvCPUAffinity& b)
{
	return (a.listeners == b.listeners);
}

// Negation of operator == for RecvCPUAffinity.
inline
bool operator !=(const RecvCPUAffinity& a, const RecvCPUAffinity& b)
{
	const bool same = (a == b);
	return !same;
}

// Sequence of receiver affinities.
typedef std::vector<RecvCPUAffinity> RecvCPUAffinityList;
// Union of the all() affinity of every receiver in the list.
inline CPUAffinity RecvCPUAffinityList_all(const RecvCPUAffinityList& l)
{
	CPUAffinityList per_recv;
	for (const RecvCPUAffinity& recv : l)
		per_recv.push_back(recv.all());
	return CPUAffinityList_all(per_recv);
}

// Same as the single-argument overload, but restricted to the affinity list
// that the selector member function returns for each receiver.
inline CPUAffinity RecvCPUAffinityList_all(const RecvCPUAffinityList& l,
					   RecvCPUAffinity::Selector s)
{
	CPUAffinityList per_recv;
	for (const RecvCPUAffinity& recv : l) {
		// Invoke the selected accessor on this receiver.
		const CPUAffinityList& selected = (recv.*s)();
		per_recv.push_back(CPUAffinityList_all(selected));
	}
	return CPUAffinityList_all(per_recv);
}
					       
struct GlobalCPUAffinity {
577
	RecvCPUAffinityList recv;
578
	CPUAffinityList model_threads;
579
580
	CPUAffinity lima;
	CPUAffinity other;
581
	NetDevGroupCPUAffinityList netdev;
582

583
	GlobalCPUAffinity();
584
	CPUAffinity all() const;
585
	void updateRecvAffinity(CPUAffinity a);
586
587
};

// Global CPU affinity configuration keyed by pixel depth.
typedef std::map<PixelDepth, GlobalCPUAffinity> PixelDepthCPUAffinityMap;
class GlobalCPUAffinityMgr 
591
{
592
	DEB_CLASS_NAMESPC(DebModCamera, "GlobalCPUAffinityMgr", 
593
594
595
596
597
			  "SlsDetector");
 public:
	class ProcessingFinishedEvent
	{
		DEB_CLASS_NAMESPC(DebModCamera, "ProcessingFinishedEvent", 
598
				  "SlsDetector::GlobalCPUAffinityMgr");
599
	public:
600
		ProcessingFinishedEvent(GlobalCPUAffinityMgr *mgr);
601
602
603
604
605
606
607
		~ProcessingFinishedEvent();

		void processingFinished();

		void registerStatusCallback(CtControl *ct_control);

	private:
608
		friend class GlobalCPUAffinityMgr;
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634

		class ImageStatusCallback : 
		public CtControl::ImageStatusCallback
			{
			public:
			ImageStatusCallback(
				ProcessingFinishedEvent *proc_finished)
				: m_proc_finished(proc_finished) 
				{}
			protected:
				virtual void imageStatusChanged(
					const CtControl::ImageStatus& status)
				{ m_proc_finished->imageStatusChanged(status); }
			private:
				ProcessingFinishedEvent *m_proc_finished;
			};

		void prepareAcq();
		void stopAcq();

		void limitUpdateRate();
		void updateLastCallbackTimestamp();
		Timestamp getLastCallbackTimestamp();

		void imageStatusChanged(const CtControl::ImageStatus& status);

635
		GlobalCPUAffinityMgr *m_mgr;
636
637
638
639
640
641
642
643
644
		ImageStatusCallback m_cb;
		CtControl *m_ct;
		int m_nb_frames;
		bool m_cnt_act;
		bool m_saving_act;
		bool m_stopped;
		Timestamp m_last_cb_ts;
	};

645
646
	GlobalCPUAffinityMgr(Camera *cam = NULL);
	~GlobalCPUAffinityMgr();
647

648
	void applyAndSet(const GlobalCPUAffinity& o);
649
650
651
652
653
654
655
656
657
658
	void updateRecvRestart();

	ProcessingFinishedEvent *getProcessingFinishedEvent();

	void prepareAcq();
	void startAcq();
	void stopAcq();
	void recvFinished();
	void limaFinished();
	void waitLimaFinished();
659
	void cleanUp();
660
661
662
663
664
665
666
667
668

 private:
	friend class ProcessingFinishedEvent;

	enum State {
		Ready, Acquiring, Changing, Processing, Restoring,
	};

	void setLimaAffinity(CPUAffinity lima_affinity);
669
	void setRecvAffinity(const RecvCPUAffinityList& recv_affinity_list);
670
	void setModelAffinity(const CPUAffinityList& model_affinity_list);
671
672
673
674
675
676

	AutoMutex lock()
	{ return AutoMutex(m_cond.mutex()); }

	Camera *m_cam;
	ProcList m_lima_tids;
677
678
679
	GlobalCPUAffinity m_curr;
	GlobalCPUAffinity m_set;
	AutoPtr<SystemCPUAffinityMgr> m_system_mgr;
680
681
682
683
684
685
686
	Cond m_cond;
	State m_state;
	ProcessingFinishedEvent *m_proc_finished;
	double m_lima_finished_timeout;
};

std::ostream& operator <<(std::ostream& os, const CPUAffinity& a);
687
688
689
std::ostream& operator <<(std::ostream& os, const CPUAffinityList& l);
std::ostream& operator <<(std::ostream& os, const NetDevRxQueueCPUAffinity& a);
std::ostream& operator <<(std::ostream& os, const NetDevGroupCPUAffinity& a);
690
std::ostream& operator <<(std::ostream& os, const RecvCPUAffinity& a);
691
std::ostream& operator <<(std::ostream& os, const RecvCPUAffinityList& l);
692
std::ostream& operator <<(std::ostream& os, const GlobalCPUAffinity& a);
693
694
695
696
697
698
699
700
701
std::ostream& operator <<(std::ostream& os, const PixelDepthCPUAffinityMap& m);

} // namespace SlsDetector

} // namespace lima



#endif // __SLS_DETECTOR_CPU_AFFINITY_H