counter.py 9.39 KB
Newer Older
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import numpy
import time
10
import warnings
Matias Guijarro's avatar
Matias Guijarro committed
11
12
import gevent
from gevent import event
13
from bliss.common.event import dispatcher
Matias Guijarro's avatar
Matias Guijarro committed
14
from ..chain import AcquisitionDevice, AcquisitionChannel
15
from bliss.common.measurement import GroupedReadMixin, Counter
16
from bliss.common.utils import all_equal
17

18

19
20
21
22
23
24
25
26
27
28
29
def _get_group_reader(counters_or_groupreadhandler):
    try:
        list_iter = iter(counters_or_groupreadhandler)
    except TypeError:
        return counters_or_groupreadhandler, list()
    else:
        first_counter = list_iter.next()
        reader = Counter.GROUPED_READ_HANDLERS.get(first_counter)
        for cnt in list_iter:
            cnt_reader = Counter.GROUPED_READ_HANDLERS.get(cnt)
            if cnt_reader != reader:
30
31
32
33
                raise RuntimeError(
                    "Counters %s doesn't belong to the same group"
                    % counters_or_groupreadhandler
                )
34
        return reader, counters_or_groupreadhandler
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
35

36

37
class BaseCounterAcquisitionDevice(AcquisitionDevice):
38
    def __init__(self, counter, count_time, **keys):
39
40
41
        npoints = keys.pop("npoints")
        prepare_once = keys.pop("prepare_once")
        start_once = keys.pop("start_once")
42
43
44
45
46
47
48
49
50
        AcquisitionDevice.__init__(
            self,
            counter,
            counter.name,
            npoints=npoints,
            trigger_type=AcquisitionDevice.SOFTWARE,
            prepare_once=prepare_once,
            start_once=start_once,
        )
51
52
53
54
55

        self.__count_time = count_time
        self.__grouped_read_counters_list = list()
        self._nb_acq_points = 0

56
        if not isinstance(counter, GroupedReadMixin):
57
58
59
            self.channels.append(
                AcquisitionChannel(counter.name, counter.dtype, counter.shape)
            )
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
60

61
62
63
    @property
    def count_time(self):
        return self.__count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
64

65
66
67
    @property
    def grouped_read_counters(self):
        return self.__grouped_read_counters_list
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
68

69
70
    def add_counter(self, counter):
        if not isinstance(self.device, GroupedReadMixin):
71
72
73
            # Ignore if the counter is already the provided device
            if self.device == counter:
                return
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
74
            raise RuntimeError(
75
76
                "Cannot add counter to single-read counter acquisition device"
            )
77
78

        self.__grouped_read_counters_list.append(counter)
79
80
81
        self.channels.append(
            AcquisitionChannel(counter.name, counter.dtype, counter.shape)
        )
82
83

    def _emit_new_data(self, data):
84
        self.channels.update_from_iterable(data)
85
86
87


class SamplingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
88
    SIMPLE_AVERAGE, TIME_AVERAGE, INTEGRATE = range(3)
89

90
91
92
    def __init__(
        self, counters_or_groupreadhandler, count_time=None, mode=SIMPLE_AVERAGE, **keys
    ):
93
94
95
        """
        Helper to manage acquisition of a sampling counter.

96
        counters_or_groupreadhandler -- can be a list,tuple of SamplingCounter or
97
        a group_read_handler
98
99
100
101
102
103
        count_time -- the master integration time.
        mode -- three mode are available *SIMPLE_AVERAGE* (the default)
        which sum all the sampling values and divide by the number of read value.
        the *TIME_AVERAGE* which sum all integration  then divide by the sum
        of time spend to measure all values. And *INTEGRATION* which sum all integration
        and then normalize it when the *count_time*.
104
105
106
107
        Other keys are:
          * npoints -- number of point for this acquisition
          * prepare_once --
          * start_once --
108
        """
109
110
111
112
        npoints = max(1, keys.pop("npoints", 1))
        prepare_once = keys.pop("prepare_once", True)
        start_once = keys.pop("start_once", npoints > 1)

113
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
114
115
116
117
118
119
120
121
122
        BaseCounterAcquisitionDevice.__init__(
            self,
            reader,
            count_time,
            npoints=npoints,
            prepare_once=prepare_once,
            start_once=start_once,
            **keys
        )
123

124
125
126
        self._event = event.Event()
        self._stop_flag = False
        self._ready_event = event.Event()
127
        self._ready_event.set()
128
129
        self.__mode = mode

130
131
132
        for cnt in counters:
            self.add_counter(cnt)

133
134
135
    @property
    def mode(self):
        return self.__mode
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
136

137
    @mode.setter
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
138
    def mode(self, value):
139
        self.__mode = value
140
141

    def prepare(self):
142
143
        self.device.prepare(*self.grouped_read_counters)

144
    def start(self):
145
146
        self._nb_acq_points = 0
        self._stop_flag = False
147
        self._ready_event.set()
148
149
        self._event.clear()

150
        self.device.start(*self.grouped_read_counters)
151
152

    def stop(self):
153
154
        self.device.stop(*self.grouped_read_counters)

155
156
157
158
159
160
161
162
        self._stop_flag = True
        self._trig_time = None
        self._event.set()

    def trigger(self):
        self._trig_time = time.time()
        self._event.set()

163
164
165
    def trigger_ready(self):
        return self._ready_event.is_set()

166
167
168
169
    def wait_ready(self):
        """
        will wait until the last triggered point is read
        """
170
        self._ready_event.wait()
171
172

    def reading(self):
173
        while not self._stop_flag and self._nb_acq_points < self.npoints:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
174
            # trigger wait
175
176
            self._event.wait()
            self._event.clear()
177
            self._ready_event.clear()
178
            trig_time = self._trig_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
179
180
181
182
            if trig_time is None:
                continue
            if self._stop_flag:
                break
183
184
185

            nb_read = 0
            acc_read_time = 0
186
            acc_value = numpy.zeros((len(self.channels),), dtype=numpy.double)
187
            stop_time = trig_time + self.count_time or 0
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
188
            # Counter integration loop
189
190
            while not self._stop_flag:
                start_read = time.time()
191
192
193
                read_value = numpy.array(
                    self.device.read(*self.grouped_read_counters), dtype=numpy.double
                )
194
                end_read = time.time()
195
                read_time = end_read - start_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
196

197
                if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
198
                    acc_value += read_value * (end_read - start_read)
199
200
201
                else:
                    acc_value += read_value

202
203
204
205
206
207
                nb_read += 1
                acc_read_time += end_read - start_read

                current_time = time.time()
                if (current_time + (acc_read_time / nb_read)) > stop_time:
                    break
208
                gevent.sleep(0)  # to be able to kill the task
209
            self._nb_acq_points += 1
Matias Guijarro's avatar
Matias Guijarro committed
210
            if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
211
                data = acc_value / acc_read_time
212
213
            else:
                data = acc_value / nb_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
214

Matias Guijarro's avatar
Matias Guijarro committed
215
            if self.__mode == SamplingCounterAcquisitionDevice.INTEGRATE:
216
                data *= self.count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
217

218
219
            self._emit_new_data(data)

220
            self._ready_event.set()
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
221

222

223
class IntegratingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
224
    def __init__(self, counters_or_groupreadhandler, count_time=None, **keys):
225
226
227
228
229
230
231
232
233
234
235
236
237

        if any(
            filter(
                None,
                (keys.pop(k, None) for k in ("npoints", "prepare_once", "start_once")),
            )
        ):
            warnings.warn(
                "IntegratingCounterAcquisitionDevice: npoints, \
                          prepare_once or start_once flags will be overwritten\
                          by master controller"
            )

238
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
239
240
241
242
243
244
245
246
247
        BaseCounterAcquisitionDevice.__init__(
            self,
            reader,
            count_time,
            npoints=None,
            prepare_once=None,
            start_once=None,
            **keys
        )
248
249
        for cnt in counters:
            self.add_counter(cnt)
250

251
252
253
254
255
256
257
    @AcquisitionDevice.parent.setter
    def parent(self, p):
        self._AcquisitionDevice__parent = p
        self._AcquisitionDevice__npoints = p.npoints
        self._AcquisitionDevice__prepare_once = p.prepare_once
        self._AcquisitionDevice__start_once = p.start_once

258
    def prepare(self):
259
        self.device.prepare(*self.grouped_read_counters)
260
261

    def start(self):
262
263
264
        self._nb_acq_points = 0
        self._stop_flag = False

265
        self.device.start(*self.grouped_read_counters)
266
267

    def stop(self):
268
        self.device.stop(*self.grouped_read_counters)
269
270
271
272
        self._stop_flag = True

    def trigger(self):
        pass
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
273

274
275
276
277
278
279
    def _read_data(self, from_index):
        if self.grouped_read_counters:
            return self.device.get_values(from_index, *self.grouped_read_counters)
        else:
            return [numpy.array(self.device.get_value(from_index), dtype=numpy.double)]

280
    def reading(self):
281
        from_index = 0
282
283
284
        while (
            not self.npoints or self._nb_acq_points < self.npoints
        ) and not self._stop_flag:
285
            data = self._read_data(from_index)
286
            if not all_equal([len(d) for d in data]):
287
                raise RuntimeError("Read data can't have different sizes")
288
            if len(data[0]) > 0:
289
290
291
                from_index += len(data[0])
                self._nb_acq_points += len(data[0])
                self._emit_new_data(data)
292
293
                gevent.idle()
            else:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
294
                gevent.sleep(self.count_time / 2.)