Commit c7a54579 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'id15' into 'master'

Id15

See merge request !429
parents 8e25c8aa 588e3235
......@@ -8,6 +8,7 @@
from ..chain import AcquisitionDevice, AcquisitionChannel
from bliss.common.event import dispatcher
import gevent
from gevent import event
import numpy
class MusstAcquisitionDevice(AcquisitionDevice):
......@@ -32,11 +33,13 @@ class MusstAcquisitionDevice(AcquisitionDevice):
self.program_template_replacement = dict()
self.vars = vars if vars is not None else dict()
store_list = store_list if store_list is not None else list()
self.channels.extend((AcquisitionChannel(name,numpy.uint32, (1,)) for name in store_list))
self.channels.extend((AcquisitionChannel(name,numpy.int32, (1,)) for name in store_list))
self.next_vars = None
self._iter_index = 0
self._ready_flag = True
self._ready_event = event.Event()
def __iter__(self):
if isinstance(self.vars,(list,tuple)):
vars_iter = iter(self.vars)
......@@ -58,24 +61,34 @@ class MusstAcquisitionDevice(AcquisitionDevice):
for var_name, value in self.next_vars.iteritems():
self.musst.putget("VAR %s %s" % (var_name,value))
self._ready_flag = True
def start(self):
self.musst.run()
def stop(self):
self.musst.ABORT
def wait_ready(self):
while not self._ready_flag:
self._ready_event.wait()
self._ready_event.clear()
def reading(self):
last_read_event = 0
while self.musst.STATE == self.musst.RUN_STATE:
new_read_event = self._send_data(last_read_event)
if new_read_event != last_read_event:
last_read_event = new_read_event
gevent.sleep(100e-6) # be able to ABORT the musst card
else:
gevent.sleep(10e-3) # relax a little bit.
self._send_data(last_read_event) # final send
try:
while self.musst.STATE == self.musst.RUN_STATE:
new_read_event = self._send_data(last_read_event)
if new_read_event != last_read_event:
last_read_event = new_read_event
gevent.sleep(100e-6) # be able to ABORT the musst card
else:
gevent.sleep(10e-3) # relax a little bit.
self._send_data(last_read_event) # final send
finally:
self._ready_flag = True
self._ready_event.set()
def _send_data(self,last_read_event):
data = self.musst.get_data(len(self.channels),last_read_event)
if data.size > 0:
......
......@@ -259,7 +259,7 @@ class AcquisitionDevice(object):
return True
class AcquisitionChainIter(object):
def __init__(self,acquisition_chain,parallel_prepare = False):
def __init__(self,acquisition_chain,parallel_prepare = True):
self.__sequence_index = -1
self._parallel_prepare = parallel_prepare
self.__acquisition_chain = weakref.proxy(acquisition_chain)
......@@ -306,9 +306,11 @@ class AcquisitionChainIter(object):
self._execute("_start")
def stop(self):
self._execute("stop", master_to_slave=True)
self._execute("stop", master_to_slave=True,wait_all_tasks=True)
preset_tasks = [gevent.spawn(preset.stop) for preset in self.__acquisition_chain._presets_list]
gevent.joinall(preset_tasks) # wait to call all stop on preset
gevent.joinall(preset_tasks, raise_error=True)
def next(self):
......@@ -331,7 +333,8 @@ class AcquisitionChainIter(object):
return self
def _execute(self, func_name,
master_to_slave=False, wait_between_levels=True):
master_to_slave=False, wait_between_levels=True,
wait_all_tasks=False):
tasks = list()
prev_level = None
......@@ -350,6 +353,11 @@ class AcquisitionChainIter(object):
prev_level = level
func = getattr(dev, func_name)
tasks.append(gevent.spawn(func))
# ensure that all tasks are executed
# (i.e: don't raise the first exception on stop)
if wait_all_tasks:
gevent.joinall(tasks)
gevent.joinall(tasks, raise_error=True)
class AcquisitionChain(object):
......
......@@ -39,7 +39,7 @@ class StepScanDataWatch(object):
self._channel_end_nb = 0
self._init_done = False
def __call__(self,data_events,nodes):
def __call__(self,data_events,nodes,info):
if self._init_done is False:
for acq_device,data_node in nodes.iteritems():
if data_node.type() == 'zerod':
......@@ -190,6 +190,8 @@ class Container(object):
self.node = _get_or_create_node(self.__name, "container", parent=self.root_node)
class Scan(object):
IDLE_STATE,PREPARE_STATE,START_STATE,STOP_STATE = range(4)
def __init__(self,chain, name=None,
parent=None, scan_info=None, writer=None,
data_watch_callback=None):
......@@ -208,6 +210,10 @@ class Scan(object):
the callback will get:
- data_event : a dict with Acq(Device/Master) as key and a set of signal as values
- nodes : a dict with Acq(Device/Master) as key and the associated data node as value
- info : dictionnary which contains the current scan state...
if the callback is a class and have a method **on_state**, it will be called on each
scan transition state. The return of this method will activate/deactivate
the calling of the callback during this stage.
"""
if parent is None:
self.root_node = None
......@@ -258,6 +264,7 @@ class Scan(object):
self._acq_chain = chain
self._scan_info = scan_info if scan_info is not None else dict()
self._scan_info['node_name'] = self._node.db_name()
self._state = self.IDLE_STATE
@property
def name(self):
......@@ -300,7 +307,8 @@ class Scan(object):
while self._data_watch_running and not self._data_watch_task.ready():
self._data_watch_callback_done.wait()
self._data_watch_callback_done.clear()
self._data_watch_callback(data_events,self.nodes)
self._data_watch_callback(data_events,self.nodes,
{'state':self._state})
else:
self._data_watch_callback_event.set()
......@@ -330,16 +338,52 @@ class Scan(object):
self._node.set_ttl()
def run(self):
class _Wakeup(object):
def __init__(self,cnt,active):
self.__active = active
self.__task = None
self.__cnt = weakref.proxy(cnt)
def __enter__(self):
if self.__active:
self.__task = gevent.spawn(self._timer)
def __exit__(self,*args):
if self.__task is not None:
gevent.kill(self.__task)
def _timer(self):
try:
while True:
self.__cnt._data_watch_callback_event.set()
gevent.sleep(0.1)
except ReferenceError:
pass
if hasattr(self._data_watch_callback,'on_state'):
call_on_prepare = self._data_watch_callback.on_state(self.PREPARE_STATE)
call_on_stop = self._data_watch_callback.on_state(self.STOP_STATE)
else:
call_on_prepare,call_on_stop = False,False
try:
for i in self.acq_chain:
i.prepare(self,self.scan_info)
self._state = self.PREPARE_STATE
with _Wakeup(self,call_on_prepare):
i.prepare(self,self.scan_info)
self._state = self.START_STATE
i.start()
except:
i.stop()
self.stop()
self._state = self.STOP_STATE
with _Wakeup(self,call_on_stop):
i.stop()
self.stop()
raise
else:
self._state = self.STOP_STATE
i.stop()
finally:
self._state = self.IDLE_STATE
@staticmethod
def _data_watch(scan,event,event_done):
......@@ -349,9 +393,9 @@ class Scan(object):
try:
data_events = scan._data_events
scan._data_events = dict()
if not data_events : continue
scan._data_watch_running = True
scan._data_watch_callback(data_events,scan.nodes)
scan._data_watch_callback(data_events,scan.nodes,
{'state':scan._state})
scan._data_watch_running = False
except ReferenceError:
break
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment