#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

"""
Usage: sps_data_watch [(-s | --session)] <name>...
       sps_data_watch (-h | --help)
Options:
    -s, --session                 Follow data from session(s).
    -h, --help                    Show this screen.
"""
import docopt
import gevent

import json

from bliss.common.utils import OrderedDict
try:
    import sps
except ImportError:
    try:
        from PyMca5.PyMcaIO import sps
    except ImportError:         # last chance
        from PyMca import sps
from bliss.data.node import DataNodeIterator,_get_or_create_node,is_zerod
# Array name handed to every sps call in this file
# (create / getdata / putdata / putinfo / putmetadata).
ARRAY_NAME = "SCAN_D"

class _MaxPointReach(Exception):
    """Raised by copy_data when a channel holds more points than allowed."""

class _MaxCounterReach(Exception):
    """Raised by copy_data when more channels are registered than allowed."""

class _NewCounter(Exception):
    """Raised by copy_data when it meets a channel not registered before."""

def copy_data(zerod,zerod_index,
              session_name,max_nb_counter,max_nb_points):
    """Copy the not-yet-copied points of one channel into the sps array.

    ``zerod_index`` maps a channel name to a dict with two keys: ``'from'``
    (number of points of that channel already copied) and ``'index'`` (the
    column of the sps array used for that channel). It is updated in place.

    :param zerod: channel object; its ``name`` attribute and its
        ``get(from_index, -1)`` method are used (assumes -1 means "up to the
        last available point" -- TODO confirm against the channel API).
    :param zerod_index: per-channel bookkeeping mapping described above.
    :param session_name: first argument of every sps call made here.
    :param max_nb_counter: highest number of channels ``zerod_index`` may hold.
    :param max_nb_points: highest number of points one channel may hold.
    :raises _MaxCounterReach: ``zerod_index`` holds more than
        ``max_nb_counter`` channels once ``zerod`` is registered.
    :raises _NewCounter: ``zerod`` was not registered in ``zerod_index``
        before this call (it is registered by the time the exception leaves).
    :raises _MaxPointReach: the channel would exceed ``max_nb_points``.
    """
    channel_name = zerod.name
    known_counters = len(zerod_index)
    channel_info = zerod_index.setdefault(
        channel_name, {'from': 0, 'index': known_counters})
    if len(zerod_index) > max_nb_counter:
        raise _MaxCounterReach()
    if len(zerod_index) != known_counters:
        # setdefault() added an entry: this channel was unknown until now.
        raise _NewCounter()

    from_index = channel_info['from']
    data = zerod.get(from_index, -1)
    nb_points = from_index + len(data)
    if nb_points > max_nb_points:
        raise _MaxPointReach()

    channel_info['from'] = nb_points
    if not len(data):
        return

    # Read-modify-write of the whole array: only this channel's column,
    # and only the rows of the new points, are overwritten.
    sps_data = sps.getdata(session_name, ARRAY_NAME)
    sps_data.T[channel_info['index']][from_index:nb_points] = data
    sps.putdata(session_name, ARRAY_NAME, sps_data)

    # Publish the number of points every registered channel has reached.
    # .values() (instead of iteritems()) runs on both Python 2 and Python 3.
    min_index = min(info['from'] for info in zerod_index.values())
    sps.putinfo(session_name, ARRAY_NAME, json.dumps((min_index, "running")))

def new_scanno(last={"scanno": 0}):
    """Return the next scan number; the first call without argument gives 1.

    The default ``last`` dict is built once, at definition time, and shared
    by every call: that shared dict is what remembers the previous number.
    A caller may pass its own ``{"scanno": n}`` dict, which is then
    incremented in place instead.
    """
    scanno = last["scanno"] + 1
    last["scanno"] = scanno
    return scanno
 
def watch_data(scan_node,session_name):
    """Keep the sps array of ``session_name`` filled with one scan's channels.

    Each pass of the outer loop creates the sps array with the current
    ``npoints`` x ``nbcounters`` size, copies the data of the channels already
    present (only those accepted by ``is_zerod``), publishes the metadata,
    then copies new data as channel events arrive.  When ``copy_data``
    reports that the array is too small, or that an unknown channel showed
    up, the pass is abandoned and a new one starts from scratch.

    :param scan_node: scan node to follow; ``scan_node.info`` is read for
        "npoints" and the node is walked with ``DataNodeIterator``.
    :param session_name: first argument of every sps call; also published as
        the "spec" metadata entry.

    The function has no normal exit: it loops until an exception other than
    the three private ones escapes (for instance when its greenlet is killed).
    NOTE(review): if ``wait_for_event`` ever returns, the array is re-created
    and re-filled as well -- confirm that this is intended.
    """
    # ``or 0`` keeps max() away from None when "npoints" is absent, which
    # Python 3 refuses to compare with an int.
    npoints = max(scan_node.info.get("npoints") or 0, 4096)
    nbcounters = 64

    while True:
        try:
            scan_data_iterator = DataNodeIterator(scan_node)
            pubsub = scan_data_iterator.children_event_register()
            sps.create(session_name, ARRAY_NAME, npoints, nbcounters)

            zerod_index = OrderedDict()
            for channel in scan_data_iterator.walk(filter='channel', wait=False):
                if is_zerod(channel):
                    # Register first so copy_data() does not raise
                    # _NewCounter for channels found during this walk.
                    zerod_index.setdefault(
                        channel.name,
                        {'from': 0, 'index': len(zerod_index)})
                    copy_data(channel, zerod_index,
                              session_name, nbcounters, npoints)

            # list(...) instead of .keys(): a real list on Python 2 and 3,
            # needed for the concatenation below.
            counter_names = list(zerod_index)
            allcountersdict = {i: "" for i in range(nbcounters)}
            allcountersdict.update(enumerate(counter_names))
            padding = [""] * (nbcounters - len(counter_names))

            sps.putmetadata(session_name, ARRAY_NAME,
                            json.dumps((allcountersdict,
                                        {"npts": npoints,
                                         "allmotorm": "",
                                         "scanno": new_scanno(),
                                         "datafile": "bliss",
                                         "spec": session_name,
                                         "columnnames": counter_names + padding,
                                         "columns": len(counter_names),
                                         "selectedcounters": ';'.join(counter_names)},
                                        {})))

            # Both event kinds trigger the same copy; compare by equality,
            # as watch_session does, rather than by identity.
            copy_events = (scan_data_iterator.NEW_CHILD_EVENT,
                           scan_data_iterator.NEW_DATA_IN_CHANNEL_EVENT)
            for event_type, zerod in scan_data_iterator.wait_for_event(pubsub, filter='channel'):
                if is_zerod(zerod) and event_type in copy_events:
                    copy_data(zerod, zerod_index,
                              session_name, nbcounters, npoints)
        except _MaxCounterReach:
            nbcounters *= 2
        except _MaxPointReach:
            npoints *= 32 if npoints < 10000 else 2
        except _NewCounter:
            # Nothing to resize: restarting the pass rebuilds the index and
            # the metadata with the new channel included.
            pass
        
def watch_session(session_name):
    """Follow a session and keep one ``watch_data`` greenlet on its newest scan.

    The scans already present are walked and a watcher is spawned on the last
    one, if any.  Afterwards every NEW_CHILD_EVENT received for a scan kills
    the running watcher and spawns a new one on the scan that just appeared.

    :param session_name: name given to ``_get_or_create_node`` and forwarded
        to ``watch_data``.

    Returns immediately, doing nothing, when no session node is obtained.
    """
    session_node = _get_or_create_node(session_name, node_type='session')
    if session_node is None:
        return

    data_iterator = DataNodeIterator(session_node)
    pubsub = data_iterator.children_event_register()

    # Exhaust the walk only to keep its last item.
    last_scan_node = None
    for last_scan_node in data_iterator.walk(filter='scan', wait=False):
        pass

    watch_data_task = None
    if last_scan_node is not None:
        watch_data_task = gevent.spawn(watch_data, last_scan_node, session_name)

    for event_type, scan_node in data_iterator.wait_for_event(pubsub, filter='scan'):
        if event_type != data_iterator.NEW_CHILD_EVENT:
            continue
        # A single watcher at a time: drop the previous scan's one first.
        if watch_data_task:
            watch_data_task.kill()
        watch_data_task = gevent.spawn(watch_data, scan_node, session_name)
        
def main():
    """Command-line entry point: watch every session named in the arguments.

    The module docstring is given to docopt as the usage definition.  One
    ``watch_session`` greenlet is spawned per ``<name>`` argument and the
    function blocks until all of them finish or Ctrl-C is pressed.  On a
    usage error the docopt message is printed and the function returns.
    Only the ``<name>`` entries of the parsed arguments are read here.
    """
    try:
        # Parse arguments, use file docstring as a parameter definition
        arguments = docopt.docopt(__doc__)
    except docopt.DocoptExit as e:
        # print(e) is valid on Python 2 and 3, whereas the statement form
        # and the .message attribute exist on Python 2 only.
        print(e)
        return

    tasks = [gevent.spawn(watch_session, session)
             for session in arguments['<name>']]
    try:
        gevent.joinall(tasks)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the watcher: leave quietly.
        pass
# Start the watcher only when this file is executed as a script.
if __name__ == '__main__':
   main()