Skip to content
temperature.py 15 KiB
Newer Older
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

"""
Classes implemented with temperature Controller
"""

import gevent
import gevent.event
from bliss.common.task import task
Cyril Guilloud's avatar
Cyril Guilloud committed
from bliss.common import mapping
from bliss.common.logtools import LogMixin
Cyril Guilloud's avatar
Cyril Guilloud committed
from bliss.common.utils import with_custom_members, autocomplete_property
from bliss.common.measurement import SamplingCounter, counter_namespace
class TempControllerCounter(SamplingCounter):
    """ Implements access to counter object for
        Input and Output type objects
    """
    def __init__(self, name, parent):
Linus Pithan's avatar
Linus Pithan committed
        SamplingCounter.__init__(
            self, name, parent, unit=parent.config.get("unit", None)
        )
        self.parent = parent

    def read(self):
        data = self.parent.read()
        return data
class Input(LogMixin):
    """ Implements the access to temperature sensors
    """
    def __init__(self, controller, config):
        """ Constructor """
        self.__controller = controller
        self.__name = config["name"]
        self.__config = config
Cyril Guilloud's avatar
Cyril Guilloud committed
        mapping.register(self, children_list=[controller])
        mapping.register(
            self.counter, parents_list=[self, "counters"], children_list=[controller]
        )
        # useful attribute for a temperature controller writer
        self._attr_dict = {}

    @property
    def controller(self):
        """ Returns the temperature controller """
        return self.__controller
    @property
    def name(self):
        """ returns the sensor name """
        return self.__name

    @property
    def config(self):
        """ returns the sensor config """
        return self.__config

        """ returns the counter object """
        return TempControllerCounter(self.name, self)

    @property
    def counters(self):
        """Standard counter namespace."""
        return counter_namespace([self.counter])
    def read(self):
        """ returns the sensor value """
        self._logger.debug("On Input:read")
        return self.controller.read_input(self)
        """ returns the sensor state """
        self._logger.debug("On Input:state")
        return self.controller.state_input(self)
class Output(LogMixin):
    """ Implements the access to temperature heaters """
    def __init__(self, controller, config):
        """ Constructor """
        self.__controller = controller
        self.__name = config["name"]
            self.__limits = (config.get("low_limit"), config.get("high_limit"))
        except:
            self.__limits = (None, None)
        self.__setpoint_task = None
        self.__setpoint_event_poll = 0.02
        try:
            self.__deadband = float(config["deadband"])
        except:
            self.__deadband = None
        self.__config = config
        self.__mode = 0
        # if defined as  self.deadband, attribute available from the instance
        # if defined as  self.__deadband, not available.
        #     in that case, use of decorator property offers it (read only) to world
Cyril Guilloud's avatar
Cyril Guilloud committed
        mapping.register(self, children_list=[controller])

        # useful attribute for a temperature controller writer
        self._attr_dict = {}

    @property
    def controller(self):
        """ returns the temperature controller """
        return self.__controller

    @property
    def name(self):
        """ returns the heater name """
        return self.__name

    @property
    def config(self):
        """ returns the heater config """
        return self.__config

    @property
    def limits(self):
        """ returns the limits for the heater temperature setting """
        return self.__limits

    @property
    def deadband(self):
        """ returns the deadband acceptable for the heater temperature setting.
            After a ramp or a set, the setpoint is considered to be reached
            only if heater value is within the deadband.
            While the setpoint is not reached, a wait will block on it."""
        return self.__deadband

        """ returns the counter object """
        return TempControllerCounter(self.name, self)

    @property
    def counters(self):
        """Standard counter namespace."""
        return counter_namespace([self.counter])
        """ returns the heater value """
        self._logger.debug("On Output:read")
        return self.controller.read_output(self)
Cyril Guilloud's avatar
Cyril Guilloud committed
    def ramp(self, new_setpoint=None, wait=False, **kwargs):
        """ Starts a ramp on an output.
            - if no setpoint is provided, returns the present setpoint value
            - by default does not wait.
            - it is possible to provide kwargs arguments
            The related controller methods to be filled are:
            - get_setpoint
            - setpoint_stop
            - setpoint_abort
            - start_ramp
        """
        self._logger.debug("On Output:ramp %s" % new_setpoint)
        self.__mode = 1
        return self._ramp(new_setpoint, wait, **kwargs)

Cyril Guilloud's avatar
Cyril Guilloud committed
    def set(self, new_setpoint=None, wait=False, **kwargs):
        """ Sets as quickly as possible a temperature.
            - if no setpoint is provided, returns the present setpoint value
            - by default does not wait.
            - it is possible to provide kwargs arguments
            The related controller methods to be filled are:
            - get_setpoint
            - setpoint_stop
            - setpoint_abort
            - set
        """
        self._logger.debug("On Output:set %s" % new_setpoint)
        self.__mode = 0
Cyril Guilloud's avatar
Cyril Guilloud committed
        return self._ramp(new_setpoint, wait, **kwargs)

    def _ramp(self, new_setpoint=None, wait=False, **kwargs):
        """ starts the ramp tasks.
        """
        self._logger.debug("On Output:_ramp %s" % new_setpoint)
        if new_setpoint is not None:
            ll, hl = self.limits
            if ll is not None and new_setpoint < ll:
                raise RuntimeError(
                    "Invalid setpoint `%f', below low limit (%f)" % (new_setpoint, ll)
                )
            if hl is not None and new_setpoint > hl:
                raise RuntimeError(
                    "Invalid setpoint `%f', above high limit (%f)" % (new_setpoint, hl)
                )
Cyril Guilloud's avatar
Cyril Guilloud committed

            self.__setpoint_task = self._start_setpoint(new_setpoint, **kwargs)

            if wait:
                self.wait()
        else:
            return self.controller.get_setpoint(self)
Cyril Guilloud's avatar
Cyril Guilloud committed

        """ Waits on a setpoint task
        """
        self._logger.debug("On Output:wait")
        except KeyboardInterrupt:
            self.stop()

        """
        Return a string representing the setpoint state of an Output class type object.
        If a setpoint is set (by ramp or by direct setting) on an ouput, the status
        will be RUNNING until it is in the deadband.
        This RUNNING state is used by the ramping event loop in the case a user wants
        to block on the Output ramp method (wait=True)

        If deadband is None, returns immediately READY

        Args:
           toutput:  Output class type object
           deadband: deadband attribute of toutput.

        Returns:
           object state string: READY/RUNNING from [READY/RUNNING/ALARM/FAULT]

        """
        deadband = self.deadband if deadband is None else deadband
        self._logger.debug("On output:_setpoint_state: %s" % (deadband))
            return "READY"
        mysp = self.controller.get_setpoint(self)
            return "READY"
        if math.fabs(self.controller.read_output(self) - mysp) <= deadband:
            return "READY"
        else:
            return "RUNNING"

        """ Stops a setpoint task.
            Calls the controller method setpoint_stop
        """
        self._logger.debug("On Output: stop")
        if self.__setpoint_task and not self.__setpoint_task.ready():
            self.__setpoint_task.kill()
        self.controller.setpoint_stop(self)
        """ Aborts a setpoint task.
            Calls the controller method setpoint_abort
        """
        self._logger.debug("On Output: abort")
        if self.__setpoint_task and not self.__setpoint_task.ready():
            self.__setpoint_task.kill()
        self.controller.setpoint_abort(self)

    def rampstate(self):
        """
        Returns the setpoint state:

        - RUNNING: means that output object read value
          is outside setpoint deadband
        - READY: inside the deadband
        """
        if self.__ramping == 1:
            return "RUNNING"
    def _do_setpoint(self, setpoint):
        """ Subtask launching the setpoint
            Polls until setpoint is reached
            Is a gevent coroutine
        self._logger.debug("On Output:_do_setpoint : mode = %s" % (self.__mode))
            while self._setpoint_state() == "RUNNING":
                gevent.sleep(self.__setpoint_event_poll)
        finally:
            self.__ramping = 0
    def _start_setpoint(self, setpoint, **kwargs):
        """ launches the coroutine doing the setpoint
        """
        self._logger.debug("On Output:_start_setpoint")
        @task
        def setpoint_task(setpoint):
            sync_event.set()
            if self.__mode == 1:
                self.controller.start_ramp(self, setpoint, **kwargs)
            else:
                self.controller.set(self, setpoint, **kwargs)
            self.__ramping = 1
            self._do_setpoint(setpoint)
        sp_task = setpoint_task(setpoint, wait=False)
        sync_event.wait()
        return sp_task
Cyril Guilloud's avatar
Cyril Guilloud committed

    def state(self):
        """ returns the the state of a heater """
        self._logger.debug("On Output:state")
        return self.controller.state_output(self)
        Setting/reading the polling time (in seconds) in the event loop waiting
        for setpoint being reached (== read value inside the deadband)
        default is 0.02 sec
        """
Cyril Guilloud's avatar
Cyril Guilloud committed
        if new_poll is not None:
            self.__setpoint_event_poll = new_poll
            return self.__setpoint_event_poll
        Setting/reading the setpoint ramp rate value
        self._logger.debug("On Output:ramprate: %s " % (new_ramp))
Cyril Guilloud's avatar
Cyril Guilloud committed
        if new_ramp is not None:
            self.controller.set_ramprate(self, new_ramp)
            return self.controller.read_ramprate(self)
        Setting/reading the setpoint step value (for step mode ramping)
        self._logger.debug("On Output:step: %s " % (new_step))
Cyril Guilloud's avatar
Cyril Guilloud committed
        if new_step is not None:
            self.controller.set_step(self, new_step)
            return self.controller.read_step(self)
        """
        Setting/reading the setpoint dwell value (for step mode ramping)

        """
        self._logger.debug("On Output:setpoint dwell: %s " % (new_dwell))
Cyril Guilloud's avatar
Cyril Guilloud committed
        if new_dwell is not None:
            self.controller.set_dwell(self, new_dwell)
            return self.controller.read_dwell(self)
    def _add_custom_method(self, method, name, types_info=(None, None)):
        """ necessary to add custom methods to this class """
        setattr(self, name, method)
        self.__custom_methods_list.append((name, types_info))
class Loop(LogMixin):
    """ Implements the access to temperature regulation loop """
    def __init__(self, controller, config):
        """ Constructor """
        self.__controller = controller
        self.__name = config["name"]
        self.__config = config
Cyril Guilloud's avatar
Cyril Guilloud committed
        self.__input = controller.get_object(config["input"][1:])
        self.__output = controller.get_object(config["output"][1:])
        self._Pval = None
        self._Ival = None
        self._Dval = None
Cyril Guilloud's avatar
Cyril Guilloud committed
        mapping.register(self, children_list=[controller])
        mapping.register(
            self.input.counter,
            parents_list=[self, "counters"],
            children_list=[controller],
        )

        # useful attribute for a temperature controller writer
        self._attr_dict = {}

    @property
    def controller(self):
        """ returns the temperature controller """
        return self.__controller

    @property
    def name(self):
        """ returns the loop name """
        return self.__name

    @property
    def config(self):
        """ returns the loop config """
        return self.__config

Cyril Guilloud's avatar
Cyril Guilloud committed
    @autocomplete_property
    def input(self):
        """ returns the loop input object """
Cyril Guilloud's avatar
Cyril Guilloud committed
    @autocomplete_property
    def output(self):
        """ returns the loop output object """
        return self.__output

    def set(self, new_setpoint=None, wait=False, **kwargs):
        """ same as a call to the the method set on its output object """
        self._logger.debug(("On Loop: set %s") % new_setpoint)
        return self.__output.set(new_setpoint, wait, **kwargs)

    def ramp(self, new_setpoint=None, wait=False, **kwargs):
        """ same as the call to the method ramp on its output object """
        self._logger.debug(("On Loop: ramp %s") % new_setpoint)
        return self.__output.ramp(new_setpoint, wait, **kwargs)
        """ same as the call to the method stop on its output object """
        self._logger.debug("On Loop: stop")
        """ Sets the regulation on
            - call to the method 'on' of the controller
        """
        self._logger.debug("On Loop: on")
        self.controller.on(self)

    def off(self):
        """ Sets the regulation off
            - call to the method 'off' of the controller
        """
        self._logger.debug("On Loop: off")
        self.controller.off(self)

        self._logger.debug("On Loop: kp (PID): ")
Cyril Guilloud's avatar
Cyril Guilloud committed
        if new_kp is not None:
            self.controller.set_kp(self, new_kp)
            return self.controller.read_kp(self)
        self._logger.debug("On Loop: ki (PID): ")
Cyril Guilloud's avatar
Cyril Guilloud committed
        if new_ki is not None:
            self.controller.set_ki(self, new_ki)
            return self.controller.read_ki(self)
        self._logger.debug("On Loop: kd (PID): ")
Cyril Guilloud's avatar
Cyril Guilloud committed
        if new_kd is not None:
            self.controller.set_kd(self, new_kd)
            return self.controller.read_kd(self)