Skip to content 13.9 KiB
Newer Older
# This file is part of the bliss project
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

# run tests for this module from the bliss root directory with:
# python -m unittest discover -s tests/acquisition -v

import numpy
from collections import namedtuple
from bliss.common.utils import add_conversion_function
def flat_namespace(dct):
    """A namespace allowing names with dots."""
    mapping = dict(dct)

    class getter(object):
        def __init__(self, parent, prefix):
            self.parent = parent
            self.prefix = prefix

        def __getattr__(self, key):
            return getattr(self.parent, self.prefix + key)

    class namespace(tuple):

        __slots__ = ()
        _fields = sorted(mapping)
        __dict__ = property(lambda _: mapping)

        def __getattr__(self, arg):
            if arg in mapping:
                return mapping[arg]
            if arg.startswith("__"):
                raise AttributeError(arg)
            for field in self._fields:
                if field.startswith(arg + "."):
                    return getter(self, arg + ".")
            raise AttributeError(arg)

        def __setattr__(self, arg, value):
            raise AttributeError("can't set attribute")

        def __repr__(self):
            reprs = ("{}={!r}".format(field, mapping[field]) for field in self._fields)
            return "{}({})".format("namespace", ", ".join(reprs))

    return namespace(mapping[field] for field in namespace._fields)

def namespace(dct):
    if any("." in key for key in dct):
        return flat_namespace(dct)
    return namedtuple("namespace", sorted(dct))(**dct)

def counter_namespace(counters):
    return namespace({ counter for counter in counters})

# Base counter class

class GroupedReadMixin(object):
    def __init__(self, controller):
Vincent Michel's avatar
Vincent Michel committed
        self._controller_ref = weakref.ref(controller)
    def name(self):

    def controller(self):
Vincent Michel's avatar
Vincent Michel committed
        return self._controller_ref()
    def id(self):
        return id(self.controller)
    def prepare(self, *counters):
    def start(self, *counters):
Vincent Michel's avatar
Vincent Michel committed
class BaseCounter(object):
    """Define a standard counter interface."""

Vincent Michel's avatar
Vincent Michel committed
    def controller(self):
        """A controller or None."""
        return None

    def master_controller(self):
        """A master controller or None."""
        return None

Vincent Michel's avatar
Vincent Michel committed
    def name(self):
        """A unique name within the controller scope."""
        raise NotImplementedError

    def dtype(self):
        """The data type as used by numpy."""
        raise NotImplementedError

    def shape(self):
        """The data shape as used by numpy."""
        raise NotImplementedError

    def create_acquisition_device(self, scan_pars, **settings):
        """Instanciate the corresponding acquisition device."""
        raise NotImplementedError

    # Extra logic
Vincent Michel's avatar
Vincent Michel committed

    def fullname(self):
        """A unique name within the session scope.

        The standard implementation defines it as:
Vincent Michel's avatar
Vincent Michel committed
        args = []
        # Master controller
        if self.master_controller is not None:
        # Controller
        if self.controller is not None:
        # Name
        return ".".join(args)
Vincent Michel's avatar
Vincent Michel committed

class Counter(BaseCounter):
    GROUPED_READ_HANDLERS = weakref.WeakKeyDictionary()
    def __init__(
        self, name, grouped_read_handler=None, conversion_function=None, controller=None
Vincent Michel's avatar
Vincent Michel committed
        self._name = name
        self._controller = controller
        self._conversion_function = conversion_function
        if grouped_read_handler:
            Counter.GROUPED_READ_HANDLERS[self] = grouped_read_handler
Vincent Michel's avatar
Vincent Michel committed
    # Standard interface

    def controller(self):
        return self._controller

    def name(self):
Vincent Michel's avatar
Vincent Michel committed
        return self._name

    def dtype(self):
        return numpy.float

    def shape(self):
        return ()

    # Default chain handling

    def get_acquisition_device_class(cls):
        raise NotImplementedError

    def create_acquisition_device(self, scan_pars, **settings):
        read_handler = self.GROUPED_READ_HANDLERS.get(self, self)
        return self.get_acquisition_device_class()(read_handler, **scan_pars)

Vincent Michel's avatar
Vincent Michel committed
    # Extra interface

Vincent Michel's avatar
Vincent Michel committed
        return self._conversion_function
class SamplingCounter(Counter):
    def get_acquisition_device_class(cls):
        from bliss.scanning.acquisition.counter import SamplingCounterAcquisitionDevice
        return SamplingCounterAcquisitionDevice

    class GroupedReadHandler(GroupedReadMixin):
        def read(self, *counters):
            this method should return a list of read values in the same order
            as counters
            raise NotImplementedError
    class ConvertValue(object):
        def __init__(self, grouped_read_handler):
            return [
                cnt.conversion_function(x) if cnt.conversion_function else x
                for x, cnt in zip(*counters), counters)

    def __init__(
        self, name, controller, grouped_read_handler=None, conversion_function=None
        if grouped_read_handler is None and hasattr(controller, "read_all"):
            grouped_read_handler = DefaultSamplingCounterGroupedReadHandler(controller)
            if not isinstance(, self.ConvertValue):
       = self.ConvertValue(grouped_read_handler)
                add_conversion_function(self, "read", conversion_function)
        super(SamplingCounter, self).__init__(
            name, grouped_read_handler, conversion_function, controller
            grouped_read_handler = Counter.GROUPED_READ_HANDLERS[self]
        except KeyError:
            raise NotImplementedError
class SoftCounter(SamplingCounter):
    Transforms any given python object into a sampling counter.
    By default it assumes the object has a member called *value* which will be
    used on a read.
    You can overwrite this behaviour by passing the name of the object member
    as value. It can be an object method, a property/descriptor or even a simple
    attribute of the given object.

    If no name is given, the counter name is the string representation of the
    value argument.
    The counter full name is `` + '.' + counter_name. If no
    controller is given, the is used instead of If no
    obj is given the counter full name is counter name.

    You can pass an optional apply function if you need to transform original
    value given by the object into something else.

    Here are some examples::

        from bliss.common.measurement import SoftCounter

        class Potentiostat:

            def __init__(self, name):
       = name

            def potential(self):
                return float(self.comm.write_readline('POT?\n'))

            def get_voltage(self):
                return float(self.comm.write_readline('VOL?\n'))

        pot = Potentiostat('p1')

        # counter from an object property (its name is 'potential'.
        # Its full name is 'p1.potential')
        pot_counter = SoftCounter(pot, 'potential')

        # counter form an object method
        milivol_counter = SoftCounter(pot, 'get_voltage', name='voltage',
                                      apply=lambda v: v*1000)

        # you can use the counters in any scan
        from bliss.common.standard import loopscan
        loopscan(10, 0.1, pot_counter, milivol_counter)

    class Controller(object):
        def __init__(self, name):
   = name

    def __init__(self, obj=None, value="value", name=None, controller=None, apply=None):
        if obj is None and inspect.ismethod(value):
            obj = value.__self__
        self.get_value, value_name = self.get_read_func(obj, value)
        name = value_name if name is None else name
        obj_has_name = hasattr(obj, "name") and isinstance(, str)
        if controller is None:
            if obj_has_name:
                ctrl_name =
            elif obj is None:
                ctrl_name = name
                ctrl_name = type(obj).__name__
            controller = self.Controller(ctrl_name)
        if apply is None:
            apply = lambda x: x
        self.apply = apply
        super(SoftCounter, self).__init__(name, controller)

    def get_read_func(obj, value):
        if callable(value):
            value_name = value.__name__
            value_func = value
            otype = type(obj)
            value_name = value
            val = getattr(otype, value_name, None)
            if val is None or not callable(val):
                def value_func():
                    return getattr(obj, value_name)
            value_func.__name__ = value_name
        return value_func, value_name

    def read(self):
        return self.apply(self.get_value())

def DefaultSamplingCounterGroupedReadHandler(
    controller, handlers=weakref.WeakValueDictionary()
    class DefaultSamplingCounterGroupedReadHandler(SamplingCounter.GroupedReadHandler):
        Default read all handler for controller which have read_all method
        def read(self, *counters):
            return self.controller.read_all(*counters)
    return handlers.setdefault(
        controller, DefaultSamplingCounterGroupedReadHandler(controller)
class IntegratingCounter(Counter):
    def get_acquisition_device_class(cls):
        from bliss.scanning.acquisition.counter import (

        return IntegratingCounterAcquisitionDevice

    def master_controller(self):
        return self._master_controller_ref()

    class GroupedReadHandler(GroupedReadMixin):
        def get_values(self, from_index, *counters):
            this method should return a list of numpy arrays in the same order
            as the counter_name
            raise NotImplementedError

    class ConvertValues(object):
        def __init__(self, grouped_read_handler):
            self.get_values = grouped_read_handler.get_values

        def __call__(self, from_index, *counters):
            return [
                cnt.conversion_function(x) if cnt.conversion_function else x
                for x, cnt in zip(self.get_values(from_index, *counters), counters)

    def __init__(
        if grouped_read_handler is None and hasattr(controller, "get_values"):
            grouped_read_handler = DefaultIntegratingCounterGroupedReadHandler(
            if not isinstance(grouped_read_handler.get_values, self.ConvertValues):
                grouped_read_handler.get_values = self.ConvertValues(
                add_conversion_function(self, "get_values", conversion_function)
        super(IntegratingCounter, self).__init__(
            name, grouped_read_handler, conversion_function, controller
        if master_controller is None:
            self._master_controller_ref = lambda: None
            self._master_controller_ref = weakref.ref(master_controller)

    def get_values(self, from_index=0):
        Overwrite in your class to provide a useful integrated counter class

Vincent Michel's avatar
Vincent Michel committed
        This method is called after the prepare and start on the master handler.
        This method can block until the data is ready or not and return empty data.
        When data is ready should return the data from the acquisition

def DefaultIntegratingCounterGroupedReadHandler(
    controller, handlers=weakref.WeakValueDictionary()
    class DefaultIntegratingCounterGroupedReadHandler(
        Default read all handler for controller which have get_values method
        def get_values(self, from_index, *counters):
            return [
                cnt.conversion_function(x) if cnt.conversion_function else x
                for x, cnt in zip(
                    self.controller.get_values(from_index, *counters), counters

    return handlers.setdefault(
        controller, DefaultIntegratingCounterGroupedReadHandler(controller)