Commit 09969204 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

Merge branch 'first-bliss-integration' into psi-smx-working

parents 883b3dfc 482a9278
Pipeline #76856 failed with stages
in 7 seconds
......@@ -12,7 +12,7 @@ from enum import Enum, auto
from functools import partial, reduce
from uuid import uuid1
from collections import namedtuple
from contextlib import contextmanager
from contextlib import contextmanager, ExitStack
import tango
import warlock
......@@ -49,6 +49,22 @@ def get_gpu_devs():
return glob.glob('/dev/nvidia[0-9]')
class EnumParam(Enum):
def dumps(self, **kws):
indent = kws.get('indent', None)
typename = kws.get('typename', True)
klass = self.__class__.__name__
klass_prefix = f'{klass}: ' if typename else ''
value = str(self.value)
if indent is None:
prefix, suffix = f'<{klass_prefix}', '>'
else:
prefix, suffix = klass_prefix, ''
value = value.upper()
return prefix + value + suffix
TypeData = namedtuple('Detector_TypeData', ['name', 'schema', 'klass'])
class ParamBase:
......@@ -118,6 +134,9 @@ class ParamBase:
def f(p, v):
for n, d in self._getItemDataDicts():
if p in d:
# If val is not from the expected class, pass to model layer
if not is_special_type_item(v, d[p]['klass']):
break
to_model = d[p]['to_model']
return to_model(v)
return v
......@@ -158,6 +177,9 @@ class ParamBase:
model[name] = f(name, val)
for n, d in self._getItemDataDicts():
if name in d:
# If val is not from the expected class, get it from model layer
if not is_special_type_item(val, d[name]['klass']):
val = d[name]['from_model'](val)
d[name]['value'] = val
break
......@@ -170,7 +192,10 @@ class ParamBase:
return model.__iter__()
def __getattr__(self, name):
return self[name]
if name in self.keys():
return self[name]
klass = self.__class__
raise AttributeError(f'Item {name} not found in {klass.__name__}')
def __setattr__(self, name, val):
self[name] = val
......@@ -179,14 +204,57 @@ class ParamBase:
return repr(self)
def __repr__(self):
klass = self.__class__.__name__
return f'<{klass}: {repr(dict(self.items()))}>'
return self.dumps()
def __dir__(self):
this_dir = super().__dir__()
item_dirs = [d.keys() for n, d in self._getItemDataDicts()]
return itertools.chain(this_dir, *item_dirs)
def dumps(self, **kws):
indent = kws.get('indent', None)
level = kws.setdefault('level', 0)
typename = kws.get('typename', True)
sort_keys = kws.get('sort_keys', False)
capitalize = kws.get('capitalize', False)
klass = self.__class__.__name__
klass_prefix = f'{klass}: ' if typename else ''
keys = sorted(self.keys()) if sort_keys else self.keys()
if indent is None:
prefix, suffix = f'<{klass_prefix}', '>'
sep = ', '
head_len = 0
key_len = 0
else:
prefix, suffix = klass_prefix, ''
sep = '\n'
head_len = indent * level
key_len = max([len(k) for k in keys]) + 2
def key_str(k):
if capitalize:
k = ' '.join([x.capitalize() for x in k.split('_')])
return k + ': '
item_kws = dict(kws)
item_kws['level'] += 1
def item_str(v):
if any([isinstance(v, b) for b in (EnumParam, ParamArray)]):
return v.dumps(**kws)
elif isinstance(v, ParamBase):
key_suffix = '\n' if indent else ''
return key_suffix + v.dumps(**item_kws)
else:
return repr(v)
head = '%*s' % (head_len, '')
def item_line(k):
return '%s%-*s%s' % (head, key_len, key_str(k), item_str(self[k]))
lines = [item_line(k) for k in keys]
return prefix + sep.join(lines) + suffix
def getModelDict(self):
f = self._getToModelFunc()
return {p: f(p, v) for p, v in self.items()}
......@@ -233,7 +301,9 @@ class ParamArray(list):
def __setitem__(self, key, value):
self.__checkitem__(value)
model_value = self.__to_model__(value)
# If val is not from the expected class, pass to model layer
direct = not is_special_type_item(value, self.__data__['klass'])
model_value = value if direct else self.__to_model__(value)
self.__model__.__setitem__(key, model_value)
# store a copy of the value
value = self.__from_model__(model_value)
......@@ -270,6 +340,31 @@ class ParamArray(list):
klass = self.__class__
return klass(self.getModelList(), data=self.__data__)
def dumps(self, **kws):
if not len(self):
return '[]'
indent = kws.get('indent', None)
level = kws.get('level', 0)
klass = self.__data__['klass']
is_special = any([issubclass(klass, b)
for b in (EnumParam, ParamBase, ParamArray)])
if indent is None or not is_special or issubclass(klass, EnumParam):
prefix, suffix = '[', ']'
sep = ', '
else:
close_indent = level * indent
prefix, suffix = '[\n', '\n' + ' ' * close_indent + ']'
sep = ',\n'
item_kws = dict(kws)
item_kws['level'] += 1
def item_str(v):
if is_special:
return v.dumps(**item_kws)
else:
return repr(v)
return prefix + sep.join([item_str(x) for x in self]) + suffix
def get_schema_type(schema, name=None, sub_schema=None, name_prefix=''):
def RefName(x):
......@@ -296,7 +391,7 @@ def get_schema_type(schema, name=None, sub_schema=None, name_prefix=''):
HasEnum = lambda p: 'enum' in p
def SchemaEnum(n, items):
enum_name = f'{type_name}.{cpp_2_python_name(n)}'
return Enum(enum_name, {e.upper(): e for e in items})
return EnumParam(enum_name, {e.upper(): e for e in items})
enums = {n: SchemaEnum(n, p['enum'])
for n, p in schema_items if HasEnum(p)}
......@@ -334,6 +429,14 @@ def get_schema_type(schema, name=None, sub_schema=None, name_prefix=''):
return type(type_name, (ParamBase,), type_dict)
def get_type_from_factory(factory):
return factory.func if type(factory) is partial else factory
def is_special_type_item(item, factory):
return isinstance(item, get_type_from_factory(factory))
def get_type_data(name, schema_json):
schema = json.loads(schema_json)
klass = get_schema_type(schema)
......@@ -428,29 +531,49 @@ class Detector:
return self.__queue.__iter__()
def __init__(self, ctrl_dev, *recv_devs, timeout=DefaultTimeout):
self.__fsm = self.FSM(self)
self.__tasks = {}
self.__events = []
self.__state_queues = []
self.__state_events = {'State': {}, 'NotState': {}}
self.__state_follower = None
self.__event_ids = {}
self.__attrs = {}
if not recv_devs:
raise ValueError('Must provide at least one receiver')
self.__ctrl = tango.DeviceProxy(ctrl_dev)
self.__recvs = [tango.DeviceProxy(r) for r in recv_devs]
for d in self._getDevs():
d.set_green_mode(tango.GreenMode.Gevent)
d.set_timeout_millis(int(timeout * 1000))
self._initAttrs()
self.__fsm = self.FSM(self)
self.__tasks = {}
self.__events = []
self.__state_queues = []
self.__state_events = {'State': {}, 'NotState': {}}
self.__state_follower = None
self.__event_ids = {}
self.checkConnection()
def __del__(self):
for q in self.__state_queues:
q.put(StopIteration)
@property
def det_info(self):
return json.loads(self.__ctrl.det_info)
def checkConnection(self):
try:
self.ping()
if not self.__attrs:
self._initAttrs()
return True
except:
return False
def _getAttrs(self):
if not self.checkConnection():
raise RuntimeError("Connection error")
return self.__attrs
def _initAttrs(self):
self.__attrs = {}
for d in self._getDevs()[:2]:
tango_db = d.get_device_db()
tango_class = d.info().dev_class
......@@ -471,24 +594,37 @@ class Detector:
return (cpp_2_python_name(n), attr_data['type_data'].klass)
self.__dict__.update([GetKlassData(n, attr_data)
for n, attr_data in attrs.items()])
# The descriptors returning the dynamic params' attrs for each detector
class Descriptor:
def __init__(self):
self.__attr = {}
def addDeviceAttrData(self, dev, attr):
self.__attr[hash(dev)] = attr
def __get__(self, obj, objtype=None):
if obj is None:
return self
# Descriptors are class-based, ensure instance attrs are inited
if hash(obj) not in self.__attr:
obj._initAttrs()
return self.__attr[hash(obj)]['value']
def __set__(self, obj, value):
self.__attr[hash(obj)]['value'] = value
for n, attr_data in self.__attrs.items():
desc = getattr(self.__class__, n, Descriptor())
desc.addDeviceAttrData(self, attr_data)
setattr(self.__class__, n, desc)
def ping(self):
for d in self._getDevs():
d.ping()
def getAttrTypeData(self, name):
if name not in self.__attrs:
attrs = self._getAttrs()
if name not in attrs:
raise ValueError(f'Could not find TypeData for attribute {name}')
return self.__attrs[name]['type_data']
return attrs[name]['type_data']
def _updateAttrs(self):
for d in self._getDevs():
......@@ -500,7 +636,7 @@ class Detector:
def _getAttrs4Dev(self, dev):
attrs = {n: attr_data['value']
for n, attr_data in self.__attrs.items()
for n, attr_data in self._getAttrs().items()
for attr in dev.attribute_list_query() if attr.name == n}
if 'acq_params' in attrs:
......@@ -768,6 +904,67 @@ def SafeDetectorParams(d, param_list=['acq_params', 'proc_params']):
setattr(d, param, value)
def run_acquisition(device, uuid=None, cb={}):
if uuid is None:
uuid = uuid1()
with ExitStack() as stack:
state_cb = cb.get('state', None)
if state_cb:
def state_monitor(device, end_event, cb):
follower = device.createStateFollower(end_event)
for state in follower:
cb(state)
cb(None)
end_event = gevent.event.Event()
monitor = gevent.spawn(state_monitor, device, end_event, state_cb)
def cleanup():
end_event.set()
monitor.join()
stack.callback(cleanup)
proc = stack.enter_context(device.prepareAcq(uuid))
prepare_cb = cb.get('prepare', None)
if prepare_cb:
prepare_cb(uuid, proc)
end_cb = cb.get('end', None)
if end_cb:
stack.callback(end_cb)
device.startAcq()
start_cb = cb.get('start', None)
if start_cb:
start_cb()
def stop_wait_proc():
# Stop if running
if device.getState() == Detector.State.Running:
print('Stopping running acquisition ...')
device.stopAcq()
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
while not event.ready():
event.wait(1.0)
# Signal that acq has stopped
stop_cb = cb.get('stop', None)
if stop_cb:
stop_cb(device.nb_frames_xferred)
# Wait for the end of the processing
while not proc.isFinished():
gevent.sleep(0.1)
stack.callback(stop_wait_proc)
event = device.getNotStateEvent(Detector.State.Running)
while not event.ready():
xfer_cb = cb.get('xfer', None)
if xfer_cb:
xfer_cb(device.nb_frames_xferred)
event.wait(1.0)
def pretty_print_json(json_str, desc=None):
if desc:
print(f'{desc}:')
......@@ -897,71 +1094,102 @@ def test_proc_params_schema(d):
print(f'recv_dev.proc_params={recv_dev.proc_params}')
def test_acquisition_states(device, args, config):
print('*** Testing states & acquisition ***')
print(f'State={device.getState()}')
class TestAcqStates:
for name, val in config.items():
param = getattr(device, name)
param.update(config[name])
pretty_print_json(json.dumps(param.getModelDict()), name)
def __init__(self, device, args, config):
self.device = device
self.args = args
self.config = config
def state_monitor(device, end_event):
follower = device.createStateFollower(end_event)
for state in follower:
print(f'Monitor: state={state}')
print('Monitor finished!')
self.start_event = gevent.event.Event()
self.end_event = gevent.event.Event()
self.xferred_frames = gevent.event.AsyncResult()
self.uuid = self.args.get('uuid', None)
self.proc = None
end_event = gevent.event.Event()
monitor = gevent.spawn(state_monitor, device, end_event)
def run(self):
self.show_state('Run')
uuid = uuid1()
with device.prepareAcq(uuid) as proc:
print(f'UUID={uuid}')
print(f'State={device.getState()}')
print(f'Counters={proc.counters}')
for name in self.config:
param = getattr(self.device, name)
param.update(self.config[name])
pretty_print_json(json.dumps(param.getModelDict()), name)
cb = dict(state=self.state_cb,
prepare=self.prepare_cb,
start=self.start_cb,
xfer=self.xfer_cb,
stop=self.stop_cb,
end=self.end_cb)
return run_acquisition(self.device, self.uuid, cb)
def show_state(self, name):
print(f'{name + ":":8s} state={self.device.getState()}')
def state_cb(self, state):
if state is not None:
print(f'Monitor: state={state}')
else:
print('\nMonitor finished!')
def proc_loop(self, proc):
self.start_event.wait()
roi_counters = []
device.startAcq()
print(f'State={device.getState()}')
event = device.getNotStateEvent(Detector.State.Running)
# Simulate a Stop when about half of the frames are in the pipeline
# only if nb_frames > 5
def ready_for_stop():
ok = False
if args.stop_after_nb_frames:
ok = device.nb_frames_xferred >= args.stop_after_nb_frames
return event.ready() or ok
while not ready_for_stop():
print(f'nb_frames_xferred={device.nb_frames_xferred}')
while not (self.end_event.ready() or proc.isFinished()):
print(f'Counters={proc.counters}')
# ROI counter acquisition 5 at a time
rc = proc.popRoiCounters(5)
if rc:
roi_counters.append(rc)
gevent.sleep(1)
print(f'Final counters={proc.counters}')
# Finish ROI counter download
while True:
rc = proc.popRoiCounters(5)
if not rc:
break
roi_counters.append(rc)
event.wait(1.0)
# If acquisition is still running, stop it
if device.getState() == Detector.State.Running:
print(f'##### STOP #####')
device.stopAcq()
print(f'State={device.getState()}')
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# Wait for the end of the processing
while not proc.isFinished():
print(f'Counters={proc.counters}')
gevent.sleep(0.1)
print(f'Counters={proc.counters}')
while rc := proc.popRoiCounters(5):
# ROI counter acquisition
roi_counters.append(rc)
gevent.sleep(0.1)
print(f'ROI Counters={roi_counters}')
print(f'Frame={proc.getFrame(device.nb_frames_xferred - 1)}')
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
input('Press Enter to close')
nb_frames_xferred = self.xferred_frames.get()
print(f'Frame={proc.getFrame(nb_frames_xferred - 1)}')
def prepare_cb(self, uuid, proc):
self.uuid = uuid
print(f'UUID={uuid}')
self.show_state('Prepare')
self.proc = gevent.spawn(self.proc_loop, proc)
end_event.set()
monitor.join()
def start_cb(self):
self.show_state('Start')
self.start_event.set()
def xfer_cb(self, nb_frames_xferred):
print(f'Curr. nb_frames_xferred={nb_frames_xferred}')
# Simulate an asynchronous Stop, if requested
stop_frames = self.args.get('stop_after_nb_frames', 0)
if (stop_frames and nb_frames_xferred >= stop_frames and
self.device.getState() == Detector.State.Running):
print(f'##### STOP #####')
self.device.stopAcq()
def stop_cb(self, nb_frames_xferred):
self.show_state('Stop')
print(f'Total nb_frames_xferred={nb_frames_xferred}')
self.xferred_frames.set(nb_frames_xferred)
def end_cb(self):
self.show_state('End')
self.end_event.set()
self.proc.join()
def test_acquisition_states(device, args, config):
print('*** Testing states & acquisition ***')
test = TestAcqStates(device, args.__dict__, config)
test.run()
def main():
......@@ -975,6 +1203,8 @@ def main():
help='test proc_params schema')
parser.add_argument('--acq_params',
help='JSON string with acq_params')
parser.add_argument('--uuid',
help='Acquisition UUID')
parser.add_argument('--acq_params_file',
help='file containg JSON string with acq_params')
parser.add_argument('--proc_params',
......@@ -1012,6 +1242,8 @@ def main():
test_acquisition_states(device, args, config)
input('Press Enter to close')
if __name__ == '__main__':
t = gevent.spawn(main)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment