Commit 1a35ed66 authored by Cyril Guilloud's avatar Cyril Guilloud Committed by Sebastien Petitdemange
Browse files

PI E753 + PI E51X python3 conversion + tests + doc

parent 3c3d0e6c
......@@ -33,7 +33,8 @@ class PI_E517(PI_E51X):
PI_E51X.__init__(self, *args, **kwargs)
def _get_cto(self, axis):
_ans = self.sock.write_readlines("CTO?\n", 24)
# 24 also for 518 ????
_ans = [bs.decode() for bs in self.comm.write_readlines(b"CTO?\n", 24)]
return _ans
"""
......
......@@ -32,7 +32,8 @@ class PI_E518(PI_E51X):
PI_E51X.__init__(self, *args, **kwargs)
def _get_cto(self, axis):
_ans = self.sock.write_readlines("CTO?\n", 24) # 24 also for 518 ????
# 24 also for 518 ????
_ans = [bs.decode() for bs in self.comm.write_readlines(b"CTO?\n", 24)]
return _ans
"""
......
......@@ -49,13 +49,11 @@ class PI_E51X(Controller):
# acceleration is not mandatory in config
self.axis_settings.config_setting["acceleration"] = False
self.sock = pi_gcs.get_pi_comm(self.config, TCP)
self.comm = pi_gcs.get_pi_comm(self.config, TCP)
def finalize(self):
"""
Closes the controller socket.
"""
self.sock.close()
def close(self):
if self.comm is not None:
self.comm.close()
def initialize_axis(self, axis):
"""
......@@ -81,9 +79,6 @@ class PI_E51X(Controller):
"""end of move event"""
event.connect(axis, "move_done", self.move_done_event_received)
# Enables the closed-loop.
# self.sock.write("SVO 1 1\n")
self.send_no_ans(axis, "ONL %d 1" % axis.channel)
# VCO for velocity control mode ?
......@@ -222,43 +217,58 @@ class PI_E51X(Controller):
"""
self.send_no_ans(axis, "HLT %s" % axis.chan_letter)
# self.sock.write("STP\n")
# self.comm.write(b"STP\n")
"""
Communication
Raw communication commands
"""
# def flush(self, axis):
# self.sock.flush()
def raw_write(self, cmd):
self.send_no_ans(self.ctrl_axis, cmd)
"""
- <cmd> must be 'str'
"""
self.comm.write(cmd.encode())
def raw_write_read(self, cmd):
return self.send(self.ctrl_axis, cmd)
def send(self, axis, cmd):
"""
- Adds the 'newline' terminator character : "\\\\n"
- Sends command <cmd> to the PI E51X controller.
- Channel is defined in <cmd>.
- <axis> is passed for debugging purposes.
- Returns answer from controller.
- <cmd> must be 'str'
- Returns 'str'
"""
return self.comm.write_readline(cmd.encode()).decode()
Args:
- <axis> : passed for debugging purposes.
- <cmd> : GCS command to send to controller (Channel is already mentionned in <cmd>).
def raw_write_readlines(self, cmd, lines):
"""
- Adds '\n' terminator to <cmd> string
- Sends <cmd> string to the controller and read back <lines> lines
- <cmd>: 'str'
- <lines>: 'int'
"""
_cmd = cmd.encode() + b"\n"
return "\n".join(self.comm.write_readlines(_cmd, lines).decode())
Returns:
- 1-line answer received from the controller (without "\\\\n" terminator).
"""
E51x communications
"""
def send(self, axis, cmd):
"""
- Converts <cmd> into 'bytes' and sends it to controller.
- Adds terminator to <cmd> string.
- <axis> is passed for debugging purposes.
- Channel must be defined in <cmd>.
- Returns the answer from controller.
- Type of <cmd> must be 'str'.
- Type of returned string is 'str'.
"""
_cmd = cmd + "\n"
# print( f"SEND: {cmd}")
_cmd = cmd.encode() + b"\n"
_t0 = time.time()
# PC
_ans = "toto"
_ans = self.sock.write_readline(_cmd)
_ans = self.comm.write_readline(_cmd).decode()
# "\n" in answer has been removed by tcp lib.
# print( f"RECV: {_ans}")
_duration = time.time() - _t0
if _duration > 0.005:
elog.info(
......@@ -266,7 +276,7 @@ class PI_E51X(Controller):
% (_ans, _cmd, _duration * 1000)
)
# self.check_error(axis)
self.check_error(_cmd)
return _ans
......@@ -278,16 +288,18 @@ class PI_E51X(Controller):
- <axis> is passed for debugging purposes.
- Used for answer-less commands, thus returns nothing.
"""
_cmd = cmd + "\n"
self.sock.write(_cmd)
# self.check_error(axis)
_cmd = cmd.encode() + b"\n"
self.comm.write(_cmd)
self.check_error(axis)
def check_error(self):
# Check error code
def check_error(self, command):
"""
- Checks error code
- <command> : 'str' : displayed in case of error
"""
(_err_nb, _err_str) = self._get_error()
if _err_nb != 0:
print(":( error #%d (%s) in send_no_ans(%r)" % (_err_nb, _err_str, cmd))
print(":( error #%d (%s) in send_no_ans(%r)" % (_err_nb, _err_str, command))
"""
E51X specific
......@@ -303,7 +315,9 @@ class PI_E51X(Controller):
Raises:
?
"""
_ans = self.sock.write_readlines("POS?\n", 3)
_bs_ans = self.comm.write_readlines(b"POS?\n", 3)
_ans = [bs.decode() for bs in _bs_ans]
_pos = list(map(float, [x[2:] for x in _ans]))
return _pos
......@@ -323,35 +337,18 @@ class PI_E51X(Controller):
"""
if self.closed_loop:
# _ans = self.send(axis, "MOV? %s" % axis.chan_letter)
_ans = self.sock.write_readlines("MOV?\n", 3)
_bs_ans = self.comm.write_readlines(b"MOV?\n", 3)
else:
# _ans = self.send(axis, "SVA? %s" % axis.chan_letter)
_ans = self.sock.write_readlines("SVA?\n", 3)
_bs_ans = self.comm.write_readlines(b"SVA?\n", 3)
_ans = [bs.decode() for bs in _bs_ans]
# _pos = float(_ans[2:])
_pos = list(map(float, [x[2:] for x in _ans]))
return _pos
def _get_cto(self, axis):
"""
sss
"""
_ans = self.sock.write_readlines("CTO?\n", 24)
return _ans
"""
CTO?
1 1=+0000.1000 ???
1 2=1 ???
1 3=3 trigger mode
1 4=0 ???
1 5=+0000.0000 min threshold
1 6=+0001.0000 max threshold
1 7=1 polarity
1 12=1 ???
...
"""
def _get_low_limit(self, axis):
_ans = self.send(axis, "NLM? %s" % axis.chan_letter)
# A=+0000.0000
......@@ -483,21 +480,27 @@ class PI_E51X(Controller):
self.send_no_ans(axis, _cmd)
def get_id(self, axis):
def _get_error(self):
"""
Returns Identification information.
- Checks error code
- <command> : 'bytes' : Previous command string displayed in case of error
"""
return self.send(axis, "*IDN?")
def _get_error(self):
# Does not use send() to be able to call _get_error in send().
_error_number = int(self.sock.write_readline("ERR?\n"))
_error_number = int(self.comm.write_readline(b"ERR?\n").decode())
_error_str = pi_gcs.get_error_str(int(_error_number))
_error_str = pi_gcs.get_error_str(_error_number)
return (_error_number, _error_str)
"""
ID/INFO
"""
def get_id(self, axis):
"""
- Returns a 'str' string.
"""
return self.send(axis, "*IDN?")
def get_info(self, axis):
"""
Returns a set of usefull information about controller.
......@@ -507,8 +510,6 @@ class PI_E51X(Controller):
<axis> : bliss axis
Returns:
None
Raises:
?
"""
_infos = [
("Identifier ", "*IDN?"),
......@@ -539,19 +540,22 @@ class PI_E51X(Controller):
("Digital filter order ", "SPA? %s 0x05000002" % axis.channel),
]
_txt = ""
(error_nb, err_str) = self._get_error()
_txt = ' ERR nb=%d : "%s"\n' % (error_nb, err_str)
# Reads pre-defined infos (1 line answers)
for i in _infos:
_txt = _txt + " %s %s\n" % (i[0], self.send(axis, i[1]))
_txt = _txt + " %s %s\n" % (i[0], self.send(axis, (i[1])))
# time.sleep(0.01)
_txt = _txt + " %s \n%s\n" % (
# Reads multi-lines infos.
_ans = [bs.decode() for bs in self.comm.write_readlines(b"IFC?\n", 6)]
_txt = _txt + " %s \n%s\n" % (
"Communication parameters",
"\n".join(self.sock.write_readlines("IFC?\n", 6)),
"\n".join(_ans),
)
_txt = _txt + " %s \n%s\n" % (
"Firmware version",
"\n".join(self.sock.write_readlines("VER?\n", 3)),
)
_ans = [bs.decode() for bs in self.comm.write_readlines(b"VER?\n", 3)]
_txt = _txt + " %s \n%s\n" % ("Firmware version", "\n".join(_ans))
return _txt
......@@ -2,12 +2,13 @@
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Copyright (c) 2014-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.controllers.motor import Controller
from bliss.common import log as elog
from bliss.common.utils import object_method
from bliss.common.utils import object_attribute_get, object_attribute_set
from bliss.common.axis import AxisState
......@@ -20,7 +21,7 @@ import time
"""
Bliss controller for ethernet PI E753 piezo controller.
Cyril Guilloud ESRF BLISS 2014-2016
Model PI E754 should be compatible.. to be tested.
"""
......@@ -28,7 +29,8 @@ class PI_E753(Controller):
def __init__(self, *args, **kwargs):
Controller.__init__(self, *args, **kwargs)
self.cname = "E753"
# Used by axis in info string.
self.ctrl_name = "PI-E753"
# Init of controller.
def initialize(self):
......@@ -38,31 +40,18 @@ class PI_E753(Controller):
# acceleration is not mandatory in config
self.axis_settings.config_setting["acceleration"] = False
self.sock = pi_gcs.get_pi_comm(self.config, TCP)
self.comm = pi_gcs.get_pi_comm(self.config, TCP)
def finalize(self):
"""
Closes the controller socket.
"""
# not called at end of device server ??? :(
# called on a new axis creation ???
def close(self):
if self.comm is not None:
self.comm.close()
if self.sock:
self.sock.close()
# Init of each axis.
def initialize_axis(self, axis):
elog.debug("axis initialization")
## To purge controller.
# try:
# self.sock._raw_read()
# except:
# pass
# Enables the closed-loop.
# Can be dangerous ??? test diff between target and position before ???
self._set_closed_loop(axis, True)
# self._set_closed_loop(axis, True)
def initialize_encoder(self, encoder):
pass
......@@ -93,6 +82,9 @@ class PI_E753(Controller):
return self._get_velocity(axis)
def set_velocity(self, axis, new_velocity):
"""
- <new_velocity>: 'float'
"""
elog.debug("set_velocity new_velocity = %f" % new_velocity)
self.send_no_ans(axis, "VEL 1 %f" % new_velocity)
......@@ -107,7 +99,8 @@ class PI_E753(Controller):
else:
return AxisState("MOVING")
else:
raise RuntimeError("closed loop disabled")
# open-loop => always ready.
return AxisState("READY")
""" MOVEMENTS """
......@@ -115,50 +108,105 @@ class PI_E753(Controller):
pass
def start_one(self, motion):
elog.debug("start_one target_pos = %f" % motion.target_pos)
self.send_no_ans(motion.axis, "MOV 1 %g" % motion.target_pos)
"""
- Sends 'MOV' or 'SVA' depending on closed loop mode.
Args:
- <motion> : Bliss motion object.
Returns:
- None
"""
if self._get_closed_loop_status(motion.axis):
# Command in position.
self.send_no_ans(motion.axis, "MOV 1 %g" % motion.target_pos)
else:
# Command in voltage.
self.send_no_ans(motion.axis, "SVA 1 %g" % motion.target_pos)
def stop(self, axis):
# to check : copy of current position into target position ???
"""
* HLT -> stop smoothly
* STP -> stop asap
* 24 -> stop asap
* to check : copy of current position into target position ???
"""
self.send_no_ans(axis, "STP")
""" COMMUNICATIONS"""
def send(self, axis, cmd):
_cmd = cmd + "\n"
"""
- Converts <cmd> into 'bytes' and sends it to controller.
- Adds terminator to <cmd> string.
- <axis> is passed for debugging purposes.
- Type of <cmd> must be 'str'.
- Type of returned string is 'str'.
"""
_cmd = cmd.encode() + b"\n"
_t0 = time.time()
_ans = self.sock.write_readline(_cmd)
_ans = self.comm.write_readline(_cmd).decode()
# "\n" in answer has been removed by tcp lib.
# self.check_error()
_duration = time.time() - _t0
if _duration > 0.005:
elog.info(
"PI_E51X.py : Received %r from Send %s (duration : %g ms) "
% (_ans, _cmd, _duration * 1000)
)
return _ans
self.check_error(_cmd)
def check_error(self):
# Check error code
(_err_nb, _err_str) = self._get_error()
if _err_nb != 0:
print(":( error #%d (%s) in send_no_ans(%r)" % (_err_nb, _err_str, cmd))
return _ans
def send_no_ans(self, axis, cmd):
_cmd = cmd + "\n"
self.sock.write(_cmd)
# self.check_error()
"""
- Sends <cmd> command to controller.
- Adds terminator to <cmd> string.
- Channel is already defined in <cmd>.
- Type of <cmd> must be 'str'.
- Used for answer-less commands, thus returns nothing.
"""
_cmd = cmd.encode() + b"\n"
self.comm.write(_cmd)
self.check_error(_cmd)
""" RAW COMMANDS """
def check_error(self, command):
"""
- Checks error code
- <command> : 'bytes' : string displayed in case of error
"""
(_err_nb, _err_str) = self._get_error()
if _err_nb != 0:
print(":( error #%d (%s) in send_no_ans(%r)" % (_err_nb, _err_str, command))
def raw_write(self, com):
self.sock.write(com)
"""
Raw communication commands
"""
def raw_write_read(self, com):
return self.sock.write_readline(com)
def raw_write(self, cmd):
"""
- <cmd> must be 'str'
"""
self.comm.write(cmd.encode())
def raw_write_readlines(self, com, lines):
return "\n".join(self.sock.write_readlines("%s\n" % com, lines))
def raw_write_read(self, cmd):
"""
- <cmd> must be 'str'
- Returns 'str'
"""
return self.comm.write_readline(cmd.encode()).decode()
def get_identifier(self, axis):
return self.send(axis, "IDN?")
def raw_write_readlines(self, cmd, lines):
"""
- Adds '\n' terminator to <cmd> string
- Sends <cmd> string to the controller and read back <lines> lines
- <cmd>: 'str'
- <lines>: 'int'
"""
_cmd = cmd.encode() + b"\n"
return "\n".join(self.comm.write_readlines(_cmd, lines).decode())
"""
E753 specific
......@@ -166,7 +214,7 @@ class PI_E753(Controller):
def get_voltage(self, axis):
""" Returns voltage read from controller."""
_ans = self.send(axis, "SVA?")
_ans = self.send(axis, "SVA? 1")
_voltage = float(_ans[2:])
return _voltage
......@@ -178,38 +226,41 @@ class PI_E753(Controller):
"""
Returns velocity taken from controller.
"""
_ans = self.send(axis, "VEL?")
_ans = self.send(axis, "VEL? 1")
_velocity = float(_ans.split("=")[1])
return _velocity
def _get_pos(self):
"""
Returns real position read by capacitive sensor.
no axis parameter as _get_pos is used by encoder.... can be a problem???
- no axis parameter as _get_pos is used by encoder.... can be a problem???
- Returns a 'float': real position read by capacitive sensor.
"""
_ans = self.sock.write_readline("POS?\n")
_ans = self.comm.write_readline(b"POS?\n").decode()
# _ans should looks like "1=-8.45709419e+01\n"
# "\n" removed by tcp lib.
_pos = float(_ans[2:])
return _pos
"""ON TARGET """
def _get_target_pos(self, axis):
"""
Returns last target position (setpoint value).
Returns last target position (MOV?/SVA?/VOL? command) (setpoint value).
- SVA? : Query the commanded output voltage (voltage setpoint).
- VOL? : Query the current output voltage (real voltage).
- MOV? : Returns the last valid commanded target position.
"""
_ans = self.send(axis, "MOV?")
# _ans should looks like "1=-8.45709419e+01"
_pos = float(_ans[2:])
if self._get_closed_loop_status(axis):
_ans = self.send(axis, "MOV? 1")
# _ans should looks like "1=-8.45709419e+01"
_pos = float(_ans[2:])
else:
_ans = self.send(axis, "SVA? 1")
# _ans should looks like "???"
_pos = float(_ans[2:])
return _pos
def _get_on_target_status(self, axis):
_ans = self.send(axis, "ONT?")
_ans = self.send(axis, "ONT? 1")
_status = _ans.split("=")[1]
......@@ -224,7 +275,7 @@ class PI_E753(Controller):
""" CLOSED LOOP"""
def _get_closed_loop_status(self, axis):
_ans = self.send(axis, "SVO?")
_ans = self.send(axis, "SVO? 1")
_status = _ans.split("=")[1]
......@@ -250,9 +301,20 @@ class PI_E753(Controller):
def close_loop(self, axis):
self._set_closed_loop(axis, True)
@object_attribute_get(type_info="bool")
def get_closed_loop(self, axis):
return self._get_closed_loop_status(axis)
@object_attribute_set(type_info="bool")
def set_closed_loop(self, axis, value):
print("set_closed_loop DISALBED FOR SECURITY ... ")
# self._set_closed_loop(axis, True)
def _get_error(self):
# Does not use send() to be able to call _get_error in send().
_error_number = int(self.sock.write_readline("ERR?\n"))
"""
- Does not use send() to be able to call _get_error in send().
"""
_error_number = int(self.comm.write_readline(b"ERR?\n").decode())
_error_str = pi_gcs.get_error_str(int(_error_number))
return (_error_number, _error_str)
......@@ -261,6 +323,16 @@ class PI_E753(Controller):
print("????????? PI_E753.py received _stop ???")
self.send_no_ans(axis, "STP")
"""
ID/INFO
"""
def get_id(self, axis):
"""