External writer END_SCAN handling
@pithan There is an issue with the external writer logic: the writer associated to a scan is killed upon END_SCAN event but not all nodes may have been published yet (or at least their NEW_NODE event not received/processed yet). As an example: save the code below in test.py
Start scan loop:
BEACON_HOST=... python test.py session_name
Run listener:
BEACON_HOST=... python test.py session_name --listen --delay=0.001
The listener process will end when some nodes are not detected (NEW_NODE event) before END_SCAN. Increase the delay until it fails (or have more channels, I tested with 100 channels). The solution would be that the END_SCAN event is picked up by the scan listener, not the session listener. How can I do that?
Example script (test.py):
import argparse
import gevent
from bliss import global_map
from bliss.config import static
from bliss.data.node import get_session_node
from bliss.common.standard import *
def alldetectors(env_dict):
"""
Get all detectors and their expected nodes
"""
detectors = set()
nodenames = set()
for ctr in global_map.get_counters_iter():
defaultname = env_dict.get(ctr.fullname.split(':')[-1], None)
detectors.add(env_dict.get(ctr.controller.name, defaultname))
nodenames.add(ctr.fullname)
return detectors, nodenames
def listenscan(scannode, nodes, event_proctime):
"""
Process scan events: list all new nodes
"""
for event_type, node in scannode.iterator.walk_events():
if event_type.name == "NEW_NODE":
if node.fullname:
nodes.add(node.fullname)
gevent.sleep(event_proctime)
def listen(session_name, event_proctime):
"""
Listen to session events
"""
session_node = get_session_node(session_name)
listeners = {}
print('Listening ...')
for event_type, node in session_node.iterator.walk_on_new_events(filter='scan'):
if event_type.name == "NEW_NODE":
# Scan starts
print('Scan {} is started'.format(node.name))
nodes = set()
listeners[node.db_name] = gevent.spawn(listenscan, node, nodes, event_proctime), nodes
elif event_type.name == "END_SCAN":
# Scan ends
print('Scan {} is finished'.format(node.name))
tpl = listeners.pop(node.db_name, None)
if not tpl:
continue
listener, nodes = tpl
listener.kill()
listener.join()
missing = expectednodes - nodes
if missing:
print('{}/{} missing channels: {}'
.format(len(missing), len(expectednodes), missing))
break
else:
print('OK')
for listener, nodes in listeners.values():
listener.kill()
listener.join()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('session_name', type=str)
parser.add_argument('--delay', type=float, default=0.1)
parser.add_argument('--listen', action='store_true')
args = parser.parse_args()
session_name = args.session_name
session = static.get_config().get(session_name)
session.setup()
env_dict = session.env_dict
detectors, expectednodes = alldetectors(env_dict)
print('Number of channels: {}'.format(len(expectednodes)))
if args.listen:
listen(session_name, args.delay)
else:
while True:
scan = ct(0.1, *detectors, run=True, save=False)
print(scan)