Commit 82b3bcd5 authored by Anthony Mauro's avatar Anthony Mauro

Remove custom_method_input&output

factorizing alarm_reset and alarm_status
parent 4daef5c7
Pipeline #11646 failed with stages
in 31 minutes and 8 seconds
......@@ -109,6 +109,30 @@ class LakeshoreInput(Input):
channel = self.config.get("channel")
return self.controller._lakeshore._sensor_type(channel, **kwargs)
def alarm_status(self):
""" Shows high and low alarm state for given input
Args:
None
Returns:
high and low alarm state (str, str): "On/Off"
"""
self._logger.info("alarm_status")
channel = self.config.get("channel")
return self.controller._lakeshore._alarm_status(channel)
def alarm_reset(self):
""" Clears both the high and low status of all alarms
Args:
None (though this command does not need the input channel,
we put it here, since alarms are related to the state
on input like for ex. measured temperature above
alarm high-limit etc.)
Returns:
None
"""
self._logger.info("alarm_reset")
return self.controller._lakeshore._alarm_reset()
class LakeshoreOutput(Output):
def __repr__(self):
......
......@@ -121,19 +121,16 @@ class LakeShore331(LogMixin):
# ------------
def _initialize_input(self, input):
self._logger.info("_initialize_input")
self._add_custom_method_input(input)
# - Output object
# -------------
def _initialize_output(self, output):
self._logger.info("_initialize_output")
# self._add_custom_method_output(output)
# - Loop object
# -----------
def _initialize_loop(self, loop):
self._logger.info("_initialize_loop")
self._add_custom_method_loop(loop)
# Get input object channel
ipc = loop.input.config["channel"]
# Get output object unit
......@@ -182,45 +179,15 @@ class LakeShore331(LogMixin):
"Temperature OverRange in Sensor_unit on input %s" % channel
)
# Adding CUSTOM INPUT-object related method(s)
# --------------------------------------------
def _add_custom_method_input(self, input):
self._logger.info("_add_custom_method_input")
def alarm_status():
""" Shows high and low alarm state for given input
Args:
None
Returns:
high and low alarm state (str, str): "On/Off"
"""
self._logger.info("alarm_status")
return self._alarm_status(input.config.get("channel"))
input.alarm_status = alarm_status
def alarm_reset():
""" Clears both the high and low status of all alarms
Args:
None (though this command does not need the input channel,
we put it here, since alarms are related to the state
on input like for ex. measured temperature above
alarm high-limit etc.)
Returns:
None
"""
self._logger.info("alarm_reset")
return self._alarm_reset()
input.alarm_reset = alarm_reset
def _sensor_type(self, channel, type=None, compensation=None):
""" Read
""" Read or set input type parameters
Args: According to the model, use the appropriate args
type (int): 0 to ?
compensation (int): 0=off and 1=on
example: input.sensor_type(type=3,compensation=1)
Returns:
<type>, <compensation>
"""
self._logger.info("_sensor_type")
if type is None:
......@@ -263,7 +230,6 @@ class LakeShore331(LogMixin):
state = "ON" if int(r[0]) == 1 else "OFF"
rate_value = float(r[1])
return {"state": state, "rate": rate_value}
if value < 0.1 or value > 100:
raise ValueError("Ramp value %s is out of bounds [0.1,100]" % value)
self.send_cmd("RAMP", 0, value, channel=channel)
......@@ -340,11 +306,6 @@ class LakeShore331(LogMixin):
kp, ki, kd = self.send_cmd("PID?", channel=channel).split(",")
return float(kp), float(ki), float(kd)
# Adding CUSTOM LOOP-object related method(s)
# ---------------------------------------------
def _add_custom_method_loop(self, loop):
self._logger.info("_add_custom_method_loop")
# General CUSTOM methods [valid for any type of object:
# input, output, loop]
# -----------------------------------------------------
......@@ -527,6 +488,7 @@ class LakeShore331(LogMixin):
self._logger.debug("values = {0}".format(values))
if "?" in command:
asw = self._comm.write_readline(cmd.encode() + self.eol.encode())
# print("asw = {0}".format(asw.decode()))
return asw.decode()
else:
self._comm.write(cmd.encode() + self.eol.encode())
......
......@@ -125,19 +125,16 @@ class LakeShore332(LogMixin):
# ------------
def _initialize_input(self, input):
self._logger.info("_initialize_input")
self._add_custom_method_input(input)
# - Output object
# -------------
def _initialize_output(self, output):
self._logger.info("_initialize_output")
# self._add_custom_method_output(output)
# - Loop object
# -----------
def _initialize_loop(self, loop):
self._logger.info("_initialize_loop")
self._add_custom_method_loop(loop)
# Get input object channel
ipc = loop.input.config["channel"]
# Get output object unit
......@@ -154,7 +151,7 @@ class LakeShore332(LogMixin):
Args:
channel (int): input channel. Valid entries: A or B
scale (str): temperature unit for reading: Kelvin or Celsius
or Sensor unit (Ohm or Volt)
or Sensor_unit (Ohm or Volt)
Returns:
(float): current temperature
"""
......@@ -186,38 +183,6 @@ class LakeShore332(LogMixin):
"Temperature OverRange in Sensor_unit on input %s" % channel
)
# Adding CUSTOM INPUT-object related method(s)
# --------------------------------------------
def _add_custom_method_input(self, input):
self._logger.info("_add_custom_method_input")
def alarm_status():
""" Shows high and low alarm state for given input
Args:
None
Returns:
high and low alarm state (str, str): "On/Off"
"""
self._logger.info("alarm_status")
return self._alarm_status(input.config.get("channel"))
input.alarm_status = alarm_status
def alarm_reset():
""" Clears both the high and low status of all alarms
Args:
None (though this command does not need the input channel,
we put it here, since alarms are related to the state
on input like for ex. measured temperature above
alarm high-limit etc.)
Returns:
None
"""
self._logger.info("alarm_reset")
self._alarm_reset()
input.alarm_reset = alarm_reset
def _sensor_type(self, channel, type=None, compensation=None):
""" Read or set input type parameters
Args: According to the model, use the appropriate args
......@@ -231,11 +196,6 @@ class LakeShore332(LogMixin):
if type is None:
return self.send_cmd("INTYPE?", channel=channel)
else:
typec, compensationc = self.send_cmd("INTYPE?", channel=channel)
if type is None:
type = typec
if compensation is None:
compensation = compensationc
self.send_cmd("INTYPE", type, compensation, channel=channel)
# Standard OUTPUT-object related method(s)
......@@ -350,11 +310,6 @@ class LakeShore332(LogMixin):
kp, ki, kd = self.send_cmd("PID?", channel=channel).split(",")
return float(kp), float(ki), float(kd)
# Adding CUSTOM LOOP-object related method(s)
# ---------------------------------------------
def _add_custom_method_loop(self, loop):
self._logger.info("_add_custom_method_loop")
# General CUSTOM methods [valid for any type of object:
# input, output, loop]
# -----------------------------------------------------
......@@ -541,6 +496,7 @@ class LakeShore332(LogMixin):
self._logger.debug("values = {0}".format(values))
if "?" in command:
asw = self._comm.write_readline(cmd.encode() + self.eol.encode())
# print("asw = {0}".format(asw.decode()))
return asw.decode()
else:
self._comm.write(cmd.encode() + self.eol.encode())
......
......@@ -12,7 +12,6 @@ yml configuration example:
#controller:
- class: lakeshore335
module: lakeshore.lakeshore335
#eos: '\r\n'
name: lakeshore335
timeout: 3
gpib:
......@@ -54,7 +53,6 @@ yml configuration example:
unit: Kelvin
- name: ls335o_2
channel: 2
unit: Kelvin
ctrl_loops:
- name: ls335l_1
......@@ -65,83 +63,49 @@ yml configuration example:
input: $ls335_B
output: $ls335o_2
channel: 2
"""
import types
import time
import os
import re
# Logging messages
# from bliss.common import log
import logging
# logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(format="%(levelname)s - %(message)s")
# communication
from bliss.comm.gpib import Gpib
import enum
from bliss.common import mapping
from bliss.comm import serial
from bliss.controllers.temperature.lakeshore.lakeshore import Base
class LakeShore335(object):
MODE335 = (
"Off",
"Closed Loop PID",
"Zone",
"Open Loop",
"Monitor Out",
"Warmup Supply",
)
SPUNITS335 = {"Kelvin": 1, "Celsius": 2, "Sensor_unit": 3}
REVSPUNITS335 = {1: "Kelvin", 2: "Celsius", 3: "Sensor_unit"}
CURVEFORMAT335 = {1: "mV/K", 2: "V/K", 3: "Ohms/K", 4: "logOhms/K"}
CURVETEMPCOEF335 = {1: "negative", 2: "positive"}
SENSORTYPE335 = ("Disabled", "Diode", "Platinum RTD", "NTC RTD", "Thermocouple")
IPSENSORUNITS335 = {1: "volts", 2: "ohms"}
HTRSTATUS335 = {0: "OK", 1: "Heater open load", 2: "Heater short"}
INPUT335 = {"None": 0, "A": 1, "B": 2}
REVINPUT335 = {0: "None", 1: "A", 2: "B"}
def __init__(self, comm_type, url, **kwargs):
self.eos = kwargs.get("eos", "\r\n")
timeout = kwargs.get("timeout", 0.5)
if "gpib" in comm_type:
self._comm = Gpib(
url, pad=kwargs["extra_param"], eos=self.eos, timeout=timeout
)
elif ("serial" or "usb") in comm_type:
baudrate = kwargs.get("extra_param", 9600)
self._comm = serial.Serial(
url,
baudrate=baudrate,
bytesize=serial.SEVENBITS,
parity=serial.PARITY_ODD,
stopbits=serial.STOPBITS_ONE,
timeout=timeout,
eol=self.eos,
)
elif "tcp" in comm_type:
return RuntimeError(
"Lakeshore 335 model has no ethernet connection-->use USB or gpib"
)
else:
raise RuntimeError("Unknown communication protocol")
from bliss.comm.util import get_interface, get_comm
from bliss.common.logtools import LogMixin
from bliss.controllers.temperature.lakeshore.lakeshore import LakeshoreBase
_last_call = time.time()
# limit number of commands per second
# lakeshore 331 supports at most 20 commands per second
def _send_limit(func):
def f(*args, **kwargs):
global _last_call
delta_t = time.time() - _last_call
if delta_t <= 0.15:
time.sleep(0.15 - delta_t)
try:
return func(*args, **kwargs)
finally:
_last_call = time.time()
return f
class LakeShore335(LogMixin):
UNITS331 = {"Kelvin": 1, "Celsius": 2, "Sensor unit": 3}
REVUNITS331 = {1: "Kelvin", 2: "Celsius", 3: "Sensor unit"}
IPSENSORUNITS331 = {1: "volts", 2: "ohms"}
def __init__(self, comm, **kwargs):
self._comm = comm
self._channel = None
self.log = logging.getLogger(type(self).__name__)
# self.log.setLevel(logging.NOTSET)
# Set initial log level to logging.WARNING = 30
# to get only Warning, Error and Critical messages logged
self.log.setLevel(logging.WARNING)
mapping.register(self, children_list=[self._comm])
self.log.info("__init__")
self._logger.info("__init__")
@property
def eol(self):
return self._comm._eol
# Initialization methods
# ----------------------
......@@ -149,7 +113,7 @@ class LakeShore335(object):
# - Controller
# ----------
def clear(self):
""" Clears the bits in the Status Byte, Standard Event and Operation
"""Clears the bits in the Status Byte, Standard Event and Operation
Event Registers. Terminates all pending operations.
Returns:
None
......@@ -159,30 +123,28 @@ class LakeShore335(object):
# - Input object
# ------------
def _initialize_input(self, input):
self.log.info("_initialize_input")
self._logger.info("_initialize_input")
self._add_custom_method_input(input)
# - Output object
# -------------
def _initialize_output(self, output):
self.log.info("_initialize_output")
self._add_custom_method_output(output)
self._logger.info("_initialize_output")
# self._add_custom_method_output(output)
# - Loop object
# -----------
def _initialize_loop(self, loop):
self.log.info("_initialize_loop")
self._logger.info("_initialize_loop")
self._add_custom_method_loop(loop)
# Get input object channel
ipc = loop.input.config["channel"]
# Get output object channel
opc = loop.output.config["channel"]
# Get output object unit
opu = loop.output.config["unit"]
opu = loop.input.config["unit"]
# Get loop object channel
loop_channel = loop.config["channel"]
self._intype(ipc, unit=self.SPUNITS335[opu])
self._outmode(opc, input=ipc)
self.set_loop_params(loop_channel, input=ipc, unit=opu)
# Standard INPUT-object related method(s)
# ---------------------------------------
......@@ -191,152 +153,42 @@ class LakeShore335(object):
Args:
channel (int): input channel. Valid entries: A or B
scale (str): temperature unit for reading: Kelvin or Celsius
or Sensor_unit
or Sensor_unit (Ohm or Volt)
Returns:
(float): current temperature in Kelvin, Celsius or Sensor_unit
(float): current temperature
"""
self.log.info("read_temperature")
self._channel = channel
self._logger.info("read_temperature")
# Query Input Status before reading temperature
# If status is OK, then read the temperature
asw = int(self.send_cmd("RDGST?"))
asw = int(self.send_cmd("RDGST?", channel=channel))
if asw == 0:
if scale == "Kelvin":
return float(self.send_cmd("KRDG?"))
return float(self.send_cmd("KRDG?", channel=channel))
elif scale == "Celsius":
return float(self.send_cmd("CRDG?"))
return float(self.send_cmd("CRDG?", channel=channel))
elif scale == "Sensor_unit":
return float(self.send_cmd("SRDG?"))
return float(self.send_cmd("SRDG?", channel=channel))
if asw & 16:
self.log.warning("Temperature UnderRange on input %s" % channel)
self._logger.warning("Temperature UnderRange on input %s" % channel)
raise ValueError("Temperature value on input %s is invalid" % channel)
if asw & 32:
self.log.warning("Temperature OverRange on input %s" % channel)
self._logger.warning("Temperature OverRange on input %s" % channel)
raise ValueError("Temperature value on input %s is invalid" % channel)
if asw & 64:
self.log.warning("0 value in sensor units on input %s" % channel)
raise ValueError("Value in sensor units on input %s is invalid" % channel)
self._logger.warning("Temperature in Sensor_unit = 0 on input %s" % channel)
raise ValueError("Temperature in Sensor_unit = 0 on input %s" % channel)
if asw & 128:
self.log.warning("Overrange of value in sensor units on input %s" % channel)
raise ValueError("Value in sensor units on input %s is invalid" % channel)
self._logger.warning(
"Temperature OverRange in Sensor_unit on input %s" % channel
)
raise ValueError(
"Temperature OverRange in Sensor_unit on input %s" % channel
)
# Adding CUSTOM INPUT-object related method(s)
# --------------------------------------------
def _add_custom_method_input(self, input):
self.log.info("_add_custom_method_input")
def curve_used_curve():
""" Get the input curve used
Prints:
curve number (int): 0=none, 1->20 standard, 21->59 user defined curves
curve name (str): limited to 15 characters
curve SN (str): limited to 10 characters (Standard,...)
curve format (int): 1=mV/K, 2=V/K, 3=Ohms/K, 4=logOhms/K
curve temperature limit (float): in Kelvin
curve temperature coefficient (int): 1=negative, 2=positive
"""
self.log.info("curve_used_curve")
return self._curve_used_curve(input.config.get("channel"))
input.curve_used_curve = curve_used_curve
def curve_to_use(crvn=0):
""" Select the curve to be used on in input
Args:
crvn (int): Curve number: 1->20 standard,
21->59 user defined curves
"""
self.log.info("curve_to_use")
self._curve_to_use(input.config.get("channel"), crvn)
input.curve_to_use = curve_to_use
def curve_list_all():
""" List all the curves
Returns:
a row for all the curves from 1 to 59
"""
self.log.info("curve_list_all")
return self._curve_list_all()
input.curve_list_all = curve_list_all
def curve_write(crvn=None, crvfile=None):
""" Write the user curve into the Lakeshore
Args:
crvn (int): The user curve number from 21 to 59
crvfile (str): full file name
Returns:
Status of curve written
"""
self.log.info("curve_write")
return self._curve_write(crvn, crvfile)
input.curve_write = curve_write
def curve_delete(crvn=None):
""" Delete a user curve from the Lakeshore
Args:
crvn (int): The user curve number from 21 to 59
Returns:
None.
"""
self.log.info("curve_delete")
self._curve_delete(crvn)
input.curve_delete = curve_delete
def filter(onoff=None, points=None, window=None):
""" Configure input filter parameters
Args:
onoff (int): 1 = enable, 0 = disable
points (int): specifies how many points the filtering
function uses. Valid values: 2 to 64.
window (int): specifies what percent of full scale reading
limits the filtering function. Reading
changes greater than this percentage reset
the filter. Valid range: 1 to 10%.
Returns:
None
"""
self.log.info("filter")
return self._filter(
input.config.get("channel"), onoff=onoff, points=points, window=window
)
input.filter = filter
def intype(
sensor_type=None, autorange=None, iprange=None, compensation=None, unit=None
):
""" Read/set input type parameters
Args:
sensor_type (int): Valid entries: 0=Disabled, 1=Diode,
2=Platinum RTD, 3=NTC RTD,
4=Thermocouple,
autorange (int): 0=off, 1=on
iprange (int): input range when autorange in off ;
see table 6-8 on page 118 of manual
compensation (int): input compensation. 0=off, 1=on
unit (int): prefered unit for sensor reading AND for the
control setpoint. 1=Kelvin, 2=Celsius, 3=Sensor_unit
Returns:
None if set
sensor_type, autorange, iprange, compensation, unit
"""
self.log.info("intype")
return self._intype(
input.config.get("channel"),
sensor_type=sensor_type,
autorange=autorange,
iprange=iprange,
compensation=compensation,
unit=unit,
)
input.intype = intype
self._logger.info("_add_custom_method_input")
def alarm_status():
""" Shows high and low alarm state for given input
......@@ -345,7 +197,7 @@ class LakeShore335(object):
Returns:
high and low alarm state (str, str): "On/Off"
"""
self.log.info("alarm_status")
self._logger.info("alarm_status")
return self._alarm_status(input.config.get("channel"))
input.alarm_status = alarm_status
......@@ -356,7 +208,7 @@ class LakeShore335(object):
None (though this command does not need the input channel,
we put it here, since alarms are related to the state
on input like for ex. measured temperature above
alarm high-limit etc)
alarm high-limit etc.)
Returns:
None
"""
......
......@@ -124,19 +124,16 @@ class LakeShore340(LogMixin):
# ------------
def _initialize_input(self, input):
self._logger.info("_initialize_input")
self._add_custom_method_input(input)
# - Output object
# -------------
def _initialize_output(self, output):
self._logger.info("_initialize_output")
# self._add_custom_method_output(output)
# - Loop object
# -----------
def _initialize_loop(self, loop):
self._logger.info("_initialize_loop")
self._add_custom_method_loop(loop)
# Get input object channel
ipc = loop.input.config["channel"]
# Get output object unit
......@@ -153,7 +150,7 @@ class LakeShore340(LogMixin):
Args:
channel (int): input channel. Valid entries: A or B
scale (str): temperature unit for reading: Kelvin or Celsius
or Sensor unit (Ohm or Volt)
or Sensor_unit (Ohm or Volt)
Returns:
(float): current temperature
"""
......@@ -185,38 +182,6 @@ class LakeShore340(LogMixin):
"Temperature OverRange in Sensor_unit on input %s" % channel
)
# Adding CUSTOM INPUT-object related method(s)
# --------------------------------------------
def _add_custom_method_input(self, input):
self._logger.info("_add_custom_method_input")
def alarm_status():
""" Shows high and low alarm state for given input
Args:
None
Returns:
high and low alarm state (str, str): "On/Off"
"""
self._logger.info("alarm_status")
return self._alarm_status(input.config.get("channel"))
input.alarm_status = alarm_status
def alarm_reset():
""" Clears both the high and low status of all alarms
Args:
None (though this command does not need the input channel,
we put it here, since alarms are related to the state
on input like for ex. measured temperature above
alarm high-limit etc.)
Returns:
None
"""
self._logger.info("alarm_reset")
return self._alarm_reset(input.config.get("channel"))
input.alarm_reset = alarm_reset
def _sensor_type(
self,
channel,
......@@ -366,11 +331,6 @@ class LakeShore340(LogMixin):
kp, ki, kd = self.send_cmd("PID?", channel=channel).split(",")
return float(kp), float(ki), float(kd)
# Adding CUSTOM LOOP-object related method(s)
# ---------------------------------------------
def _add_custom_method_loop(self, loop):
self._logger.info("_add_custom_method_loop")
# General CUSTOM methods [valid for any type of object:
# input, output, loop]
# -----------------------------------------------------
......@@ -437,6 +397,7 @@ class LakeShore340(LogMixin):
def _alarm_status(self, channel):
""" Shows high and low alarm state for given input
Args:
channel (str): A or B
Returns:
high and low alarm state (str, str): "On/Off"
......@@ -555,7 +516,7 @@ class LakeShore340(LogMixin):
self._logger.debug("values = {0}".format(values))
if "?" in command:
asw = self._comm.write_readline(cmd.encode() + self.eol.encode())
# print("asw = {0}".format(asw))
# print("asw = {0}".format(asw.decode()))
return asw.decode()
else:
self._comm.write(cmd.encode() + self.eol.encode())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment