counter.py 8.27 KB
Newer Older
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import numpy
import time
Matias Guijarro's avatar
Matias Guijarro committed
10
11
import gevent
from gevent import event
12
from bliss.common.event import dispatcher
Matias Guijarro's avatar
Matias Guijarro committed
13
from ..chain import AcquisitionDevice, AcquisitionChannel
14
from bliss.common.measurement import GroupedReadMixin, Counter
15
from bliss.common.utils import all_equal
16

17

18
19
20
21
22
23
24
25
26
27
28
def _get_group_reader(counters_or_groupreadhandler):
    try:
        list_iter = iter(counters_or_groupreadhandler)
    except TypeError:
        return counters_or_groupreadhandler, list()
    else:
        first_counter = list_iter.next()
        reader = Counter.GROUPED_READ_HANDLERS.get(first_counter)
        for cnt in list_iter:
            cnt_reader = Counter.GROUPED_READ_HANDLERS.get(cnt)
            if cnt_reader != reader:
29
30
31
32
                raise RuntimeError(
                    "Counters %s doesn't belong to the same group"
                    % counters_or_groupreadhandler
                )
33
        return reader, counters_or_groupreadhandler
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
34

35

36
class BaseCounterAcquisitionDevice(AcquisitionDevice):
37
    def __init__(self, counter, count_time, **keys):
38
39
40
41
42
43
44
45
46
47
48
49
50
        npoints = max(1, keys.pop("npoints", 1))
        prepare_once = keys.pop("prepare_once", True)
        start_once = keys.pop("start_once", npoints > 1)

        AcquisitionDevice.__init__(
            self,
            counter,
            counter.name,
            npoints=npoints,
            trigger_type=AcquisitionDevice.SOFTWARE,
            prepare_once=prepare_once,
            start_once=start_once,
        )
51
52
53
54
55

        self.__count_time = count_time
        self.__grouped_read_counters_list = list()
        self._nb_acq_points = 0

56
        if not isinstance(counter, GroupedReadMixin):
57
58
59
            self.channels.append(
                AcquisitionChannel(counter.name, counter.dtype, counter.shape)
            )
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
60

61
62
63
    @property
    def count_time(self):
        return self.__count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
64

65
66
67
    @property
    def grouped_read_counters(self):
        return self.__grouped_read_counters_list
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
68

69
70
    def add_counter(self, counter):
        if not isinstance(self.device, GroupedReadMixin):
71
72
73
            # Ignore if the counter is already the provided device
            if self.device == counter:
                return
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
74
            raise RuntimeError(
75
76
                "Cannot add counter to single-read counter acquisition device"
            )
77
78

        self.__grouped_read_counters_list.append(counter)
79
80
81
        self.channels.append(
            AcquisitionChannel(counter.name, counter.dtype, counter.shape)
        )
82
83

    def _emit_new_data(self, data):
84
        self.channels.update_from_iterable(data)
85
86
87


class SamplingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
88
    SIMPLE_AVERAGE, TIME_AVERAGE, INTEGRATE = range(3)
89

90
91
92
    def __init__(
        self, counters_or_groupreadhandler, count_time=None, mode=SIMPLE_AVERAGE, **keys
    ):
93
94
95
        """
        Helper to manage acquisition of a sampling counter.

96
        counters_or_groupreadhandler -- can be a list,tuple of SamplingCounter or
97
        a group_read_handler
98
99
100
101
102
103
        count_time -- the master integration time.
        mode -- three mode are available *SIMPLE_AVERAGE* (the default)
        which sum all the sampling values and divide by the number of read value.
        the *TIME_AVERAGE* which sum all integration  then divide by the sum
        of time spend to measure all values. And *INTEGRATION* which sum all integration
        and then normalize it when the *count_time*.
104
105
106
107
        Other keys are:
          * npoints -- number of point for this acquisition
          * prepare_once --
          * start_once --
108
        """
109
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
110
        BaseCounterAcquisitionDevice.__init__(self, reader, count_time, **keys)
111

112
113
114
115
        self._event = event.Event()
        self._stop_flag = False
        self._ready_event = event.Event()
        self._ready_flag = True
116
117
        self.__mode = mode

118
119
120
        for cnt in counters:
            self.add_counter(cnt)

121
122
123
    @property
    def mode(self):
        return self.__mode
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
124

125
    @mode.setter
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
126
    def mode(self, value):
127
        self.__mode = value
128
129

    def prepare(self):
130
131
        self.device.prepare(*self.grouped_read_counters)

132
    def start(self):
133
134
135
136
137
        self._nb_acq_points = 0
        self._stop_flag = False
        self._ready_flag = True
        self._event.clear()

138
        self.device.start(*self.grouped_read_counters)
139
140

    def stop(self):
141
142
        self.device.stop(*self.grouped_read_counters)

143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
        self._stop_flag = True
        self._trig_time = None
        self._event.set()

    def trigger(self):
        self._trig_time = time.time()
        self._event.set()

    def wait_ready(self):
        """
        will wait until the last triggered point is read
        """
        while not self._ready_flag:
            self._ready_event.wait()
            self._ready_event.clear()

    def reading(self):
160
        while not self._stop_flag and self._nb_acq_points < self.npoints:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
161
            # trigger wait
162
163
164
165
            self._event.wait()
            self._event.clear()
            self._ready_flag = False
            trig_time = self._trig_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
166
167
168
169
            if trig_time is None:
                continue
            if self._stop_flag:
                break
170
171
172

            nb_read = 0
            acc_read_time = 0
173
            acc_value = numpy.zeros((len(self.channels),), dtype=numpy.double)
174
            stop_time = trig_time + self.count_time or 0
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
175
            # Counter integration loop
176
177
            while not self._stop_flag:
                start_read = time.time()
178
179
180
                read_value = numpy.array(
                    self.device.read(*self.grouped_read_counters), dtype=numpy.double
                )
181
                end_read = time.time()
182
                read_time = end_read - start_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
183

184
                if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
185
                    acc_value += read_value * (end_read - start_read)
186
187
188
                else:
                    acc_value += read_value

189
190
191
192
193
194
                nb_read += 1
                acc_read_time += end_read - start_read

                current_time = time.time()
                if (current_time + (acc_read_time / nb_read)) > stop_time:
                    break
195
                gevent.sleep(0)  # to be able to kill the task
196
            self._nb_acq_points += 1
Matias Guijarro's avatar
Matias Guijarro committed
197
            if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
198
                data = acc_value / acc_read_time
199
200
            else:
                data = acc_value / nb_read
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
201

Matias Guijarro's avatar
Matias Guijarro committed
202
            if self.__mode == SamplingCounterAcquisitionDevice.INTEGRATE:
203
                data *= self.count_time
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
204

205
206
            self._emit_new_data(data)

207
208
            self._ready_flag = True
            self._ready_event.set()
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
209

210

211
class IntegratingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
212
213
    def __init__(self, counters_or_groupreadhandler, count_time=None, **keys):
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
214
        BaseCounterAcquisitionDevice.__init__(self, reader, count_time, **keys)
215
216
        for cnt in counters:
            self.add_counter(cnt)
217
218

    def prepare(self):
219
        self.device.prepare(*self.grouped_read_counters)
220
221

    def start(self):
222
223
224
        self._nb_acq_points = 0
        self._stop_flag = False

225
        self.device.start(*self.grouped_read_counters)
226
227

    def stop(self):
228
        self.device.stop(*self.grouped_read_counters)
229
230
231
232
        self._stop_flag = True

    def trigger(self):
        pass
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
233

234
235
236
237
238
239
    def _read_data(self, from_index):
        if self.grouped_read_counters:
            return self.device.get_values(from_index, *self.grouped_read_counters)
        else:
            return [numpy.array(self.device.get_value(from_index), dtype=numpy.double)]

240
    def reading(self):
241
        from_index = 0
242
        while self._nb_acq_points < self.npoints and not self._stop_flag:
243
            data = self._read_data(from_index)
244
            if not all_equal([len(d) for d in data]):
245
                raise RuntimeError("Read data can't have different sizes")
246
            if len(data[0]) > 0:
247
248
249
                from_index += len(data[0])
                self._nb_acq_points += len(data[0])
                self._emit_new_data(data)
250
251
                gevent.idle()
            else:
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
252
                gevent.sleep(self.count_time / 2.)