Commit a2b77a8e authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

[CLIENT] Add run_acquisition with code from test_acquisition_states:

* Support callbacks to inform high-level client of the different transitions
parent 92a0e238
......@@ -12,7 +12,7 @@ from enum import Enum, auto
from functools import partial, reduce
from uuid import uuid1
from collections import namedtuple
from contextlib import contextmanager
from contextlib import contextmanager, ExitStack
import tango
import warlock
......@@ -859,6 +859,67 @@ def SafeDetectorParams(d, param_list=['acq_params', 'proc_params']):
setattr(d, param, value)
def run_acquisition(device, uuid=None, cb={}):
if uuid is None:
uuid = uuid1()
with ExitStack() as stack:
state_cb = cb.get('state', None)
if state_cb:
def state_monitor(device, end_event, cb):
follower = device.createStateFollower(end_event)
for state in follower:
cb(state)
cb(None)
end_event = gevent.event.Event()
monitor = gevent.spawn(state_monitor, device, end_event, state_cb)
def cleanup():
end_event.set()
monitor.join()
stack.callback(cleanup)
proc = stack.enter_context(device.prepareAcq(uuid))
prepare_cb = cb.get('prepare', None)
if prepare_cb:
prepare_cb(uuid, proc)
end_cb = cb.get('end', None)
if end_cb:
stack.callback(end_cb)
device.startAcq()
start_cb = cb.get('start', None)
if start_cb:
start_cb()
def stop_wait_proc():
# Stop if running
if device.getState() == Detector.State.Running:
print('Stopping running acquisition ...')
device.stopAcq()
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
while not event.ready():
event.wait(1.0)
# Signal that acq has stopped
stop_cb = cb.get('stop', None)
if stop_cb:
stop_cb(device.nb_frames_xferred)
# Wait for the end of the processing
while not proc.isFinished():
gevent.sleep(0.1)
stack.callback(stop_wait_proc)
event = device.getNotStateEvent(Detector.State.Running)
while not event.ready():
xfer_cb = cb.get('xfer', None)
if xfer_cb:
xfer_cb(device.nb_frames_xferred)
event.wait(1.0)
def pretty_print_json(json_str, desc=None):
if desc:
print(f'{desc}:')
......@@ -988,71 +1049,102 @@ def test_proc_params_schema(d):
print(f'recv_dev.proc_params={recv_dev.proc_params}')
def test_acquisition_states(device, args, config):
print('*** Testing states & acquisition ***')
print(f'State={device.getState()}')
class TestAcqStates:
for name, val in config.items():
param = getattr(device, name)
param.update(config[name])
pretty_print_json(json.dumps(param.getModelDict()), name)
def __init__(self, device, args, config):
self.device = device
self.args = args
self.config = config
def state_monitor(device, end_event):
follower = device.createStateFollower(end_event)
for state in follower:
print(f'Monitor: state={state}')
print('Monitor finished!')
self.start_event = gevent.event.Event()
self.end_event = gevent.event.Event()
self.xferred_frames = gevent.event.AsyncResult()
self.uuid = self.args.get('uuid', None)
self.proc = None
end_event = gevent.event.Event()
monitor = gevent.spawn(state_monitor, device, end_event)
def run(self):
self.show_state('Run')
uuid = uuid1()
with device.prepareAcq(uuid) as proc:
print(f'UUID={uuid}')
print(f'State={device.getState()}')
print(f'Counters={proc.counters}')
for name in self.config:
param = getattr(self.device, name)
param.update(self.config[name])
pretty_print_json(json.dumps(param.getModelDict()), name)
cb = dict(state=self.state_cb,
prepare=self.prepare_cb,
start=self.start_cb,
xfer=self.xfer_cb,
stop=self.stop_cb,
end=self.end_cb)
return run_acquisition(self.device, self.uuid, cb)
def show_state(self, name):
print(f'{name + ":":8s} state={self.device.getState()}')
def state_cb(self, state):
if state is not None:
print(f'Monitor: state={state}')
else:
print('\nMonitor finished!')
def proc_loop(self, proc):
self.start_event.wait()
roi_counters = []
device.startAcq()
print(f'State={device.getState()}')
event = device.getNotStateEvent(Detector.State.Running)
# Simulate a Stop when about half of the frames are in the pipeline
# only if nb_frames > 5
def ready_for_stop():
ok = False
if args.stop_after_nb_frames:
ok = device.nb_frames_xferred >= args.stop_after_nb_frames
return event.ready() or ok
while not ready_for_stop():
print(f'nb_frames_xferred={device.nb_frames_xferred}')
while not (self.end_event.ready() or proc.isFinished()):
print(f'Counters={proc.counters}')
# ROI counter acquisition 5 at a time
rc = proc.popRoiCounters(5)
if rc:
roi_counters.append(rc)
gevent.sleep(1)
print(f'Final counters={proc.counters}')
# Finish ROI counter download
while True:
rc = proc.popRoiCounters(5)
if not rc:
break
roi_counters.append(rc)
event.wait(1.0)
# If acquisition is still running, stop it
if device.getState() == Detector.State.Running:
print(f'##### STOP #####')
device.stopAcq()
print(f'State={device.getState()}')
print(f'nb_frames_xferred={device.nb_frames_xferred}')
print(f'Counters={proc.counters}')
# Wait for the end of the processing
while not proc.isFinished():
print(f'Counters={proc.counters}')
gevent.sleep(0.1)
print(f'Counters={proc.counters}')
while rc := proc.popRoiCounters(5):
# ROI counter acquisition
roi_counters.append(rc)
gevent.sleep(0.1)
print(f'ROI Counters={roi_counters}')
print(f'Frame={proc.getFrame(device.nb_frames_xferred - 1)}')
if device.getState() == Detector.State.Closing:
event = device.getStateEvent(Detector.State.Idle)
input('Press Enter to close')
nb_frames_xferred = self.xferred_frames.get()
print(f'Frame={proc.getFrame(nb_frames_xferred - 1)}')
def prepare_cb(self, uuid, proc):
self.uuid = uuid
print(f'UUID={uuid}')
self.show_state('Prepare')
self.proc = gevent.spawn(self.proc_loop, proc)
end_event.set()
monitor.join()
def start_cb(self):
self.show_state('Start')
self.start_event.set()
def xfer_cb(self, nb_frames_xferred):
print(f'Curr. nb_frames_xferred={nb_frames_xferred}')
# Simulate an asynchronous Stop, if requested
stop_frames = self.args.get('stop_after_nb_frames', 0)
if (stop_frames and nb_frames_xferred >= stop_frames and
self.device.getState() == Detector.State.Running):
print(f'##### STOP #####')
self.device.stopAcq()
def stop_cb(self, nb_frames_xferred):
self.show_state('Stop')
print(f'Total nb_frames_xferred={nb_frames_xferred}')
self.xferred_frames.set(nb_frames_xferred)
def end_cb(self):
self.show_state('End')
self.end_event.set()
self.proc.join()
def test_acquisition_states(device, args, config):
print('*** Testing states & acquisition ***')
test = TestAcqStates(device, args.__dict__, config)
test.run()
def main():
......@@ -1066,6 +1158,8 @@ def main():
help='test proc_params schema')
parser.add_argument('--acq_params',
help='JSON string with acq_params')
parser.add_argument('--uuid',
help='Acquisition UUID')
parser.add_argument('--acq_params_file',
help='file containg JSON string with acq_params')
parser.add_argument('--proc_params',
......@@ -1103,6 +1197,8 @@ def main():
test_acquisition_states(device, args, config)
input('Press Enter to close')
if __name__ == '__main__':
t = gevent.spawn(main)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment