counter.py 8.29 KB
Newer Older
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import numpy
import time
Matias Guijarro's avatar
Matias Guijarro committed
10
11
import gevent
from gevent import event
12
from bliss.common.event import dispatcher
Matias Guijarro's avatar
Matias Guijarro committed
13
from ..chain import AcquisitionDevice, AcquisitionChannel
14
from bliss.common.measurement import GroupedReadMixin, Counter
15
from bliss.common.utils import all_equal
16

17
18
19
20
21
22
23
24
25
26
27
28
29
30
def _get_group_reader(counters_or_groupreadhandler):
    try:
        list_iter = iter(counters_or_groupreadhandler)
    except TypeError:
        return counters_or_groupreadhandler, list()
    else:
        first_counter = list_iter.next()
        reader = Counter.GROUPED_READ_HANDLERS.get(first_counter)
        for cnt in list_iter:
            cnt_reader = Counter.GROUPED_READ_HANDLERS.get(cnt)
            if cnt_reader != reader:
                raise RuntimeError("Counters %s doesn't belong to the same group" %\
                                   counters_or_groupreadhandler)
        return reader, counters_or_groupreadhandler
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
31

32
class BaseCounterAcquisitionDevice(AcquisitionDevice):
    """
    Common base of the counter acquisition devices.

    The wrapped device is either a single counter or an instance of
    GroupedReadMixin to which counters are attached with *add_counter*.
    One acquisition channel is kept per counter.
    """

    def __init__(self, counter, count_time, **keys):
        """
        counter -- the counter (or GroupedReadMixin instance) used as device
        count_time -- value exposed through the *count_time* property

        Recognized keys are:
          * npoints -- number of points, values below 1 are raised to 1
          * prepare_once -- defaults to True
          * start_once -- defaults to True when npoints > 1
        Remaining keys are not forwarded to AcquisitionDevice.
        """
        requested_points = keys.pop('npoints', 1)
        npoints = max(1, requested_points)
        prepare_once = keys.pop('prepare_once', True)
        start_once = keys.pop('start_once', npoints > 1)

        AcquisitionDevice.__init__(
            self, counter, counter.name,
            npoints=npoints,
            trigger_type=AcquisitionDevice.SOFTWARE,
            prepare_once=prepare_once,
            start_once=start_once)

        self.__count_time = count_time
        self.__grouped_read_counters_list = list()
        self._nb_acq_points = 0

        # A single-read counter gets its channel right away; channels of a
        # grouped reader are created by add_counter().
        if not isinstance(counter, GroupedReadMixin):
            channel = AcquisitionChannel(
                counter.name, counter.dtype, counter.shape)
            self.channels.append(channel)

    @property
    def count_time(self):
        """The count time given at construction."""
        return self.__count_time

    @property
    def grouped_read_counters(self):
        """The list of counters registered through add_counter()."""
        return self.__grouped_read_counters_list

    def add_counter(self, counter):
        """
        Register *counter* on a grouped-read device and create its channel.

        On a single-read device the call is ignored when *counter* is the
        device itself, and raises RuntimeError for any other counter.
        """
        single_read = not isinstance(self.device, GroupedReadMixin)
        if single_read and self.device == counter:
            # Ignore if the counter is already the provided device
            return
        if single_read:
            raise RuntimeError(
                "Cannot add counter to single-read counter acquisition device")

        self.__grouped_read_counters_list.append(counter)
        channel = AcquisitionChannel(
            counter.name, counter.dtype, counter.shape)
        self.channels.append(channel)

    def _emit_new_data(self, data):
        """Push *data* (one item per channel) to the channels."""
        self.channels.update_from_iterable(data)
74
75
76


class SamplingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
    """
    Acquisition device which samples its counters repeatedly after each
    software trigger and emits one reduced value per channel and per point.
    """
    SIMPLE_AVERAGE, TIME_AVERAGE, INTEGRATE = range(3)

    def __init__(self, counters_or_groupreadhandler,
                 count_time=None, mode=SIMPLE_AVERAGE, **keys):
        """
        Helper to manage acquisition of a sampling counter.

        counters_or_groupreadhandler -- can be a list, tuple of
        SamplingCounter or a group_read_handler
        count_time -- the master integration time. When it is None a single
        sample is taken per trigger.
        mode -- three modes are available:
          * *SIMPLE_AVERAGE* (the default): the sum of all sampled values
            divided by the number of samples.
          * *TIME_AVERAGE*: each sample is weighted by the time spent
            reading it; the weighted sum is divided by the total read time.
          * *INTEGRATE*: the simple average multiplied by *count_time*.

        Other keys are:
          * npoints -- number of point for this acquisition
          * prepare_once --
          * start_once --
        """
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
        BaseCounterAcquisitionDevice.__init__(
            self, reader, count_time, **keys)

        self._event = event.Event()
        self._stop_flag = False
        self._ready_event = event.Event()
        self._ready_flag = True
        # Set by trigger() and reset by stop(); defined here so that the
        # attribute exists whatever the order of the first calls.
        self._trig_time = None
        self.__mode = mode

        for cnt in counters:
            self.add_counter(cnt)

    @property
    def mode(self):
        """One of SIMPLE_AVERAGE, TIME_AVERAGE or INTEGRATE."""
        return self.__mode

    @mode.setter
    def mode(self, value):
        self.__mode = value

    def prepare(self):
        """Forward prepare to the device with the grouped counters."""
        self.device.prepare(*self.grouped_read_counters)

    def start(self):
        """Reset the acquisition state, then start the device."""
        self._nb_acq_points = 0
        self._stop_flag = False
        self._ready_flag = True
        self._event.clear()

        self.device.start(*self.grouped_read_counters)

    def stop(self):
        """Stop the device and wake up the reading loop so it can exit."""
        self.device.stop(*self.grouped_read_counters)

        self._stop_flag = True
        self._trig_time = None
        self._event.set()

    def trigger(self):
        """Record the trigger time and wake up the reading loop."""
        self._trig_time = time.time()
        self._event.set()

    def wait_ready(self):
        """
        will wait until the last triggered point is read
        """
        while not self._ready_flag:
            self._ready_event.wait()
            self._ready_event.clear()

    def reading(self):
        """
        Reading loop: for each trigger, sample the device until the count
        time has elapsed, reduce the samples according to *mode* and emit
        the result. Ends after *npoints* points or when stop() is called.
        """
        # NOTE(review): when the loop is woken up by stop(), _ready_flag is
        # left False and _ready_event is not set, so a wait_ready() issued
        # after stop() would block -- confirm callers never do that.
        while not self._stop_flag and \
              self._nb_acq_points < self.npoints:
            # trigger wait
            self._event.wait()
            self._event.clear()
            self._ready_flag = False
            trig_time = self._trig_time
            if trig_time is None:
                continue
            if self._stop_flag:
                break

            nb_read = 0
            acc_read_time = 0
            acc_value = numpy.zeros((len(self.channels),), dtype=numpy.double)
            # Parentheses are required: '+' binds tighter than 'or', so
            # without them a count_time of None raises a TypeError.
            stop_time = trig_time + (self.count_time or 0)

            # Counter integration loop
            while not self._stop_flag:
                start_read = time.time()
                read_value = numpy.array(self.device.read(
                    *self.grouped_read_counters), dtype=numpy.double)
                end_read = time.time()
                read_time = end_read - start_read

                if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
                    acc_value += read_value * read_time
                else:
                    acc_value += read_value

                nb_read += 1
                acc_read_time += read_time

                # Stop when another read of average duration would end
                # after the requested stop time.
                current_time = time.time()
                if (current_time + (acc_read_time / nb_read)) > stop_time:
                    break
                gevent.sleep(0)  # to be able to kill the task

            self._nb_acq_points += 1
            if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
                data = acc_value / acc_read_time
            else:
                data = acc_value / nb_read

            if self.__mode == SamplingCounterAcquisitionDevice.INTEGRATE:
                data *= self.count_time

            self._emit_new_data(data)

            self._ready_flag = True
            self._ready_event.set()
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
198

199

200
class IntegratingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
    """
    Acquisition device which polls its device for already integrated
    values and emits them as they become available.
    """

    def __init__(self, counters_or_groupreadhandler, count_time=None, **keys):
        """
        counters_or_groupreadhandler -- an iterable of counters or a
        non-iterable reader object
        count_time -- half of it is used as polling period while no new
        data is available; None is treated as 0

        Other keys are forwarded to BaseCounterAcquisitionDevice.
        """
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
        BaseCounterAcquisitionDevice.__init__(
            self, reader, count_time, **keys)
        # reading() tests this flag; define it here so that it exists even
        # if start() has not been called yet.
        self._stop_flag = False
        for cnt in counters:
            self.add_counter(cnt)

    def prepare(self):
        """Forward prepare to the device with the grouped counters."""
        self.device.prepare(*self.grouped_read_counters)

    def start(self):
        """Reset the acquisition state, then start the device."""
        self._nb_acq_points = 0
        self._stop_flag = False

        self.device.start(*self.grouped_read_counters)

    def stop(self):
        """Stop the device and ask the reading loop to exit."""
        self.device.stop(*self.grouped_read_counters)
        self._stop_flag = True

    def trigger(self):
        """Nothing to do on trigger for this device."""
        pass

    def _read_data(self, from_index):
        """
        Return the data available from *from_index*, one item per channel.

        With grouped counters the device's get_values() result is returned
        as is; otherwise the single get_value() result is converted to a
        double array and wrapped in a list.
        """
        if self.grouped_read_counters:
            return self.device.get_values(from_index, *self.grouped_read_counters)
        else:
            return [numpy.array(self.device.get_value(from_index), dtype=numpy.double)]

    def reading(self):
        """
        Reading loop: fetch and emit new data until *npoints* points have
        been read or stop() is called.

        Raises RuntimeError when the channels do not return the same number
        of new points.
        """
        from_index = 0
        # count_time defaults to None, which cannot be divided: fall back
        # to 0 so the loop just yields to other greenlets between polls.
        poll_delay = (self.count_time or 0) / 2.
        while self._nb_acq_points < self.npoints and not self._stop_flag:
            data = self._read_data(from_index)
            if not all_equal([len(d) for d in data]):
                raise RuntimeError("Read data can't have different sizes")
            nb_new_points = len(data[0])
            if nb_new_points > 0:
                from_index += nb_new_points
                self._nb_acq_points += nb_new_points
                self._emit_new_data(data)
                gevent.idle()
            else:
                gevent.sleep(poll_delay)