Commit 64d69b25 authored by Samuel Debionne's avatar Samuel Debionne
Browse files

Merge branch 'client-smx' into 'develop'

Adapt Client to SMX Pipeline

See merge request !125
parents bbc778de 69151813
Pipeline #73928 passed with stages
in 8 minutes and 26 seconds
......@@ -6,6 +6,7 @@ import string
import copy
import itertools
import argparse
import glob
import numpy as np
from enum import Enum, auto
from functools import partial, reduce
......@@ -43,6 +44,11 @@ def cpp_2_python_name(n):
return string.capwords(n.replace('_', ' ')).replace(' ', '')
def get_gpu_devs():
# TODO: should be configurable
return glob.glob('/dev/nvidia[0-9]')
TypeData = namedtuple('Detector_TypeData', ['name', 'schema', 'klass'])
class ParamBase:
......@@ -499,6 +505,7 @@ class Detector:
if 'acq_params' in attrs:
acq_params = copy.deepcopy(attrs['acq_params'])
# Configure Round-Robin xfer parameters
xfer = acq_params.xfer
nb_recvs = len(self.__recvs)
has_round_robin = True
......@@ -517,6 +524,15 @@ class Detector:
xfer.time_slice.stride = 1
attrs['acq_params'] = acq_params
if 'proc_params' in attrs:
proc_params = copy.deepcopy(attrs['proc_params'])
# Configure receivers GPUs
nb_recvs = len(self.__recvs)
nb_gpus_per_system = len(get_gpu_devs())
if nb_recvs > 1 and nb_gpus_per_system > 1 and 'gpu' in proc_params:
proc_params.gpu.device_idx = dev.recv_rank % nb_gpus_per_system
attrs['proc_params'] = proc_params
return attrs
def _getDevs(self):
......@@ -628,6 +644,8 @@ class Detector:
self.__has_roi_counters = False
self.__uuid = uuid
self.__devs = [ tango.DeviceProxy(instance) for instance in instances]
self.__names = [n for n in json.loads(self.__devs[0].counters).keys()
if n.startswith('nb_frames_')]
for d in self.__devs:
d.set_green_mode(tango.GreenMode.Gevent)
d.set_timeout_millis(int(Detector.DefaultTimeout * 1000))
......@@ -639,7 +657,7 @@ class Detector:
@property
def counters(self):
"""" Returns the progress counters """
Counter = namedtuple('Counter', ['nb_frames_source', 'nb_frames_counters', 'nb_frames_saved'])
Counter = namedtuple('Counter', self.__names)
return [ Counter(**json.loads(dev.counters)) for dev in self.__devs ]
def popRoiCounters(self, nb_frames):
......@@ -652,6 +670,15 @@ class Detector:
# If all devices have empty ROI counter list, then empty and returns an empty iterable
return [] if not all([c.size > 0 for c in counters]) else counters
def popPeakCounters(self, nb_frames):
"""" Pop a list of RoiCounters for each devices or None if no RoiCouters were extracted """
counters = [ dev.popPeakCounters(nb_frames) for dev in self.__devs ]
# TODO Round-Robin mode - compute frame_idx
# If all devices have empty ROI counter list, then empty and returns an empty iterable
return [] if not all([c.size > 0 for c in counters]) else counters
def getFrame(self, frame_idx):
# TODO Round-Robin mode - select receiver that has frame_idx
......@@ -663,7 +690,11 @@ class Detector:
# Finished when all counters are equal to nb_frames
def countersFinished(c):
return not self.__has_roi_counters or c.nb_frames_counters == nb_frames
return all([ countersFinished(c) and c.nb_frames_saved == nb_frames for c in self.counters ])
def savingFinished(c):
not_finished = [n for n in self.__names
if n.endswith('saved') and getattr(c, n) != nb_frames]
return not not_finished
return all([ countersFinished(c) and savingFinished(c) for c in self.counters ])
def _eraseProcessing(self, uuid):
[ recv.erasePipeline(str(uuid)) for recv in self.__recvs ]
......
#!/bin/python3
import json
acq_params = {
"acq": {
"expo_time": 10,
"latency_time": 990,
"nb_frames": 1000
},
"xfer": {
"time_slice": { "start":0, "count": 1000}
}
#"img": {
# "roi": {"topleft": {"x": 128, "y": 128}, "dimensions": {"x": 256, "y": 256}}
#}
}
fai_params_base = "/nobackup/lid29pwr9/shared/lima2/detectors/psi/data/processing"
fai_kernels_base = "/users/debionne/source/.vs/lima2/53b0100f-6e47-4325-9d48-81117e566430/src/detectors/psi/processing/kernels"
proc_params = {
"saving_dense": {
"base_path": "/nobackup/lid29pwr92/tmp",
"filename_prefix" : "output_dense",
"nb_frames_per_file": 10,
"file_exists_policy": "overwrite"
},
"saving_sparse": {
"base_path": "/nobackup/lid29pwr92/tmp",
"filename_prefix" : "output_sparse",
"nb_frames_per_file": 10,
"file_exists_policy": "overwrite"
},
"counters": {
"rois": [{"topleft": {"x": 128, "y": 128}, "dimensions": {"x": 256, "y": 256}}]
},
"fai": {
"gain_path": fai_params_base + "/jungfrau_500k_307/gains_wg.h5",
"pedestal_path": fai_params_base + "/jungfrau_500k_307/pedestals_wg.h5",
"mask_path": fai_params_base + "/jungfrau_500k_307/mask_wg.h5",
"csr_path": fai_params_base + "/jungfrau_500k_307/csr_wg.h5",
"radius2d_path": fai_params_base + "/jungfrau_500k_307/r_center_wg.h5",
"radius1d_path": fai_params_base + "/jungfrau_500k_307/bin_centers_wg.h5",
"dummy": 0.0,
"delta_dummy": 0.0,
"normalization_factor": 1.0,
"cutoff_clip": 5.0,
"cycle": 5,
"empty": 0.0,
"noise": 1.0,
"cutoff_pick": 3.0,
"cl_source_path": fai_kernels_base
}
}
import gevent
import sys
import tango
from Lima2.Client.Detector import Detector
from uuid import uuid1
device = Detector("id00/limacontrol/jungfrau_500k_307_lid29pwr9_x1_0", "id00/limareceiver/jungfrau_500k_307_lid29pwr9_x1_1")
def run_acquisition(device):
# FSM follower
def state_monitor(follower):
for state in follower:
print(f'Monitor: state={state}')
print('Monitor finished!')
follower = device.createStateFollower()
monitor = gevent.spawn(state_monitor, follower)
try:
device.acq_params.update(device.AcqParams(acq_params))
print(device.acq_params)
device.proc_params.update(device.ProcParams(proc_params))
print(device.proc_params)
uuid = uuid1()
print(f'UUID={uuid}')
with device.prepareAcq(uuid) as proc:
print(f'State={device.getState()}')
print(f'Counters={proc.counters}')
roi_counters = []
peak_counters = []
device.startAcq()
print(f'State={device.getState()}')
event = device.getNotStateEvent(Detector.State.Running)
# Simulate a Stop when about half of the frames are in the pipeline
nb_frames_before_stop = device.acq_params.acq.nb_frames / 2
while not event.ready() and device.nb_frames_xferred < nb_frames_before_stop:
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# ROI counter acquisition 5 at a time
rc = proc.popRoiCounters(5)
roi_counters.append(rc)
# Peak counter acquisition 5 at a time
pc = proc.popPeakCounters(5)
peak_counters.append(pc)
event.wait(1.0)
# If acquisition is still running, stop it
if device.getState() == Detector.State.Running:
print(f'##### STOP #####')
device.stopAcq()
print(f'State={device.getState()}')
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# Wait for the end of the processing
while not proc.is_finished(device.nb_frames_xferred):
print(f'Counters={proc.counters}')
gevent.sleep(0.1)
print(f'Counters={proc.counters}')
while rc := proc.popRoiCounters(5):
# ROI counter acquisition
roi_counters.append(rc)
gevent.sleep(0.1)
print(f'ROI Counters={roi_counters}')
print(f'Frame={proc.getFrame(device.nb_frames_xferred - 1)}')
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
input('Press Enter to close')
except tango.DevFailed as e:
print(e)
finally:
sys.exc_info()
monitor.join()
run_acquisition(device)
#!/bin/python3
import json
acq_params = {
"acq": {
"expo_time": 10,
"latency_time": 990,
"nb_frames": 20
},
"xfer": {
"time_slice": { "start":0, "count": 20}
},
#"img": {
# "roi": {"topleft": {"x": 128, "y": 128}, "dimensions": {"x": 256, "y": 256}}
#}
"det": {
"pixel_type": "gray16",
'generator_type': 'gauss',
'nb_prefetch_frames': 2,
'grow_factor': 0.3,
'peaks': [{'x0': 1024.0 + 128.0, 'y0': 1024.0 - 256.0, 'fwhm': 128.0, 'max': 100.0}]
}
}
params_base = "/nobackup/lid29pwr9/shared/lima2/detectors/simulator/data/processing"
fai_kernels_base = "/users/debionne/source/.vs/lima2/53b0100f-6e47-4325-9d48-81117e566430/src/processings/common/fai/kernels"
proc_params = {
"fifo": {
"nb_fifo_frames": 1000
},
"saving_dense": {
"base_path": "/nobackup/lid29pwr92/tmp",
"filename_prefix" : "output_dense",
"nb_frames_per_file": 10,
"file_exists_policy": "overwrite"
},
"saving_sparse": {
"base_path": "/nobackup/lid29pwr92/tmp",
"filename_prefix" : "output_sparse",
"nb_frames_per_file": 10,
"file_exists_policy": "overwrite"
},
"counters": {
"rois": [{"topleft": {"x": 128, "y": 128}, "dimensions": {"x": 256, "y": 256}}]
},
"gpu" :{
"device_idx": 0
},
"jfrau" :{
"gain_path": params_base + "/simulator_4M/gains.h5",
"pedestal_path": params_base + "/simulator_4M/pedestals.h5",
},
"fai": {
#"mask_path": fai_params_base + "/jungfrau_500k_307/mask_wg.h5",
"csr_path": params_base + "/simulator_4M/csr.h5",
"radius2d_path": params_base + "/simulator_4M/r_center.h5",
"radius1d_path": params_base + "/simulator_4M/bin_centers.h5",
"dummy": 0.0,
"delta_dummy": 0.0,
"normalization_factor": 1.0,
"cutoff_clip": 5.0,
"cycle": 5,
#"empty": 0.0,
"noise": 1.0,
"cutoff_pick": 3.0,
"cl_source_path": fai_kernels_base
}
}
import gevent
import sys
import tango
from Lima2.Client.Detector import Detector
from uuid import uuid1
device = Detector("id00/limacontrol/sam_simulator_smx", "id00/limareceiver/sam_simulator_smx")
def run_acquisition(device):
# FSM follower
def state_monitor(follower):
for state in follower:
print(f'Monitor: state={state}')
print('Monitor finished!')
follower = device.createStateFollower()
monitor = gevent.spawn(state_monitor, follower)
try:
device.acq_params.update(device.AcqParams(acq_params))
print(device.acq_params)
device.proc_params.update(device.ProcParams(proc_params))
print(device.proc_params)
uuid = uuid1()
print(f'UUID={uuid}')
with device.prepareAcq(uuid) as proc:
print(f'State={device.getState()}')
print(f'Counters={proc.counters}')
roi_counters = []
peak_counters = []
device.startAcq()
print(f'State={device.getState()}')
event = device.getNotStateEvent(Detector.State.Running)
# Simulate a Stop when about half of the frames are in the pipeline
nb_frames_before_stop = device.acq_params.acq.nb_frames
while not event.ready() and device.nb_frames_xferred < nb_frames_before_stop:
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# ROI counter acquisition 5 at a time
rc = proc.popRoiCounters(5)
roi_counters.append(rc)
# Peak counter acquisition 5 at a time
pc = proc.popPeakCounters(5)
peak_counters.append(pc)
gevent.sleep(1.0)
# If acquisition is still running, stop it
if device.getState() == Detector.State.Running:
print(f'##### STOP #####')
device.stopAcq()
print(f'State={device.getState()}')
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# Wait for the end of the processing
while not proc.is_finished(device.nb_frames_xferred):
print(f'Counters={proc.counters}')
gevent.sleep(1.0)
print(f'Counters={proc.counters}')
while rc := proc.popRoiCounters(5):
# ROI counter acquisition
roi_counters.append(rc)
gevent.sleep(1.0)
#print(f'ROI Counters={roi_counters}')
#print(f'Frame={proc.getFrame(device.nb_frames_xferred - 1)}')
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
input('Press Enter to close')
except tango.DevFailed as e:
print(e)
finally:
sys.exc_info()
monitor.join()
run_acquisition(device)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment