Commit 550edc62 authored by Linus Pithan's avatar Linus Pithan
Browse files

SingleRead_Counter, SingleRead_AcquisitionDevice and Epoch

parent 3902a6d4
......@@ -12,6 +12,7 @@ import numpy
import inspect
import weakref
from collections import namedtuple
import time
from bliss.common.utils import add_conversion_function
from bliss.common.alias import AliasMixin
......@@ -505,3 +506,51 @@ def DefaultIntegratingCounterGroupedReadHandler(
return handlers.setdefault(
controller, DefaultIntegratingCounterGroupedReadHandler(controller)
class SingleRead_Counter(Counter):
A counter that is read only once per acquisition (contrary to SamplingCounter)
def __init__(self, name, config):
Counter.__init__(self, name)
self._name = name
self.config = config
self.acq_device = None
def create_acquisition_device(self, scan_pars):
from bliss.scanning.acquisition.counter import SingleRead_AcquisitionDevice
self.acq_device = SingleRead_AcquisitionDevice(
self, npoints=scan_pars.get("npoints", 1)
return self.acq_device
def get_acquisition_device(self):
return self.acq_device
def name(self):
return self._name
def read(self):
Overwrite in your class to provide a useful SingleRead counter class
will be called on each trigger.
raise NotImplementedError
class Epoch_Counter(SingleRead_Counter):
Simple SingleRead_Counter that emits epoch (The epoch is the number of seconds
that have elapsed since January 1, 1970)
def __init__(self, name, config):
SingleRead_Counter.__init__(self, name, config)
def read(self):
return time.time()
......@@ -308,3 +308,41 @@ class IntegratingCounterAcquisitionDevice(BaseCounterAcquisitionDevice):
gevent.sleep(self.count_time / 2.0)
class SingleRead_AcquisitionDevice(AcquisitionDevice):
Helper to manage acquisition of a SingleRead_Counter.
def __init__(self, counter, npoints):
self.counter = counter
prepare_once=True, # Do not call prepare at each point.
start_once=True, # Do not call start at each point.
# add a new channel to the acq dev.
AcquisitionChannel(counter,, counter.dtype, counter.shape)
def prepare(self):
def start(self):
def stop(self):
def trigger(self):
Called at each point
""" # publishes the data
plugin: bliss
class: Epoch_Counter
package: bliss.common.measurement
- name: epoch
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment