Commit bbc778de authored by Samuel Debionne's avatar Samuel Debionne
Browse files

Merge branch 'client-fixes-and-refactoring' into 'develop'

Small fix & refactoring in client

See merge request !117
parents c8ab8cb8 5148f6c6
Pipeline #71818 passed with stages
in 15 minutes and 25 seconds
......@@ -39,7 +39,7 @@ def FSMTransition(**kws):
return NotifyingTransition(notifier=notify, **kws)
def Cpp2PythonName(n):
def cpp_2_python_name(n):
return string.capwords(n.replace('_', ' ')).replace(' ', '')
......@@ -50,16 +50,14 @@ class ParamBase:
# from_model: create an item of type t from a Model instance x
# to_model: return Model instance from item value v
ItemKind = namedtuple('Detector_Param_ItemKind',
['name', 'from_model', 'to_model'])
['name', 'to_model', 'from_model'],
defaults=[lambda t, x: t(x)])
ItemKinds = [ItemKind('enums',
lambda t, x: t(x),
lambda v: v.value),
ItemKind('types',
lambda t, x: t(**x),
lambda v: v.getModelDict()),
ItemKind('arrays',
lambda t, x: t(x),
lambda v: v.getModelList())]
# class dict with the three kind of items: enums, types, arrays
......@@ -69,6 +67,10 @@ class ParamBase:
model = self.Model(*args, **kws)
self.__dict__.update({'__model__': model})
# Also parse dictionary passed as unique arg
if len(args) == 1:
kws.update(args[0])
# collect the class dicts provided on type creation, one per ItemKind:
# self.Enums, self.Types, self.Arrays
dicts = {n: getattr(self, n.capitalize()) for n in self.ItemKindDict}
......@@ -192,7 +194,7 @@ class ParamBase:
def __deepcopy__(self, memo):
klass = self.__class__
return klass(**self.getModelDict())
return klass(self.getModelDict())
class ParamArray(list):
......@@ -263,7 +265,7 @@ class ParamArray(list):
return klass(self.getModelList(), data=self.__data__)
def GetSchemaType(schema, name=None, sub_schema=None, name_prefix=''):
def get_schema_type(schema, name=None, sub_schema=None, name_prefix=''):
def RefName(x):
return x['$ref'].split('/')[-1]
......@@ -283,11 +285,11 @@ def GetSchemaType(schema, name=None, sub_schema=None, name_prefix=''):
schema_items = type_def['properties'].items()
type_name = name_prefix + Cpp2PythonName(name)
type_name = name_prefix + cpp_2_python_name(name)
HasEnum = lambda p: 'enum' in p
def SchemaEnum(n, items):
enum_name = f'{type_name}.{Cpp2PythonName(n)}'
enum_name = f'{type_name}.{cpp_2_python_name(n)}'
return Enum(enum_name, {e.upper(): e for e in items})
enums = {n: SchemaEnum(n, p['enum'])
for n, p in schema_items if HasEnum(p)}
......@@ -297,7 +299,7 @@ def GetSchemaType(schema, name=None, sub_schema=None, name_prefix=''):
name = RefName(p) if '$ref' in p else n
sub_schema = p if 'properties' in p else None
prefix = f'{type_name}.'
return GetSchemaType(schema, name, sub_schema, prefix)
return get_schema_type(schema, name, sub_schema, prefix)
types = {n: SchemaType(n, p)
for n, p in schema_items if HasType(p)}
......@@ -322,13 +324,13 @@ def GetSchemaType(schema, name=None, sub_schema=None, name_prefix=''):
type_dict = dict(Model=model, Enums=enums, Types=types, Arrays=arrays)
for d in enums, types, arrays:
type_dict.update({Cpp2PythonName(n): t for n, t in d.items()})
type_dict.update({cpp_2_python_name(n): t for n, t in d.items()})
return type(type_name, (ParamBase,), type_dict)
def GetTypeData(name, schema_json):
def get_type_data(name, schema_json):
schema = json.loads(schema_json)
klass = GetSchemaType(schema)
klass = get_schema_type(schema)
return TypeData(name, schema, klass)
......@@ -399,26 +401,25 @@ class Detector:
class StateFollower:
class Iterator:
def __init__(self, follower, queue):
self.__follower = follower
self.__queue = queue
def __next__(self):
return next(self.__queue)
def __init__(self, det, queue):
def __init__(self, det, queue, end_event):
self.__det_ref = weakref.ref(det)
self.__queue = queue
self.__end_task = None
if end_event is not None:
def end_task(event, queue):
end_event.wait()
queue.put(StopIteration)
self.__end_task = gevent.spawn(end_task, end_event, queue)
def __del__(self):
if self.__end_task:
self.__end_task.kill()
det = self.__det_ref()
if det:
det._removeStateQueue(self.__queue)
def __iter__(self):
return self.Iterator(self, self.__queue)
return self.__queue.__iter__()
def __init__(self, ctrl_dev, *recv_devs, timeout=DefaultTimeout):
if not recv_devs:
......@@ -451,9 +452,9 @@ class Detector:
prop = tango_db.get_class_attribute_property(tango_class, n)
return prop[n]['schema'][0]
def GetAttrData(n):
type_data = GetTypeData(n, GetSchema(n))
type_data = get_type_data(n, GetSchema(n))
json_data = json.loads(getattr(d, n))
value = type_data.klass(**json_data)
value = type_data.klass(json_data)
return dict(type_data=type_data, value=value)
names = tango_db.get_class_attribute_list(tango_class, '*')
attr_names = [n for n in names for attr in d.attribute_list_query()
......@@ -461,7 +462,7 @@ class Detector:
attrs = {n: GetAttrData(n) for n in attr_names}
self.__attrs.update(attrs)
def GetKlassData(n, attr_data):
return (Cpp2PythonName(n), attr_data['type_data'].klass)
return (cpp_2_python_name(n), attr_data['type_data'].klass)
self.__dict__.update([GetKlassData(n, attr_data)
for n, attr_data in attrs.items()])
class Descriptor:
......@@ -536,28 +537,29 @@ class Detector:
for q in self.__state_queues:
q.put(state)
def createStateFollower(self):
def createStateFollower(self, end_event=None):
queue = gevent.queue.Queue()
queue.put(self.getState())
self.__state_queues.append(queue)
return self.StateFollower(self, queue)
return self.StateFollower(self, queue, end_event)
def _removeStateQueue(self, queue):
self.__state_queues.remove(queue)
def getStateEvent(self, state):
return self._getEventFor('State', state)
def getStateEvent(self, state, end_event=None):
return self._getEventFor('State', state, end_event)
def getNotStateEvent(self, state):
return self._getEventFor('NotState', state)
def getNotStateEvent(self, state, end_event=None):
return self._getEventFor('NotState', state, end_event)
def _getEventFor(self, kind, state):
def _getEventFor(self, kind, state, end_event):
events = self.__state_events[kind]
if state in events:
return events[state]()
if not self.__state_follower:
det_ref = weakref.ref(self)
args = (self.createStateFollower(), self.__state_events, det_ref)
follower = self.createStateFollower(end_event)
args = (follower, self.__state_events, det_ref)
t = gevent.spawn(self._stateEventFollower, *args)
self.__state_follower = t
event = gevent.event.Event()
......@@ -620,6 +622,10 @@ class Detector:
class Processing:
def __init__(self, det, uuid, instances):
try:
self.__has_roi_counters = det.proc_params.counters.rois
except:
self.__has_roi_counters = False
self.__uuid = uuid
self.__devs = [ tango.DeviceProxy(instance) for instance in instances]
for d in self.__devs:
......@@ -652,11 +658,13 @@ class Detector:
return [ data_array.decode_devencoded_image(dev.getFrame(frame_idx)) for dev in self.__devs ][0]
def is_finished(self, nb_frames):
def isFinished(self, nb_frames):
"""" Returns True if all counters are equal to nb_frames """
# Finished when all counters are equal to nb_frames
return all([ c.nb_frames_counters == nb_frames and c.nb_frames_saved == nb_frames for c in self.counters ])
def countersFinished(c):
return not self.__has_roi_counters or c.nb_frames_counters == nb_frames
return all([ countersFinished(c) and c.nb_frames_saved == nb_frames for c in self.counters ])
def _eraseProcessing(self, uuid):
[ recv.erasePipeline(str(uuid)) for recv in self.__recvs ]
......@@ -720,7 +728,7 @@ class Detector:
return min(recv.nb_frames_xferred for recv in self.__recvs)
def GetSavingGroups(proc_params):
def get_saving_groups(proc_params):
return [g for n, g in proc_params.items() if n.startswith('sav')]
......@@ -755,7 +763,7 @@ def test_acq_params_schema(d):
pretty_print_json(acq_params_json, 'acq_params_json')
acq_params_dict = json.loads(acq_params_json)
print(f'acq_params_dict={acq_params_dict} [{type(acq_params_dict)}]')
acq_params = AcqParams(**acq_params_dict)
acq_params = AcqParams(acq_params_dict)
print(f'acq_params={acq_params}')
expo_time = acq_params.acq.expo_time
print(f'expo_time={expo_time} [{type(expo_time)}]')
......@@ -799,7 +807,7 @@ def test_acq_params_schema(d):
for k in 'flip', 'roi':
partial_acq_params_dict['img'].pop(k, None)
print(f'partial_acq_params_dict={partial_acq_params_dict}')
partial_acq_params = AcqParams(**partial_acq_params_dict)
partial_acq_params = AcqParams(partial_acq_params_dict)
partial_acq_params.acq.nb_frames *= 5
partial_acq_params.acq.expo_time *= 10
tl = dict(x=10, y=10)
......@@ -825,7 +833,7 @@ def test_proc_params_schema(d):
pretty_print_json(proc_params_json, 'proc_params_json')
proc_params_dict = json.loads(proc_params_json)
print(f'proc_params_dict={proc_params_dict} [{type(proc_params_dict)}]')
proc_params = ProcParams(**proc_params_dict)
proc_params = ProcParams(proc_params_dict)
print(f'proc_params={proc_params}')
proc_params_modified = False
if 'counters' in proc_params:
......@@ -850,7 +858,7 @@ def test_proc_params_schema(d):
proc_params.counters.rois = rois
proc_params.counters.rois = [roi]
proc_params_modified = True
for sav in GetSavingGroups(proc_params):
for sav in get_saving_groups(proc_params):
overwrite_policy = sav.file_exists_policy
print(f'overwrite_policy={overwrite_policy}')
overwrite_policy = sav.FileExistsPolicy.OVERWRITE
......@@ -864,6 +872,73 @@ def test_proc_params_schema(d):
print(f'recv_dev.proc_params={recv_dev.proc_params}')
def test_acquisition_states(device, args, config):
print('*** Testing states & acquisition ***')
print(f'State={device.getState()}')
for name, val in config.items():
param = getattr(device, name)
param.update(config[name])
pretty_print_json(json.dumps(param.getModelDict()), name)
def state_monitor(device, end_event):
follower = device.createStateFollower(end_event)
for state in follower:
print(f'Monitor: state={state}')
print('Monitor finished!')
end_event = gevent.event.Event()
monitor = gevent.spawn(state_monitor, device, end_event)
uuid = uuid1()
with device.prepareAcq(uuid) as proc:
print(f'UUID={uuid}')
print(f'State={device.getState()}')
print(f'Counters={proc.counters}')
roi_counters = []
device.startAcq()
print(f'State={device.getState()}')
event = device.getNotStateEvent(Detector.State.Running)
# Simulate a Stop when about half of the frames are in the pipeline
# only if nb_frames > 5
def ready_for_stop():
ok = False
if args.stop_after_nb_frames:
ok = device.nb_frames_xferred >= args.stop_after_nb_frames
return event.ready() or ok
while not ready_for_stop():
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# ROI counter acquisition 5 at a time
rc = proc.popRoiCounters(5)
roi_counters.append(rc)
event.wait(1.0)
# If acquisition is still running, stop it
if device.getState() == Detector.State.Running:
print(f'##### STOP #####')
device.stopAcq()
print(f'State={device.getState()}')
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# Wait for the end of the processing
while not proc.isFinished(device.nb_frames_xferred):
print(f'Counters={proc.counters}')
gevent.sleep(0.1)
print(f'Counters={proc.counters}')
while rc := proc.popRoiCounters(5):
# ROI counter acquisition
roi_counters.append(rc)
gevent.sleep(0.1)
print(f'ROI Counters={roi_counters}')
print(f'Frame={proc.getFrame(device.nb_frames_xferred - 1)}')
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
input('Press Enter to close')
end_event.set()
monitor.join()
def main():
parser = argparse.ArgumentParser(description='Lima2 Client test program.')
parser.add_argument('ctrl_dev', help='control Tango device name')
......@@ -881,6 +956,8 @@ def main():
help='JSON string with proc_params')
parser.add_argument('--proc_params_file',
help='file containg JSON string with proc_params')
parser.add_argument('--stop_after_nb_frames', type=int,
help='Stop after a number of frames have been xferred')
args = parser.parse_args()
......@@ -892,9 +969,6 @@ def main():
if args.test_proc_params_schema:
test_proc_params_schema(device)
print('*** Testing states & acquisition ***')
print(f'State={device.getState()}')
config = {}
config_keys = ['acq_params', 'proc_params']
for name, val in {k: '{}' for k in config_keys}.items():
......@@ -906,68 +980,13 @@ def main():
with open(fname) as f:
val = f.read()
par_json = json.loads(val)
klass_name = Cpp2PythonName(name)
klass_name = cpp_2_python_name(name)
klass = getattr(device, klass_name, None)
if klass:
config[name] = klass(**par_json)
config[name] = klass(par_json)
for name, val in config.items():
param = getattr(device, name)
param.update(config[name])
pretty_print_json(json.dumps(param.getModelDict()), name)
test_acquisition_states(device, args, config)
def state_monitor(follower):
for state in follower:
print(f'Monitor: state={state}')
print('Monitor finished!')
follower = device.createStateFollower()
monitor = gevent.spawn(state_monitor, follower)
try:
uuid = uuid1()
with device.prepareAcq(uuid) as proc:
print(f'UUID={uuid}')
print(f'State={device.getState()}')
print(f'Counters={proc.counters}')
roi_counters = []
device.startAcq()
print(f'State={device.getState()}')
event = device.getNotStateEvent(Detector.State.Running)
# Simulate a Stop when about half of the frames are in the pipeline
nb_frames_before_stop = config['acq_params'].acq.nb_frames / 2
while not event.ready() and device.nb_frames_xferred < nb_frames_before_stop:
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# ROI counter acquisition 5 at a time
rc = proc.popRoiCounters(5)
roi_counters.append(rc)
event.wait(1.0)
# If acquisition is still running, stop it
if device.getState() == Detector.State.Running:
print(f'##### STOP #####')
device.stopAcq()
print(f'State={device.getState()}')
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# Wait for the end of the processing
while not proc.is_finished(device.nb_frames_xferred):
print(f'Counters={proc.counters}')
gevent.sleep(0.1)
print(f'Counters={proc.counters}')
while rc := proc.popRoiCounters(5):
# ROI counter acquisition
roi_counters.append(rc)
gevent.sleep(0.1)
print(f'ROI Counters={roi_counters}')
print(f'Frame={proc.getFrame(device.nb_frames_xferred - 1)}')
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
input('Press Enter to close')
except tango.DevFailed as e:
print(e)
finally:
sys.exc_info()
del device
monitor.join()
if __name__ == '__main__':
t = gevent.spawn(main)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment