counter.py 9.68 KB
Newer Older
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import numpy
import time
10
import warnings
11
import enum
Matias Guijarro's avatar
Matias Guijarro committed
12
13
import gevent
from gevent import event
14
from bliss.common.event import dispatcher
Matias Guijarro's avatar
Matias Guijarro committed
15
from ..chain import AcquisitionDevice, AcquisitionChannel
16
from bliss.common.measurement import GroupedReadMixin, Counter
17
from bliss.common.utils import all_equal
18

19

20
21
22
23
24
25
def _get_group_reader(counters_or_groupreadhandler):
    try:
        list_iter = iter(counters_or_groupreadhandler)
    except TypeError:
        return counters_or_groupreadhandler, list()
    else:
Vincent Michel's avatar
Vincent Michel committed
26
        first_counter = next(list_iter)
27
28
29
30
        reader = Counter.GROUPED_READ_HANDLERS.get(first_counter)
        for cnt in list_iter:
            cnt_reader = Counter.GROUPED_READ_HANDLERS.get(cnt)
            if cnt_reader != reader:
31
32
33
34
                raise RuntimeError(
                    "Counters %s doesn't belong to the same group"
                    % counters_or_groupreadhandler
                )
35
        return reader, counters_or_groupreadhandler
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
36

37

38
class BaseCounterAcquisitionDevice(AcquisitionDevice):
39
    def __init__(self, counter, count_time, **keys):
40
41
42
        npoints = keys.pop("npoints")
        prepare_once = keys.pop("prepare_once")
        start_once = keys.pop("start_once")
43
44
45
46
47
48
49
50
51
        AcquisitionDevice.__init__(
            self,
            counter,
            counter.name,
            npoints=npoints,
            trigger_type=AcquisitionDevice.SOFTWARE,
            prepare_once=prepare_once,
            start_once=start_once,
        )
52
53
54
55
56

        self.__count_time = count_time
        self.__grouped_read_counters_list = list()
        self._nb_acq_points = 0

57
        if not isinstance(counter, GroupedReadMixin):
58
59
60
            self.channels.append(
                AcquisitionChannel(counter.name, counter.dtype, counter.shape)
            )
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
61

62
63
64
    @property
    def count_time(self):
        return self.__count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
65

66
67
68
    @property
    def grouped_read_counters(self):
        return self.__grouped_read_counters_list
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
69

70
71
    def add_counter(self, counter):
        if not isinstance(self.device, GroupedReadMixin):
72
73
74
            # Ignore if the counter is already the provided device
            if self.device == counter:
                return
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
75
            raise RuntimeError(
76
77
                "Cannot add counter to single-read counter acquisition device"
            )
78
79

        self.__grouped_read_counters_list.append(counter)
80
81
82
        self.channels.append(
            AcquisitionChannel(counter.name, counter.dtype, counter.shape)
        )
83
84

    def _emit_new_data(self, data):
85
        self.channels.update_from_iterable(data)
86
87


88
89
90
91
92
93
94
95
@enum.unique
class SamplingMode(enum.IntEnum):
    """SamplingCounterAcquisitionDevice Mode Class """

    SIMPLE_AVERAGE = 0
    TIME_AVERAGE = 1
    INTEGRATE = 2

96

97
class SamplingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
98
    def __init__(
99
100
101
102
103
        self,
        counters_or_groupreadhandler,
        count_time=None,
        mode=SamplingMode.SIMPLE_AVERAGE,
        **keys
104
    ):
105
106
107
        """
        Helper to manage acquisition of a sampling counter.

108
        counters_or_groupreadhandler -- can be a list,tuple of SamplingCounter or
109
        a group_read_handler
110
111
112
113
114
115
        count_time -- the master integration time.
        mode -- three mode are available *SIMPLE_AVERAGE* (the default)
        which sum all the sampling values and divide by the number of read value.
        the *TIME_AVERAGE* which sum all integration  then divide by the sum
        of time spend to measure all values. And *INTEGRATION* which sum all integration
        and then normalize it when the *count_time*.
116
117
118
119
        Other keys are:
          * npoints -- number of point for this acquisition
          * prepare_once --
          * start_once --
120
        """
121
122
123
124
        npoints = max(1, keys.pop("npoints", 1))
        prepare_once = keys.pop("prepare_once", True)
        start_once = keys.pop("start_once", npoints > 1)

125
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
126
127
128
129
130
131
132
133
134
        BaseCounterAcquisitionDevice.__init__(
            self,
            reader,
            count_time,
            npoints=npoints,
            prepare_once=prepare_once,
            start_once=start_once,
            **keys
        )
135

136
137
138
        self._event = event.Event()
        self._stop_flag = False
        self._ready_event = event.Event()
139
        self._ready_event.set()
140
141
        self.__mode = mode

142
143
144
        for cnt in counters:
            self.add_counter(cnt)

145
146
147
    @property
    def mode(self):
        return self.__mode
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
148

149
    @mode.setter
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
150
    def mode(self, value):
151
152
153
154
155
156
157
        try:
            self.__mode = SamplingMode[value]
        except KeyError:
            raise ValueError(
                "Invalid mode '%s', the mode must be in %s"
                % (value, list(SamplingMode.__members__.keys()))
            )
158
159

    def prepare(self):
160
161
        self.device.prepare(*self.grouped_read_counters)

162
    def start(self):
163
164
        self._nb_acq_points = 0
        self._stop_flag = False
165
        self._ready_event.set()
166
167
        self._event.clear()

168
        self.device.start(*self.grouped_read_counters)
169
170

    def stop(self):
171
172
        self.device.stop(*self.grouped_read_counters)

173
174
175
176
177
178
179
180
        self._stop_flag = True
        self._trig_time = None
        self._event.set()

    def trigger(self):
        self._trig_time = time.time()
        self._event.set()

181
182
183
    def trigger_ready(self):
        return self._ready_event.is_set()

184
185
186
187
    def wait_ready(self):
        """
        will wait until the last triggered point is read
        """
188
        self._ready_event.wait()
189
190

    def reading(self):
191
        while not self._stop_flag and self._nb_acq_points < self.npoints:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
192
            # trigger wait
193
194
            self._event.wait()
            self._event.clear()
195
            self._ready_event.clear()
196
            trig_time = self._trig_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
197
198
199
200
            if trig_time is None:
                continue
            if self._stop_flag:
                break
201
202
203

            nb_read = 0
            acc_read_time = 0
204
            acc_value = numpy.zeros((len(self.channels),), dtype=numpy.double)
205
            stop_time = trig_time + self.count_time or 0
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
206
            # Counter integration loop
207
208
            while not self._stop_flag:
                start_read = time.time()
209
210
211
                read_value = numpy.array(
                    self.device.read(*self.grouped_read_counters), dtype=numpy.double
                )
212
                end_read = time.time()
213
                read_time = end_read - start_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
214

215
                if self.__mode == SamplingMode.TIME_AVERAGE:
216
                    acc_value += read_value * (end_read - start_read)
217
218
219
                else:
                    acc_value += read_value

220
221
222
223
224
225
                nb_read += 1
                acc_read_time += end_read - start_read

                current_time = time.time()
                if (current_time + (acc_read_time / nb_read)) > stop_time:
                    break
226
                gevent.sleep(0)  # to be able to kill the task
227
            self._nb_acq_points += 1
228
            if self.__mode == SamplingMode.TIME_AVERAGE:
229
                data = acc_value / acc_read_time
230
231
            else:
                data = acc_value / nb_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
232

233
            if self.__mode == SamplingMode.INTEGRATE:
234
                data *= self.count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
235

236
237
            self._emit_new_data(data)

238
            self._ready_event.set()
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
239

240

241
class IntegratingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
242
    def __init__(self, counters_or_groupreadhandler, count_time=None, **keys):
243
244
245
246
247
248
249
250

        if any(
            filter(
                None,
                (keys.pop(k, None) for k in ("npoints", "prepare_once", "start_once")),
            )
        ):
            warnings.warn(
251
252
                "IntegratingCounterAcquisitionDevice: npoints, prepare_once or "
                "start_once flags will be overwritten by master controller"
253
254
            )

255
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
256
257
258
259
260
261
262
263
264
        BaseCounterAcquisitionDevice.__init__(
            self,
            reader,
            count_time,
            npoints=None,
            prepare_once=None,
            start_once=None,
            **keys
        )
265
266
        for cnt in counters:
            self.add_counter(cnt)
267

268
269
270
271
272
273
274
    @AcquisitionDevice.parent.setter
    def parent(self, p):
        self._AcquisitionDevice__parent = p
        self._AcquisitionDevice__npoints = p.npoints
        self._AcquisitionDevice__prepare_once = p.prepare_once
        self._AcquisitionDevice__start_once = p.start_once

275
276
277
    def prepare(self):
        self._nb_acq_points = 0
        self._stop_flag = False
278
        self.device.prepare(*self.grouped_read_counters)
279

280
    def start(self):
281
        self.device.start(*self.grouped_read_counters)
282
283

    def stop(self):
284
        self.device.stop(*self.grouped_read_counters)
285
286
287
288
        self._stop_flag = True

    def trigger(self):
        pass
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
289

290
291
292
293
294
295
    def _read_data(self, from_index):
        if self.grouped_read_counters:
            return self.device.get_values(from_index, *self.grouped_read_counters)
        else:
            return [numpy.array(self.device.get_value(from_index), dtype=numpy.double)]

296
    def reading(self):
297
        from_index = 0
298
299
300
        while (
            not self.npoints or self._nb_acq_points < self.npoints
        ) and not self._stop_flag:
301
            data = self._read_data(from_index)
302
            if not all_equal([len(d) for d in data]):
303
                raise RuntimeError("Read data can't have different sizes")
304
            if len(data[0]) > 0:
305
306
307
                from_index += len(data[0])
                self._nb_acq_points += len(data[0])
                self._emit_new_data(data)
308
309
                gevent.idle()
            else:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
310
                gevent.sleep(self.count_time / 2.)