Commit c0d612e3 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Tango: add full PixelDepthCPUAffinityMap to/from string conversion

* pixel_depth_cpu_affinity_map property/attr are now Python strings
* Remove netdev_groups property/attr
parent b97d3bf2
......@@ -1743,50 +1743,54 @@ local database:
Add LimaCCDs and SlsDetector class devices.
+----------------------------------------------------------+-------------------------------------------+
| LimaCCDs/eiger500k/DEVICE/LimaCCDs | id10/limaccds/eiger500k |
+----------------------------------------------------------+-------------------------------------------+
| id10/limaccds/eiger500k->LimaCameraType | SlsDetector |
+----------------------------------------------------------+-------------------------------------------+
| id10/limaccds/eiger500k->NbProcessingThread | 23 |
+----------------------------------------------------------+-------------------------------------------+
| id10/limaccds/eiger500k->BufferMaxMemory | 40 |
+----------------------------------------------------------+-------------------------------------------+
| LimaCCDs/eiger500k/DEVICE/SlsDetector | id10/slsdetector/eiger500k |
+----------------------------------------------------------+-------------------------------------------+
| id10/slsdetector/eiger500k->config_fname | /users/opid00/eiger/eiger_v3.1.1/config/ |
| | beb-021-020-direct-FO-10g.config |
+----------------------------------------------------------+-------------------------------------------+
| id10/slsdetector/eiger500k->netdev_groups | | eth0,eth1,eth2,eth4,eth6,eth7,eth8,eth9 |
| | | eth3 |
| | | eth5 |
+----------------------------------------------------------+-------------------------------------------+
| id10/slsdetector/eiger500k->pixel_depth_cpu_affinity_map | | 4,0x0006c0,0x6c0000,0x03f03e,0x000001, |
| | 0x000001,0x100100,0x800800 |
| | | 8,0x0006c0,0x6c0000,0x03f03e,0x000001, |
| | 0x000001,0x100100,0x800800 |
| | | 16,0xffffff,0xffffff,0xffffff,0xffffff, |
| | 0xffffff,0xffffff,0xffffff |
| | | 32,0xffffff,0xffffff,0xffffff,0xffffff, |
| | 0xffffff,0xffffff,0xffffff |
+----------------------------------------------------------+-------------------------------------------+
+----------------------------------------------------------+-----------------------------------------------+
| LimaCCDs/eiger500k/DEVICE/LimaCCDs | id10/limaccds/eiger500k |
+----------------------------------------------------------+-----------------------------------------------+
| id10/limaccds/eiger500k->LimaCameraType | SlsDetector |
+----------------------------------------------------------+-----------------------------------------------+
| id10/limaccds/eiger500k->NbProcessingThread | 23 |
+----------------------------------------------------------+-----------------------------------------------+
| id10/limaccds/eiger500k->BufferMaxMemory | 30 |
+----------------------------------------------------------+-----------------------------------------------+
| LimaCCDs/eiger500k/DEVICE/SlsDetector | id10/slsdetector/eiger500k |
+----------------------------------------------------------+-----------------------------------------------+
| id10/slsdetector/eiger500k->config_fname | /users/opid00/eiger/eiger_v3.1.1/config/ |
| | beb-021-020-direct-FO-10g.config |
+----------------------------------------------------------+-----------------------------------------------+
| id10/slsdetector/eiger500k->pixel_depth_cpu_affinity_map | | { 4: ((((CPU( 6), CPU(18), CPU( 8, 20)), |
| | (CPU( 7), CPU(19), CPU( 8, 20))), |
| | | ((CPU( 9), CPU(21), CPU(11, 23)), |
| | (CPU(10), CPU(22), CPU(11, 23)))), |
| | | CPU(*(range( 1, 6) + |
| | range(12, 18))), |
| | | CPU(0), |
| | | (('eth0,eth1,eth2,eth4,eth6,eth7, |
| | eth8,eth9', |
| | {-1: (CPU(0), CPU(0))}), |
| | | ('eth3', |
| | {-1: (CPU( 8, 20), |
| | CPU( 8, 20))}), |
| | | ('eth5', |
| | {-1: (CPU(11, 23), |
| | CPU(11, 23))}))), |
| | | 8: '@4', |
| | | 16: '@4', |
| | | 32: '@4'} |
+----------------------------------------------------------+-----------------------------------------------+
.. note:: in order to perform high frame rate acquisitions, the CPU affinity must be fixed for
the following tasks:
* Receiver listeners
* Receiver writers
* Receivers ports (2x2=4): listeners, writers, port_threads
* Lima processing threads
* OS processes
* Net-dev group #0 packet dispatching
* Net-dev group #1 packet dispatching
* ...
* Net-dev group packet dispatching for Rx queues: irq & processing
The previous example is based on a dual 6-core CPUs backend with *Hyper-Threading Technology* (12 cores,
24 threads). After the data acquisition finishes the Lima processing threads will run also on the CPUs
assigned to listeners and writers (0xfffffe), that is 23 cores in total, which is used for setting the
NbProcessingThreads. Please note that there are three network groups and four pixel_depth->cpu_affinity
settings (4-, 8-, 16- and 32-bit), each one represented by a line in a multi-line string array.
settings (4-, 8-, 16- and 32-bit). The special global_affinity '@X' is a reference to pixel_depth X.
.. note:: The Intel 10 Gigabit Ethernet Server Adapter has multiple hardware FIFOs per port, called
queues in the OS terminology. The hardware uses a hash algorithm to dispatch packets into the active
......
......@@ -47,6 +47,7 @@ from collections import OrderedDict
from functools import partial, reduce
from itertools import chain
from multiprocessing import Process
import re
from Lima import Core
from Lima import SlsDetector as SlsDetectorHw
......@@ -123,13 +124,11 @@ class SlsDetector(PyTango.Device_4Impl):
self.model.setThresholdEnergy(self.threshold_energy)
self.cam.setTolerateLostPackets(self.tolerate_lost_packets)
self.netdev_groups = [g.split(',') for g in self.netdev_groups]
aff_array = self.pixel_depth_cpu_affinity_map
if aff_array:
flat_array = ','.join(aff_array).split(',')
aff_array_map = map(partial(int, base=0), flat_array)
aff_array = np.array(list(aff_array_map))
aff_map = self.getPixelDepthCPUAffinityMapFromArray(aff_array)
aff_arr = self.pixel_depth_cpu_affinity_map
if aff_arr:
aff_str = ' '.join(aff_arr)
aff_map = self.getPixelDepthCPUAffinityMapFromString(aff_str)
self.printPixelDepthCPUAffinityMap(aff_map)
self.cam.setPixelDepthCPUAffinityMap(aff_map)
def init_list_attr(self):
......@@ -340,125 +339,145 @@ class SlsDetector(PyTango.Device_4Impl):
deb.Return("stat_data=%s" % stat_data)
return stat_data
def getCPUAffinityLen(self):
return 5 + len(self.netdev_groups);
@Core.DEB_MEMBER_FUNCT
def getArrayFromPixelDepthCPUAffinityMap(self, aff_map):
nb_pixel_depth = len(aff_map)
aff_len = self.getCPUAffinityLen()
aff_array = np.zeros((nb_pixel_depth, aff_len), 'int')
for i, (pixel_depth, global_affinity) in enumerate(aff_map.items()):
aff_array[i][:5] = (pixel_depth,
global_affinity.recv.listeners,
global_affinity.recv.writers,
global_affinity.lima,
global_affinity.other)
for ng_aff in global_affinity.netdev:
j = self.netdev_groups.index(ng_aff.name_list)
aff_array[i][5 + j] = ng_aff.processing
return aff_array
@Core.DEB_MEMBER_FUNCT
def getPixelDepthCPUAffinityMapFromArray(self, aff_array):
aff_len = self.getCPUAffinityLen()
err = ValueError("Invalid pixel_depth_cpu_affinity_map: "
"must be a list of %d-value tuples" % aff_len)
if len(aff_array.shape) == 1:
if len(aff_array) % aff_len != 0:
raise err
aff_array.resize((int(len(aff_array) / aff_len), aff_len))
if aff_array.shape[1] != aff_len:
raise err
aff_map = {}
def getPixelDepthCPUAffinityMapFromString(self, aff_str):
CPUAffinity = SlsDetectorHw.CPUAffinity
RecvCPUAffinity = SlsDetectorHw.RecvCPUAffinity
NetDevRxQueueCPUAffinity = SlsDetectorHw.NetDevRxQueueCPUAffinity
NetDevGroupCPUAffinity = SlsDetectorHw.NetDevGroupCPUAffinity
GlobalCPUAffinity = SlsDetectorHw.GlobalCPUAffinity
for aff_data in aff_array:
aff_data = list(map(int, aff_data))
pixel_depth, recv_l, recv_w, lima, other = aff_data[:5]
netdev_aff = aff_data[5:]
all_cpus = range(CPUAffinity.getNbSystemCPUs())
recv_lw = [[6, 7], [9, 10]]
indep_lw = True
if indep_lw:
recv_l = recv_lw
recv_w = [list(map(lambda x: x + 12, l)) for l in recv_l]
else:
recv_l = [[(x, x + 12) for x in r] for r in recv_lw]
recv_w = recv_l
recv_pt = [(8, 20), (11, 23)]
recv_pt = zip(*([recv_pt] * 2))
lima = list(range(6))
lima += map(lambda x: x + 12, list(lima))
lima.remove(0)
other = [0]
irq_aff = [0, (8, 20), (11, 23)]
proc_aff = irq_aff
def Affinity(*x):
if type(x[0]) in [tuple, list]:
x = list(chain(*x))
m = reduce(lambda a, b: a | b, map(lambda a: 1 << a, x))
return CPUAffinity(m)
global_affinity = GlobalCPUAffinity()
Mask = CPUAffinity
def CPU(*x):
if type(x[0]) in [tuple, list]:
x = list(chain(*x))
m = reduce(lambda a, b: a | b, map(lambda a: 1 << a, x))
return Mask(m)
aff_map_raw = eval(aff_str)
self.expandPixelDepthRefs(aff_map_raw)
aff_map = {}
for pixel_depth, aff_data in aff_map_raw.items():
recv_aff, lima, other, netdev_aff = aff_data
global_aff = GlobalCPUAffinity()
recv_list = []
for l, w, pt in zip(recv_l, recv_w, recv_pt):
for recv_ports in recv_aff:
recv = RecvCPUAffinity()
recv.listeners = list(map(Affinity, l))
recv.writers = list(map(Affinity, w))
recv.port_threads = list(map(Affinity, pt))
l, w, pt = zip(*recv_ports)
recv.listeners = list(l)
recv.writers = list(w)
recv.port_threads = list(pt)
recv_list.append(recv)
global_affinity.recv = recv_list
for i, r in enumerate(global_affinity.recv):
s = "Recv[%d]:" % i
def A(x):
return hex(NumAffinity(x))
s += " listeners=%s," % [A(x) for x in r.listeners]
s += " writers=%s," % [A(x) for x in r.writers]
s += " port_threads=%s" % [A(x) for x in r.port_threads]
deb.Always(s)
global_affinity.lima = Affinity(*lima)
global_affinity.other = Affinity(*other)
global_aff.recv = recv_list
global_aff.lima = lima
global_aff.other = other
ng_aff_list = []
for name_list, (irq, proc) in zip(self.netdev_groups,
zip(irq_aff, proc_aff)):
for name_list, queue_data in netdev_aff:
ng_aff = NetDevGroupCPUAffinity()
ng_aff.name_list = name_list
ng_aff_queue = NetDevRxQueueCPUAffinity()
ng_aff_queue.irq = Affinity(irq)
ng_aff_queue.processing = Affinity(proc)
ng_aff.queue_affinity = {-1: ng_aff_queue}
ng_aff.name_list = name_list.split(',')
queue_aff = {}
for queue, (irq, proc) in queue_data.items():
ng_aff_queue = NetDevRxQueueCPUAffinity()
ng_aff_queue.irq = irq
ng_aff_queue.processing = proc
queue_aff[queue] = ng_aff_queue
ng_aff.queue_affinity = queue_aff
ng_aff_list.append(ng_aff)
global_affinity.netdev = ng_aff_list
aff_map[pixel_depth] = global_affinity
global_aff.netdev = ng_aff_list
aff_map[pixel_depth] = global_aff
return aff_map
@Core.DEB_MEMBER_FUNCT
def read_netdev_groups(self, attr):
netdev_groups = [','.join(g) for g in self.netdev_groups]
deb.Return("aff_array=%s" % netdev_groups)
attr.set_value(netdev_groups)
def getStringFromPixelDepthCPUAffinityMap(self, aff_map, use_cpu=True):
pixel_depth_aff_list = []
def cpu_str(a):
cpu_list = [str(i) for i in range(128) if NumAffinity(a) & (1 << i)]
return 'CPU(%s)' % ', '.join(cpu_list)
def mask_str(a):
return 'Mask(0x%x)' % NumAffinity(a)
aff_2_str = cpu_str if use_cpu else mask_str
for pixel_depth, global_aff in sorted(aff_map.items()):
recv_list = []
for r in global_aff.recv:
p1, p2 = zip(r.listeners, r.writers, r.port_threads)
def port_str(p):
return '(%s)' % ', '.join(map(aff_2_str, p))
recv_str = '(%s, %s)' % (port_str(p1), port_str(p2))
recv_list.append(recv_str)
recv_str = '(%s)' % ', '.join(recv_list)
lima_str = aff_2_str(global_aff.lima)
other_str = aff_2_str(global_aff.other)
netdev_grp_list = []
for netdev_grp in global_aff.netdev:
name_str = ','.join(netdev_grp.name_list)
queue_list = [('%d: (%s, %s)' % (q, aff_2_str(a.irq),
aff_2_str(a.processing)))
for q, a in netdev_grp.queue_affinity.items()]
queue_str = ','.join(queue_list)
netdev_str = '"%s": {%s}' % (name_str, queue_str)
netdev_grp_list.append(netdev_str)
netdev_grp_str = '(%s)' % ', '.join(netdev_grp_list)
aff_str = '%d: (%s, %s, %s, %s)' % (pixel_depth, recv_str,
lima_str, other_str,
netdev_grp_str)
pixel_depth_aff_list.append(aff_str)
return '{%s}' % ', '.join(pixel_depth_aff_list)
@Core.DEB_MEMBER_FUNCT
def expandPixelDepthRefs(self, aff_map):
for k, v in aff_map.items():
if type(v) != str:
continue
re_str = '^@(%s)$' % '|'.join(self.__PixelDepth.keys())
re_obj = re.compile(re_str)
m = re_obj.match(v)
if not m:
err = 'Invalid pixel_depth_cpu_affinity_map entry: %s' % v
raise ValueError(err)
o = int(m.group(1))
v = aff_map[o]
if type(v) == str:
raise ValueError('Invalid reference to pixel depth %d' % o)
aff_map[k] = v
@Core.DEB_MEMBER_FUNCT
def printPixelDepthCPUAffinityMap(self, aff_map):
for pixel_depth, global_aff in sorted(aff_map.items()):
deb.Always('PixelDepth: %d' % pixel_depth)
self.printGlobalAffinity(global_aff)
@Core.DEB_MEMBER_FUNCT
def write_netdev_groups(self, attr):
netdev_groups = [g.split(',') for g in attr.get_write_value()]
deb.Param("aff_array=%s" % netdev_groups)
self.netdev_groups = netdev_groups
def printGlobalAffinity(self, global_aff):
def A(x):
return hex(NumAffinity(x))
for i, r in enumerate(global_aff.recv):
s = "Recv[%d]:" % i
s += " listeners=%s," % [A(x) for x in r.listeners]
s += " writers=%s," % [A(x) for x in r.writers]
s += " port_threads=%s" % [A(x) for x in r.port_threads]
deb.Always(' ' + s)
lima, other = global_aff.lima, global_aff.other
deb.Always(' Lima=%s, Other=%s' % (A(lima), A(other)))
for netdev_grp in global_aff.netdev:
s = "NetDevGroup[%s]: {" % ','.join(netdev_grp.name_list)
l = []
for queue, queue_aff in netdev_grp.queue_affinity.items():
l.append('%d: (%s, %s)' % (queue, A(queue_aff.irq),
A(queue_aff.processing)))
s += ','.join(l) + "}"
deb.Always(' ' + s)
@Core.DEB_MEMBER_FUNCT
def read_pixel_depth_cpu_affinity_map(self, attr):
aff_map = self.cam.getPixelDepthCPUAffinityMap()
aff_array = self.getArrayFromPixelDepthCPUAffinityMap(aff_map)
deb.Return("aff_array=%s" % aff_array)
attr.set_value(aff_array)
aff_str = self.getStringFromPixelDepthCPUAffinityMap(aff_map)
deb.Return("aff_str=%s" % aff_str)
attr.set_value(aff_str)
@Core.DEB_MEMBER_FUNCT
def write_pixel_depth_cpu_affinity_map(self, attr):
aff_array = attr.get_write_value()
deb.Param("aff_array=%s" % aff_array)
aff_map = self.getPixelDepthCPUAffinityMapFromArray(aff_array)
aff_str = attr.get_write_value()
deb.Param("aff_str=%s" % aff_str)
aff_map = self.getPixelDepthCPUAffinityMapFromString(aff_str)
self.printPixelDepthCPUAffinityMap(aff_map)
self.cam.setPixelDepthCPUAffinityMap(aff_map)
......@@ -486,16 +505,19 @@ class SlsDetectorClass(PyTango.DeviceClass):
'tolerate_lost_packets':
[PyTango.DevBoolean,
"Initial tolerance to lost packets", True],
'netdev_groups':
[PyTango.DevVarStringArray,
"List of network device groups, each group is a list of "
"comma-separated interface names: [\"ethX,ethY\", \"ethZ,...\"]", []],
'pixel_depth_cpu_affinity_map':
[PyTango.DevVarStringArray,
"Default PixelDepthCPUAffinityMap as a list of 5+n-value tuple "
"strings (n is nb of netdev_groups): "
"[\"<pixel_depth>,<recv_l>,<recv_w>,<lima>,<other>"
"[,<netdev_grp1>,...]\", ...]", []],
"Default PixelDepthCPUAffinityMap as Python string(s) defining a dict: "
"{<pixel_depth>: <global_affinity>}, being global_affinity a tuple: "
"(<recv_list>, <lima>, <other>, <netdev_grp_list>), where recv_list "
"is a list of tupples in the form: (<port_1>, <port_2>), where portX "
"is a tupple of affinities: (<listener>, <writer>, <port_thread>), "
"lima and and other are affinities, and netdev_grp_list is a list of "
"tuples in the form: "
"(<comma_separated_netdev_name_list>, <rx_queue_affinity_map>), the "
"latter in the form of: {<queue>: (<irq>, <processing>)}. "
"Each affinity can be expressed by one of the functions: Mask(mask) "
"or CPU(<cpu1>[, ..., <cpuN>]) for independent CPU enumeration", []],
}
cmd_list = {
......@@ -583,14 +605,10 @@ class SlsDetectorClass(PyTango.DeviceClass):
[[PyTango.DevBoolean,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'netdev_groups':
[[PyTango.DevString,
PyTango.SPECTRUM,
PyTango.READ_WRITE, 64]],
'pixel_depth_cpu_affinity_map':
[[PyTango.DevLong,
PyTango.IMAGE,
PyTango.READ_WRITE, 64, 5]],
[[PyTango.DevString,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'stats_do_hist':
[[PyTango.DevBoolean,
PyTango.SCALAR,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment