Commit fff3304c authored by bliss administrator's avatar bliss administrator

temperature: ltr1200

parent 2f989ab9
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
""" Meerstetter thermo-electric controller
(based on LTR1200.py which was made before BLISS existed)
Example of YML configuration:
plugin: temperature
class: ltr1200
host: xxx [can be hostname or IP address]
dev_addr: 1
outputs:
- name: heater
"""
from bliss.controllers.temp import Controller
# TODO: see if need to add Input and Loop
# [for the moment work only with Output object]
from bliss.common.temperature import Output
from bliss.comm import tcp
import struct
# Different debug levels are:
# log.NOTSET = 0, log.DEBUG = 10, log.INFO = 20, log.ERROR=40
import logging
def set_ltr_log_level(level):
level = level.upper()
logging.getLogger('Ltr1200').setLevel(level)
logging.getLogger('ltr1200').setLevel(level)
set_ltr_log_level('debug')
#from bliss.common.utils import object_method, object_method_type
from bliss.common.utils import object_attribute_get, object_attribute_type_get
#from bliss.common.utils import object_attribute_set, object_attribute_type_set
import mecom
######################################################################
##################### ######################
##################### LTR1200 Low-Level Class ######################
##################### ######################
######################################################################
class Ltr1200(object):
"""
Low-level class which takes care of all the communication
with the hardware with the help of other classes which
implement MeCom protocol
"""
def __init__(self, host=None, dev_addr=1, timeout=10, debug=False):
self.host = host
self.dev_addr = dev_addr
self.timeout = timeout
# Port is always 50000 for Meerstetter TEC controller
self._sock = tcp.Socket(self.host, 50000, self.timeout)
self._tec = mecom.TECFamilyProtocol(self._sock, self.dev_addr)
self.log = logging.getLogger('Ltr1200')
self.log.info("__init__: %s %s %d" % (host, self._sock, dev_addr))
def exit(self):
self._sock.close()
def init(self):
self.log.info("init()")
self.model = self._tec.getModel()
self.log.debug("init(): Model = %s" %(self.model))
# TODO: see what else could add here i.e. which other
# operations/actions would be suitable.
def getModel(self):
self.log.info("getModel()")
#self.model = self._tec.putget("?IF",4)
self.model = self._tec.getModel()
self.log.debug("getModel: %s" %(self.model))
return self.model
def getObjectTemperature(self,instance):
self.log.info("getObjectTemperature(): instance = %d" %(instance))
answer = self._tec._getParameter (1000, 8, instance)
#answer = self._tec._getParameter (1000, 1, instance)
if answer is not None:
answer = struct.unpack(">f", answer.decode("hex"))[0]
self.log.debug("getObjectTemperature: temp = %s" % answer)
return answer
def getSinkTemperature(self,instance):
self.log.info("getSinkTemperature(): instance = %d" %(instance))
answer = self._tec._getParameter (1001, 8, instance)
if answer is not None:
answer = struct.unpack(">f", answer.decode("hex"))[0]
self.log.debug("getSinkTemperature: temp = %s" % answer)
return answer
def getTargetTemperature(self,instance):
self.log.info("getTargetTemperature(): instance = %d" %(instance))
answer = self._tec._getParameter (1010, 8, instance)
if answer is not None:
answer = struct.unpack(">f", answer.decode("hex"))[0]
self.log.debug("getTargetTemperature: temp = %s" % answer)
return answer
def setTargetTemperature(self,value,instance):
self.log.info("setTargetTemperature(): instance = %d, value = %f" %(instance,value))
answer = self._tec._setParameter (3000, value, instance)
self.log.debug("setTargetTemperature: %s" % answer) # ACK
return answer
def getOutputCurrent(self,instance):
self.log.info("getOutputCurrent(): instance = %d" %(instance))
answer = self._tec._getParameter (1020, 8, instance)
if answer is not None:
answer = struct.unpack(">f", answer.decode("hex"))[0]
self.log.debug("getOutputCurrent: current = %s" % answer)
return answer
def getDriverStatus(self,instance):
self.log.info("getDriverStatus()")
answer = self._tec._getParameter (1080, 8, instance)
description = ["Init","Ready","Run","Error","Bootloader","Device will Reset within 200ms"]
if answer is not None:
answer=description[int(answer)]
self.log.debug("getDriverStatus: status = %s" % answer)
return answer
def ResetDevice(self):
self.log.info("ResetDevice()")
self._tec.putget("RS")
def EmergencyStop(self):
self.log.info("EmergencyStop()")
self._tec.putget("ES")
######################################################################
##################### ######################
##################### LTR1200 Controller Class ######################
##################### ######################
######################################################################
class ltr1200(Controller):
def __init__(self, config, *args):
if 'host' not in config:
raise RuntimeError("Should have host name or IP address in config")
host = config["host"]
if 'dev_addr' not in config:
dev_addr = 1
else:
dev_addr = config["dev_addr"]
self._ltr1200 = Ltr1200(host, dev_addr)
Controller.__init__(self, config, *args)
self.log = logging.getLogger('ltr1200')
self.log.info("__init__: %s %d" % (host, dev_addr))
def initialize(self):
###config = dict(self.config) -- to use if no __init__()
###print 'HELLO'
self._ltr1200.init()
def __del__(self):
self._ltr1200.exit()
# Remark: In various calls here below use 1 for the instance parameter
# though this is its default value and so it could be omitted
# in the parameter list.
# TODO: see how can pass instance if want it to be different from 1
def read_output(self, toutput):
self.log.info("read_output()")
obj_temp = self._ltr1200.getObjectTemperature(1)
self.log.debug("Object temperature = %f C" % obj_temp)
return obj_temp
# set Set Point Temperature
def set(self, toutput, sp, **kwargs):
self.log.info("set() = set SP temperature: %f C" % sp)
self._ltr1200.setTargetTemperature(sp,1)
# get Set Point Temperature
def get_setpoint(self, toutput):
self.log.info("get_setpoint() = get SP temperature")
sp_temp = self._ltr1200.getTargetTemperature(1)
self.log.debug("SP temperature = %f C" % sp_temp)
return sp_temp
def state_output(self, toutput):
self.log.info("state_output()")
out_state = self._ltr1200.getDriverStatus(1)
self.log.debug("driver status = %s" % out_state)
return out_state
# Remark:
# =======
# For reading/getting entities like:
# - Model (= Firmware ID string),
# - Get Sink Temperature,
# - Get Output Current,
# - etc.
# must use CUSTOM commands/attributes.
@object_attribute_type_get(type_info=("str"), type=Output)
def get_model(self, toutput):
self.log.info("get_model(= firmware identification string)")
model = self._ltr1200.getModel()
self.log.debug("Firmware id string = %s" % model)
return model
@object_attribute_type_get(type_info=("float"), type=Output)
def get_sink_temperature(self, toutput):
self.log.info("get_sink_temperature: ")
sink_temp = self._ltr1200.getSinkTemperature(1)
self.log.debug("sink_temperature = %f C" % sink_temp)
return sink_temp
@object_attribute_type_get(type_info=("float"), type=Output)
def get_output_current(self, toutput):
self.log.info("get_output_current: ")
op_current = self._ltr1200.getOutputCurrent(1)
self.log.debug("output_current = %f A" % op_current)
return op_current
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
""" Meerstetter communication protocol related
"""
from bliss.comm import tcp
import gevent
import struct
import time
# Different debug levels are:
# log.NOTSET = 0, log.DEBUG = 10, log.INFO = 20, log.ERROR=40
import logging
def set_mecom_log_level(level):
level = level.upper()
logging.getLogger('MeComProtocol').setLevel(level)
logging.getLogger('TECFamilyProtocol').setLevel(level)
set_mecom_log_level('debug')
######################################################################
########################### ###########################
########################### MeCOM PROTOCOL ###########################
########################### ###########################
######################################################################
#
# Frame Fields:
# ------------
# 8 bits: control source field ("!" for input, "#" for output)
# 16 bits: device address
# 32 bits: random sequence identifier
# N * 8 bits: client command (so called payload)
# 32 bits: frane CRC checksum
# 8 bits: frame terminator \r (eof = end of frame)
#
######################################################################
class MeComProtocol(object):
def __init__ (self, sock_comm, dev_addr):
self.sequence = 0
self._sock = sock_comm
self.dev_addr = dev_addr
self.log = logging.getLogger('MeComProtocol')
self.log.info("__init__: %s %d" %(sock_comm,dev_addr))
def PutGet (self, cmd, anslen, eof):
#self.log.info("PutGet: %s, %d, %c" %(cmd,anslen,eof))
self.log.info("PutGet: %s, %d, %r" %(cmd,anslen,eof))
frame = self.FrameConstruction (cmd, eof)
try:
return self._PutGet (frame, cmd, anslen, eof)
except AssertionError,e:
self.log.error("PutGet: Device communication assertion error: %s" %(e))
except tcp.SocketTimeout:
self.log.error("PutGet: Socket communication timed out")
except gevent.socket.error, e:
self.log.error("PutGet: Socket communication error: %s" %(e))
#except RunTimeError, e:
# log.error("MeComProtocol::PutGet: Runtime error: %s" %(e))
def _PutGet (self, frame, cmd, anslen, eof):
self.log.info("_PutGet: Frame = %s" % frame)
self.log.info("_PutGet: cmd = %s" % cmd)
self.log.info("_PutGet: anslen = %d" % anslen)
self.log.info("_PutGet: eof = %r" % eof)
_error=["Unknown error",
"Command not available",
"Device is busy",
"General communication error",
"Format error",
"Parameter is not available",
"Parameter is read only",
"Value out of range",
"Instance not available",
]
self._sock.flush()
time.sleep(0.1)
self._sock.write(frame)
read_buffer = self._sock.raw_read()
self.log.debug("_PutGet: Read buffer = %r " % read_buffer)
if read_buffer == '':
self.log.error("_PutGet: Socket connection broken")
raise RuntimeError("MeComProtocol::_PutGet: Socket connection broken")
answer = filter(None, read_buffer.split("\n"))
if cmd != "?IF" and len(answer) > 1:
self._sock.flush()
self._sock.write(frame)
read_buffer = self._sock.raw_read()
self.log.debug("_PutGet: Read buffer of the second write/raw_read = %r " % read_buffer)
answer = filter(None, read_buffer.split("\n"))
self.log.debug("_PutGet: length (nb of lines) of answer = %d " % len(answer))
for ii in range (len(answer)):
self.log.debug("_PutGet: %d %s" % (ii+1, answer[ii]))
resp = frame[:7].replace("#","!")
self.log.debug("_PutGet: 1st 7 char of frame with ! as 1st char: %s" % resp)
if answer[ii].startswith(resp):
if answer[ii][7]=="+":
err = answer[ii][8:10]
self.log.debug("_PutGet:Error: %s", err)
self.log.debug("_PutGet:Error: %d", int(err))
self.log.debug("_PutGet:Error: %s", _error[int(err)])
else:
if cmd[0]=='?': # query commands
self.log.debug("_PutGet: It is a query command")
assert (len(answer[ii])==(12+anslen)) , "answer length not expected."
answ = answer[ii][7:anslen+7]
blacrc = self._CRC16Algorithm (resp+answ)
assert(answer[ii].endswith(("%04x%s"%(blacrc,eof)).upper())), "answer end not expected."
self.log.debug("_PutGet: %s", answ)
return answ
else: # set commands
assert(answer[ii].endswith(("%04x%s"%(self.CRC,eof)).upper())), "answer end not expected."
self.log.debug("_PutGet: ACK")
return "ACK"
self.log.debug("_PutGet: NAK")
return "NAK"
def FrameConstruction (self, payload, eof):
self.log.info("FrameConstruction: %s, %r" %(payload,eof))
frame = []
try:
frame.extend("%02x"%(self.dev_addr))
frame.extend("%04x"%(self._AssignSequenceNumber ()))
frame.extend(payload)
frame.insert (0,"#")
self.CRC = self._CRC16Algorithm (frame)
if self.CRC>0xffff:
self.log.error("FrameConstruction: too large numeric CRC: %x" %(self.CRC))
raise RuntimeError, "too large numeric CRC: %x."%(self.CRC)
frame.extend ("%04x%s"%(self.CRC,eof))
except RuntimeError,e:
self.log.error("FrameConstruction ERROR %s" %(e))
finally:
pass
#self.log.debug("FrameConstruction %s" %(frame))
self.log.debug("FrameConstruction %r" %(frame))
return "".join(frame).upper()
def _CRC16Algorithm (self, frame):
self.log.info("_CRC16Algorithm %s" %(frame))
crc = 0
genpoly = 0x1021
for c in frame:
c=ord(c.upper())
c2=(c&0x00ff)<<8
crc=crc^c2
for i in range(8):
if (crc&0x8000):
crc=(crc<<1)^genpoly
else:
crc=crc<<1
crc &= 0xffff
self.log.debug("_CRC16Algorithm %04x" %(crc))
return crc
def _AssignSequenceNumber (self):
if (self.sequence<65534):
self.sequence += 1
else:
self.sequence = 0
self.log.debug("_AssignSequenceNumber %d" %(self.sequence))
return self.sequence
######################################################################
############################ ############################
############################ TEC Family ############################
############################ ############################
######################################################################
class TECFamilyProtocol(object):
def __init__ (self, sock_comm, dev_addr):
self.mecom = MeComProtocol(sock_comm, dev_addr)
self.log = logging.getLogger('TECFamilyProtocol')
self.log.info("__init__: %s %d" % (sock_comm, dev_addr))
def putget(self, command, anslen=0, EOF='\r'):
self.log.info("putget")
self.log.debug("putget: cmd = %s, anslen = %d " % (command,anslen))
ret = self.mecom.PutGet(command,anslen,EOF)
return ret
# getModel = get Firmware Identification String
def getModel (self):
self.log.info("getModel")
self.model = self.putget("?IF",20)
self.log.debug("getModel: %s" %(self.model))
return self.model
def _getParameter (self, id, anslen, instance=1):
self.log.debug("_getParameter %04x %d %02x" % (id ,anslen, instance))
if id>0xffff:
raise RuntimeError, "wrong parameter id: %x."%(id)
if instance>0xff:
raise RuntimeError, "wrong parameter instance: %x."%(instance)
payload = ["?","V","R"]
payload.extend("%04x"%(id))
payload.extend("%02x"%(instance))
self.log.debug("_getParameter payload %r" % payload)
answer = self.putget("".join(payload),anslen)
self.log.debug("_getParameter: %s" %(answer))
return answer
def _setParameter (self, id, parameter, instance = 1):
self.log.info("_setParameter %04x %04x %02x" % (id, parameter, instance))
if id>0xffff:
raise RuntimeError, "wrong parameter id: %x."%(id)
if instance>0xff:
raise RuntimeError, "wrong parameter instance: %x."%(instance)
payload = ["V","S"]
payload.extend("%04x"%(id))
payload.extend("%02x"%(instance))
parameter=struct.pack(">f",parameter).encode("hex")
payload.extend("%s"%(parameter))
answer = self.putget(payload)
self.log.debug("_getParameter %s" % (answer))
return answer
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment