counter.py 8.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import numpy
import time
Matias Guijarro's avatar
Matias Guijarro committed
10
11
import gevent
from gevent import event
12
from bliss.common.event import dispatcher
Matias Guijarro's avatar
Matias Guijarro committed
13
from ..chain import AcquisitionDevice, AcquisitionChannel
14
from bliss.common.measurement import GroupedReadMixin, Counter
15
from bliss.common.utils import all_equal
16

17
18
19
20
21
22
23
24
25
26
27
28
29
30
def _get_group_reader(counters_or_groupreadhandler):
    try:
        list_iter = iter(counters_or_groupreadhandler)
    except TypeError:
        return counters_or_groupreadhandler, list()
    else:
        first_counter = list_iter.next()
        reader = Counter.GROUPED_READ_HANDLERS.get(first_counter)
        for cnt in list_iter:
            cnt_reader = Counter.GROUPED_READ_HANDLERS.get(cnt)
            if cnt_reader != reader:
                raise RuntimeError("Counters %s doesn't belong to the same group" %\
                                   counters_or_groupreadhandler)
        return reader, counters_or_groupreadhandler
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
31

32
class BaseCounterAcquisitionDevice(AcquisitionDevice):
33
    def __init__(self, counter, count_time, **keys):
34
        npoints = max(1, keys.pop('npoints', 1))
35
        prepare_once = keys.pop('prepare_once', True)
36
37
        start_once = keys.pop('start_once', npoints > 1)

38
        AcquisitionDevice.__init__(self, counter, counter.name,
39
40
41
42
43
44
45
46
47
                                   npoints=npoints,
                                   trigger_type=AcquisitionDevice.SOFTWARE,
                                   prepare_once=prepare_once,
                                   start_once=start_once)

        self.__count_time = count_time
        self.__grouped_read_counters_list = list()
        self._nb_acq_points = 0

48
        if not isinstance(counter, GroupedReadMixin):
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
49
            self.channels.append(AcquisitionChannel(
50
                counter.name, numpy.double, ()))
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
51

52
53
54
    @property
    def count_time(self):
        return self.__count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
55

56
57
58
    @property
    def grouped_read_counters(self):
        return self.__grouped_read_counters_list
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
59

60
61
    def add_counter(self, counter):
        if not isinstance(self.device, GroupedReadMixin):
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
62
63
            raise RuntimeError(
                "Cannot add counter to single-read counter acquisition device")
64
65

        self.__grouped_read_counters_list.append(counter)
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
66
        self.channels.append(AcquisitionChannel(
67
            counter.name, numpy.double, ()))
68
69

    def _emit_new_data(self, data):
70
        self.channels.update_from_iterable(data)
71
72
73


class SamplingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
74
    SIMPLE_AVERAGE, TIME_AVERAGE, INTEGRATE = range(3)
75

76
77
    def __init__(self, counters_or_groupreadhandler,
                 count_time=None, mode=SIMPLE_AVERAGE, **keys):
78
79
80
        """
        Helper to manage acquisition of a sampling counter.

81
82
        counters_or_groupreadhandler -- can be a list,tuple of SamplingCounter or 
        a group_read_handler
83
84
85
86
87
88
        count_time -- the master integration time.
        mode -- three mode are available *SIMPLE_AVERAGE* (the default)
        which sum all the sampling values and divide by the number of read value.
        the *TIME_AVERAGE* which sum all integration  then divide by the sum
        of time spend to measure all values. And *INTEGRATION* which sum all integration
        and then normalize it when the *count_time*.
89
90
91
92
        Other keys are:
          * npoints -- number of point for this acquisition
          * prepare_once --
          * start_once --
93
        """
94
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
95
        BaseCounterAcquisitionDevice.__init__(
96
            self, reader, count_time, **keys)
97

98
99
100
101
        self._event = event.Event()
        self._stop_flag = False
        self._ready_event = event.Event()
        self._ready_flag = True
102
103
        self.__mode = mode

104
105
106
        for cnt in counters:
            self.add_counter(cnt)

107
108
109
    @property
    def mode(self):
        return self.__mode
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
110

111
    @mode.setter
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
112
    def mode(self, value):
113
        self.__mode = value
114
115

    def prepare(self):
116
117
        self.device.prepare(*self.grouped_read_counters)

118
    def start(self):
119
120
121
122
123
        self._nb_acq_points = 0
        self._stop_flag = False
        self._ready_flag = True
        self._event.clear()

124
        self.device.start(*self.grouped_read_counters)
125
126

    def stop(self):
127
128
        self.device.stop(*self.grouped_read_counters)

129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
        self._stop_flag = True
        self._trig_time = None
        self._event.set()

    def trigger(self):
        self._trig_time = time.time()
        self._event.set()

    def wait_ready(self):
        """
        will wait until the last triggered point is read
        """
        while not self._ready_flag:
            self._ready_event.wait()
            self._ready_event.clear()

    def reading(self):
        while self._nb_acq_points < self.npoints:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
147
            # trigger wait
148
149
150
151
            self._event.wait()
            self._event.clear()
            self._ready_flag = False
            trig_time = self._trig_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
152
153
154
155
            if trig_time is None:
                continue
            if self._stop_flag:
                break
156
157
158

            nb_read = 0
            acc_read_time = 0
159
            acc_value = numpy.zeros((len(self.channels),), dtype=numpy.double)
160
            stop_time = trig_time + self.count_time or 0
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
161
            # Counter integration loop
162
163
            while not self._stop_flag:
                start_read = time.time()
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
164
165
                read_value = numpy.array(self.device.read(
                    *self.grouped_read_counters), dtype=numpy.double)
166
                end_read = time.time()
167
                read_time = end_read - start_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
168

169
                if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
170
                    acc_value += read_value * (end_read - start_read)
171
172
173
                else:
                    acc_value += read_value

174
175
176
177
178
179
                nb_read += 1
                acc_read_time += end_read - start_read

                current_time = time.time()
                if (current_time + (acc_read_time / nb_read)) > stop_time:
                    break
180
                gevent.sleep(0)  # to be able to kill the task
181
            self._nb_acq_points += 1
Matias Guijarro's avatar
Matias Guijarro committed
182
            if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
183
                data = acc_value / acc_read_time
184
185
            else:
                data = acc_value / nb_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
186

Matias Guijarro's avatar
Matias Guijarro committed
187
            if self.__mode == SamplingCounterAcquisitionDevice.INTEGRATE:
188
                data *= self.count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
189

190
191
            self._emit_new_data(data)

192
193
            self._ready_flag = True
            self._ready_event.set()
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
194

195

196
class IntegratingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
197
198
    def __init__(self, counters_or_groupreadhandler, count_time=None, **keys):
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
199
        BaseCounterAcquisitionDevice.__init__(
200
201
202
            self, reader, count_time, **keys)
        for cnt in counters:
            self.add_counter(cnt)
203
204

    def prepare(self):
205
        self.device.prepare(*self.grouped_read_counters)
206
207

    def start(self):
208
209
210
        self._nb_acq_points = 0
        self._stop_flag = False

211
        self.device.start(*self.grouped_read_counters)
212
213

    def stop(self):
214
        self.device.stop(*self.grouped_read_counters)
215
216
217
218
        self._stop_flag = True

    def trigger(self):
        pass
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
219

220
221
222
223
224
225
    def _read_data(self, from_index):
        if self.grouped_read_counters:
            return self.device.get_values(from_index, *self.grouped_read_counters)
        else:
            return [numpy.array(self.device.get_value(from_index), dtype=numpy.double)]

226
    def reading(self):
227
        from_index = 0
228
        while self._nb_acq_points < self.npoints and not self._stop_flag:
229
            data = self._read_data(from_index)
230
            if not all_equal([len(d) for d in data]):
231
                raise RuntimeError("Read data can't have different sizes")
232
            if len(data[0]) > 0:
233
234
235
                from_index += len(data[0])
                self._nb_acq_points += len(data[0])
                self._emit_new_data(data)
236
237
                gevent.idle()
            else:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
238
                gevent.sleep(self.count_time / 2.)