counter.py 8.22 KB
Newer Older
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import numpy
import time
Matias Guijarro's avatar
Matias Guijarro committed
10
11
import gevent
from gevent import event
12
from bliss.common.event import dispatcher
Matias Guijarro's avatar
Matias Guijarro committed
13
from ..chain import AcquisitionDevice, AcquisitionChannel
14
from bliss.common.measurement import GroupedReadMixin, Counter
15
from bliss.common.utils import all_equal
16

17
18
19
20
21
22
23
24
25
26
27
28
29
30
def _get_group_reader(counters_or_groupreadhandler):
    try:
        list_iter = iter(counters_or_groupreadhandler)
    except TypeError:
        return counters_or_groupreadhandler, list()
    else:
        first_counter = list_iter.next()
        reader = Counter.GROUPED_READ_HANDLERS.get(first_counter)
        for cnt in list_iter:
            cnt_reader = Counter.GROUPED_READ_HANDLERS.get(cnt)
            if cnt_reader != reader:
                raise RuntimeError("Counters %s doesn't belong to the same group" %\
                                   counters_or_groupreadhandler)
        return reader, counters_or_groupreadhandler
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
31

32
class BaseCounterAcquisitionDevice(AcquisitionDevice):
    """
    Common base of the counter acquisition devices.

    Registers a counter (or a grouped-read handler) as a software-triggered
    AcquisitionDevice and declares one scalar ``numpy.double`` channel per
    counter.
    """

    def __init__(self, counter, count_time, **keys):
        """
        counter -- a single counter, or a GroupedReadMixin instance whose
        counters are attached afterwards with add_counter(). It must have a
        ``name`` attribute.
        count_time -- stored as given and exposed by the count_time property.

        Recognised keys (anything else in *keys* is ignored):
          * npoints -- number of points, values below 1 are raised to 1
            (default 1)
          * prepare_once -- default True
          * start_once -- default True when npoints is greater than 1
        """
        nb_points = max(1, keys.pop('npoints', 1))
        once_prepared = keys.pop('prepare_once', True)
        once_started = keys.pop('start_once', nb_points > 1)

        acq_settings = dict(npoints=nb_points,
                            trigger_type=AcquisitionDevice.SOFTWARE,
                            prepare_once=once_prepared,
                            start_once=once_started)
        AcquisitionDevice.__init__(self, counter, counter.name,
                                   **acq_settings)

        self._nb_acq_points = 0
        self.__grouped_read_counters_list = []
        self.__count_time = count_time

        # A grouped-read handler gets its channels later, one per counter
        # added through add_counter().
        if isinstance(counter, GroupedReadMixin):
            return
        single_channel = AcquisitionChannel(counter.name, numpy.double, ())
        self.channels.append(single_channel)

    @property
    def count_time(self):
        """Count time given to the constructor."""
        return self.__count_time

    @property
    def grouped_read_counters(self):
        """List of the counters attached with add_counter()."""
        return self.__grouped_read_counters_list

    def add_counter(self, counter):
        """
        Attach *counter* to this device and declare a channel for it.

        Only allowed when the device is a GroupedReadMixin. On a single-read
        device, adding the device's own counter is silently ignored and any
        other counter raises RuntimeError.
        """
        if isinstance(self.device, GroupedReadMixin):
            self.__grouped_read_counters_list.append(counter)
            new_channel = AcquisitionChannel(counter.name, numpy.double, ())
            self.channels.append(new_channel)
            return

        # Ignore if the counter is already the provided device
        if self.device == counter:
            return
        raise RuntimeError(
            "Cannot add counter to single-read counter acquisition device")

    def _emit_new_data(self, data):
        """Forward *data* to the channels via update_from_iterable()."""
        channels = self.channels
        channels.update_from_iterable(data)
74
75
76


class SamplingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
    """
    Acquisition device which, for each trigger, reads its counter(s)
    repeatedly during the count time and emits one reduced value per channel.
    """
    SIMPLE_AVERAGE, TIME_AVERAGE, INTEGRATE = range(3)

    def __init__(self, counters_or_groupreadhandler,
                 count_time=None, mode=SIMPLE_AVERAGE, **keys):
        """
        Helper to manage acquisition of a sampling counter.

        counters_or_groupreadhandler -- can be a list or tuple of
        SamplingCounter, or a group_read_handler
        count_time -- the master integration time. When left to None, a
        single read is done per trigger.
        mode -- three modes are available:
          * *SIMPLE_AVERAGE* (the default): sum all the sampled values and
            divide by the number of reads.
          * *TIME_AVERAGE*: weight each sampled value by the time its read
            took, then divide the sum by the total time spent reading.
          * *INTEGRATE*: the simple average multiplied by *count_time*.

        Other keys are:
          * npoints -- number of point for this acquisition
          * prepare_once --
          * start_once --
        """
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
        BaseCounterAcquisitionDevice.__init__(
            self, reader, count_time, **keys)

        self._event = event.Event()
        self._stop_flag = False
        self._ready_event = event.Event()
        self._ready_flag = True
        # Set by trigger(), reset to None by stop(); read by reading().
        self._trig_time = None
        self.__mode = mode

        for cnt in counters:
            self.add_counter(cnt)

    @property
    def mode(self):
        """Reduction mode: SIMPLE_AVERAGE, TIME_AVERAGE or INTEGRATE."""
        return self.__mode

    @mode.setter
    def mode(self, value):
        self.__mode = value

    def prepare(self):
        """Call prepare() on the device with the grouped-read counters."""
        self.device.prepare(*self.grouped_read_counters)

    def start(self):
        """Reset the acquisition state, then start the device."""
        self._nb_acq_points = 0
        self._stop_flag = False
        self._ready_flag = True
        self._event.clear()

        self.device.start(*self.grouped_read_counters)

    def stop(self):
        """Stop the device, raise the stop flag and wake up reading()."""
        self.device.stop(*self.grouped_read_counters)

        self._stop_flag = True
        self._trig_time = None
        self._event.set()

    def trigger(self):
        """Record the trigger time and wake up reading()."""
        self._trig_time = time.time()
        self._event.set()

    def wait_ready(self):
        """
        will wait until the last triggered point is read
        """
        while not self._ready_flag:
            self._ready_event.wait()
            self._ready_event.clear()

    def reading(self):
        """
        Acquisition loop: for each trigger, sample the device until the
        count time is over, reduce the samples according to the mode and
        emit the result. Ends once npoints points have been acquired.
        """
        while self._nb_acq_points < self.npoints:
            # trigger wait
            self._event.wait()
            self._event.clear()
            self._ready_flag = False
            trig_time = self._trig_time
            # NOTE(review): stop() resets _trig_time to None before setting
            # the event, so after a plain stop() this branch is taken and
            # the loop goes back to waiting instead of reaching the stop
            # check below -- confirm this is intended.
            if trig_time is None:
                continue
            if self._stop_flag:
                break

            nb_read = 0
            acc_read_time = 0
            acc_value = numpy.zeros((len(self.channels),), dtype=numpy.double)
            # Parenthesised on purpose: '+' binds tighter than 'or', so the
            # unparenthesised form adds None to the trigger time (TypeError)
            # when no count_time was given.
            stop_time = trig_time + (self.count_time or 0)
            # Counter integration loop
            while not self._stop_flag:
                start_read = time.time()
                read_value = numpy.array(self.device.read(
                    *self.grouped_read_counters), dtype=numpy.double)
                end_read = time.time()
                read_time = end_read - start_read

                if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
                    acc_value += read_value * read_time
                else:
                    acc_value += read_value

                nb_read += 1
                acc_read_time += read_time

                # Stop when one more read of average duration would end
                # after the count time is over.
                current_time = time.time()
                if (current_time + (acc_read_time / nb_read)) > stop_time:
                    break
                gevent.sleep(0)  # to be able to kill the task
            self._nb_acq_points += 1
            if self.__mode == SamplingCounterAcquisitionDevice.TIME_AVERAGE:
                data = acc_value / acc_read_time
            else:
                data = acc_value / nb_read

            # NOTE(review): INTEGRATE needs a numeric count_time; with the
            # default None this multiplication raises TypeError.
            if self.__mode == SamplingCounterAcquisitionDevice.INTEGRATE:
                data *= self.count_time

            self._emit_new_data(data)

            self._ready_flag = True
            self._ready_event.set()
Sebastien Petitdemange's avatar
pep8    
Sebastien Petitdemange committed
197

198

199
class IntegratingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
    """
    Acquisition device which polls its device for values by index and emits
    them as they become available.
    """

    def __init__(self, counters_or_groupreadhandler, count_time=None, **keys):
        """
        counters_or_groupreadhandler -- an iterable of counters, or a single
        non-iterable reader object
        count_time -- half of it is used as the polling period of reading()
        while no new value is available (None means no delay between polls)

        Other keys are handed over to BaseCounterAcquisitionDevice.
        """
        reader, counters = _get_group_reader(counters_or_groupreadhandler)
        BaseCounterAcquisitionDevice.__init__(
            self, reader, count_time, **keys)
        # reading() checks this flag, so make sure it exists even if
        # start() has not been called yet.
        self._stop_flag = False
        for cnt in counters:
            self.add_counter(cnt)

    def prepare(self):
        """Call prepare() on the device with the grouped-read counters."""
        self.device.prepare(*self.grouped_read_counters)

    def start(self):
        """Reset the acquisition state, then start the device."""
        self._nb_acq_points = 0
        self._stop_flag = False

        self.device.start(*self.grouped_read_counters)

    def stop(self):
        """Stop the device and raise the stop flag checked by reading()."""
        self.device.stop(*self.grouped_read_counters)
        self._stop_flag = True

    def trigger(self):
        """Nothing to do on trigger."""
        pass

    def _read_data(self, from_index):
        """
        Return the values available from *from_index* on, one sequence per
        channel: get_values() of the device when counters were added to it,
        otherwise its get_value() result wrapped in a one-element list.
        """
        if self.grouped_read_counters:
            return self.device.get_values(from_index, *self.grouped_read_counters)
        else:
            return [numpy.array(self.device.get_value(from_index), dtype=numpy.double)]

    def reading(self):
        """
        Acquisition loop: poll the device until npoints values have been
        emitted or stop() is called.

        Raise RuntimeError if the channels do not all return the same number
        of values.
        """
        from_index = 0
        while self._nb_acq_points < self.npoints and not self._stop_flag:
            data = self._read_data(from_index)
            if not all_equal([len(d) for d in data]):
                raise RuntimeError("Read data can't have different sizes")
            nb_new_points = len(data[0])
            if nb_new_points > 0:
                from_index += nb_new_points
                self._nb_acq_points += nb_new_points
                self._emit_new_data(data)
                gevent.idle()
            else:
                # count_time defaults to None: fall back to a zero delay
                # instead of failing on None / 2.
                gevent.sleep((self.count_time or 0) / 2.)