Commit b78183ad authored by Sebastien Petitdemange's avatar Sebastien Petitdemange Committed by Cyril Guilloud
Browse files

pie_753: use the base code for communication and recorder.

parent abdf06b4
......@@ -10,7 +10,6 @@ from bliss.common.utils import object_method
from bliss.common.utils import object_attribute_get, object_attribute_set
from bliss.common.axis import AxisState
from bliss.common.logtools import log_debug, log_debug_data, log_info, log_warning
from bliss import global_map
from . import pi_gcs
from bliss.comm.util import TCP
......@@ -25,8 +24,10 @@ Model PI E754 should be compatible.. to be tested.
"""
class PI_E753(Controller):
class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
def __init__(self, *args, **kwargs):
pi_gcs.Communication.__init__(self)
pi_gcs.Recorder.__init__(self)
Controller.__init__(self, *args, **kwargs)
# Init of controller.
......@@ -34,14 +35,14 @@ class PI_E753(Controller):
"""
Controller intialization: open a single socket for all 3 axes.
"""
self.com_initialize()
# acceleration is not mandatory in config
self.axis_settings.config_setting["acceleration"] = False
self.comm = pi_gcs.get_pi_comm(self.config, TCP)
# Check model.
try:
idn_ans = self.comm.write_readline(b"*IDN?\n").decode()
idn_ans = self.command("*IDN?")
log_info(self, "IDN?: %s", idn_ans)
if idn_ans.find("E-753") > 0:
self.model = "E-753"
......@@ -53,11 +54,6 @@ class PI_E753(Controller):
self.model = "UNKNOWN"
log_debug(self, "model=%s", self.model)
global_map.register(self, children_list=[self.comm])
def close(self):
if self.comm is not None:
self.comm.close()
def initialize_axis(self, axis):
log_debug(self, "axis initialization")
......@@ -65,6 +61,7 @@ class PI_E753(Controller):
# Enables the closed-loop.
# Can be dangerous ??? test diff between target and position before ???
# self._set_closed_loop(axis, True)
self._add_recoder_enum_on_axis(axis)
def initialize_encoder(self, encoder):
pass
......@@ -102,7 +99,7 @@ class PI_E753(Controller):
- <new_velocity>: 'float'
"""
log_debug(self, "set_velocity new_velocity = %f" % new_velocity)
self.send_no_ans(axis, "VEL 1 %f" % new_velocity)
self.command("VEL 1 %f" % new_velocity)
return self.read_velocity(axis)
......@@ -138,10 +135,10 @@ class PI_E753(Controller):
"""
if self._get_closed_loop_status(motion.axis):
# Command in position.
self.send_no_ans(motion.axis, "MOV 1 %g" % motion.target_pos)
self.command("MOV 1 %g" % motion.target_pos)
else:
# Command in voltage.
self.send_no_ans(motion.axis, "SVA 1 %g" % motion.target_pos)
self.command("SVA 1 %g" % motion.target_pos)
def stop(self, axis):
"""
......@@ -150,86 +147,7 @@ class PI_E753(Controller):
* 24 -> stop asap
* to check : copy of current position into target position ???
"""
self.send_no_ans(axis, "STP")
""" COMMUNICATIONS"""
def send(self, axis, cmd):
"""
- Converts <cmd> into 'bytes' and sends it to controller.
- Adds terminator to <cmd> string.
- <axis> is passed for debugging purposes.
- Type of <cmd> must be 'str'.
- Type of returned string is 'str'.
"""
log_debug_data(self, "SEND", cmd)
_cmd = cmd.encode() + b"\n"
_t0 = time.time()
_ans = self.comm.write_readline(_cmd).decode()
# "\n" in answer has been removed by tcp lib.
_duration = time.time() - _t0
if _duration > 0.005:
log_info(
self,
"PI_E753.py : Received %r from Send %s (duration : %g ms)",
_ans,
_cmd,
_duration * 1000,
)
self.check_error(_cmd)
return _ans
def send_no_ans(self, axis, cmd):
"""
- Sends <cmd> command to controller.
- Adds terminator to <cmd> string.
- Channel is already defined in <cmd>.
- Type of <cmd> must be 'str'.
- Used for answer-less commands, thus return nothing.
"""
log_debug_data(self, "SEND_NO_ANS", cmd)
_cmd = cmd.encode() + b"\n"
self.comm.write(_cmd)
self.check_error(_cmd)
def check_error(self, command):
"""
- Checks error code
- <command> : 'bytes' : string displayed in case of error
"""
(_err_nb, _err_str) = self._get_error()
if _err_nb != 0:
print(":( error #%d (%s) in send_no_ans(%r)" % (_err_nb, _err_str, command))
"""
Raw communication commands.
To encode/decode and to be exported in Tango DS.
"""
def raw_write(self, cmd):
"""
- <cmd> must be 'str'
"""
self.comm.write(cmd.encode())
def raw_write_read(self, cmd):
"""
- <cmd> must be 'str'
- Return 'str'
"""
return self.comm.write_readline(cmd.encode()).decode()
def raw_write_readlines(self, cmd: str, lines: int):
"""
- Add '\\n' terminator to `cmd` string
- Send `cmd` string to the controller and read back `lines` lines
"""
_cmd = cmd.encode() + b"\n"
return "\n".join(self.comm.write_readlines(_cmd, lines).decode())
self.command("STP")
"""
E753 specific
......@@ -238,40 +156,29 @@ class PI_E753(Controller):
@object_method(types_info=("None", "float"))
def get_voltage(self, axis):
""" Return voltage read from controller."""
_ans = self.send(axis, "SVA? 1")
_voltage = float(_ans[2:])
return _voltage
return float(self.command(axis, "SVA? 1"))
@object_method(types_info=("None", "float"))
def get_output_voltage(self, axis):
""" Return output voltage read from controller. """
_ans = self.send(axis, "VOL? 1")
_voltage = float(_ans[2:])
return _voltage
return float(self.command("VOL? 1"))
def set_voltage(self, axis, new_voltage):
""" Sets Voltage to the controller."""
self.send_no_ans(axis, "SVA 1 %g" % new_voltage)
self.command("SVA 1 %g" % new_voltage)
def _get_velocity(self, axis):
"""
Return velocity taken from controller.
"""
_ans = self.send(axis, "VEL? 1")
_velocity = float(_ans.split("=")[1])
return _velocity
return float(self.command("VEL? 1"))
def _get_pos(self):
"""
- no axis parameter as _get_pos is used by encoder.... can be a problem???
- Return a 'float': real position read by capacitive sensor.
"""
_ans = self.comm.write_readline(b"POS?\n").decode()
# _ans should looks like "1=-8.45709419e+01\n"
# "\n" removed by tcp lib.
_pos = float(_ans[2:])
return _pos
return float(self.command("POS?"))
def _get_target_pos(self, axis):
"""
......@@ -281,51 +188,35 @@ class PI_E753(Controller):
- MOV? : Return the last valid commanded target position.
"""
if self._get_closed_loop_status(axis):
_ans = self.send(axis, "MOV? 1")
# _ans should looks like "1=-8.45709419e+01"
_pos = float(_ans[2:])
return float(self.command("MOV? 1"))
else:
_ans = self.send(axis, "SVA? 1")
# _ans should looks like "???"
_pos = float(_ans[2:])
return _pos
return float(self.command("SVA? 1"))
def _get_on_target_status(self, axis):
_ans = self.send(axis, "ONT? 1")
_status = _ans.split("=")[1]
_status = self.command("ONT? 1")
if _status == "1":
return True
elif _status == "0":
return False
else:
print("err _get_on_target_status, _ans=%r" % _ans)
return -1
""" CLOSED LOOP"""
def _get_closed_loop_status(self, axis):
_ans = self.send(axis, "SVO? 1")
_status = _ans.split("=")[1]
_status = self.command("SVO? 1")
if _status == "1":
return True
elif _status == "0":
return False
else:
print("err _get_closed_loop_status, _ans=%r" % _ans)
return -1
def _set_closed_loop(self, axis, state):
"""
Activate closed-loop if <state> is True.
"""
if state:
self.send_no_ans(axis, "SVO 1 1")
else:
self.send_no_ans(axis, "SVO 1 0")
self.command("SVO 1 %d" % bool(state))
@object_method(types_info=("None", "None"))
def open_loop(self, axis):
......@@ -348,19 +239,9 @@ class PI_E753(Controller):
def get_model(self, axis):
return self.model
def _get_error(self):
"""
Does not use send() to be able to call _get_error in send().
"""
_error_number = int(self.comm.write_readline(b"ERR?\n").decode())
_error_str = pi_gcs.get_error_str(int(_error_number))
return (_error_number, _error_str)
def _stop(self, axis):
print("????????? PI_E753.py received _stop ???")
self.send_no_ans(axis, "STP")
self.command("STP")
"""
ID/INFO
......@@ -370,7 +251,7 @@ class PI_E753(Controller):
"""
Return controller identifier.
"""
return self.send(axis, "*IDN?")
return self.command("*IDN?")
def get_axis_info(self, axis):
"""Return Controller specific info about <axis>
......@@ -397,11 +278,11 @@ class PI_E753(Controller):
used.
"""
idn = self.comm.write_readline(b"*IDN?\n").decode()
idn = self.command("*IDN?")
# ifc = self.command("IFC? IPADR MACADR IPSTART", 3)
ifc = [
bs.decode()
for bs in self.comm.write_readlines(b"IFC? IPADR MACADR IPSTART\n", 3)
for bs in self.sock.write_readlines(b"IFC? IPADR MACADR IPSTART\n", 3)
]
info_str = "CONTROLLER:\n"
......@@ -411,7 +292,7 @@ class PI_E753(Controller):
_start_mode = "use IPADR" if ifc[2] == "0" else "use default -> 192.168.0.1"
info_str += f" IP start: IPSTART={ifc[2]}({_start_mode})\n"
info_str += "COMMUNICATION CONFIG:\n "
info_str += self.comm.__info__()
info_str += self.sock.__info__()
return info_str
@object_method(types_info=("None", "string"))
......@@ -467,18 +348,18 @@ class PI_E753(Controller):
_infos.append(("Firmware date ", "SEP? 1 0xffff000e"))
_infos.append(("Firmware developer ", "SEP? 1 0xffff000f"))
(error_nb, err_str) = self._get_error()
(error_nb, err_str) = self.get_error()
_txt = ' ERR nb=%d : "%s"\n' % (error_nb, err_str)
# Reads pre-defined infos (1-line answers only)
for i in _infos:
_ans = self.comm.write_readline(f"{i[1]}\n".encode()).decode()
_ans = self.sock.write_readline(f"{i[1]}\n".encode()).decode()
_txt += f" {i[0]} {_ans} \n"
# Reads multi-lines infos.
# IFC
_ans = [bs.decode() for bs in self.comm.write_readlines(b"IFC?\n", 5)]
_ans = [bs.decode() for bs in self.sock.write_readlines(b"IFC?\n", 5)]
_txt = _txt + " %s \n%s\n" % (
"Communication parameters",
"\n".join(_ans),
......@@ -493,13 +374,13 @@ class PI_E753(Controller):
# TSP
_ans = [
bs.decode() for bs in self.comm.write_readlines(b"TSP?\n", _tsp_nb_lines)
bs.decode() for bs in self.sock.write_readlines(b"TSP?\n", _tsp_nb_lines)
]
_txt = _txt + " %s \n%s\n" % ("Analog setpoints", "\n".join(_ans))
# TAD
_ans = [
bs.decode() for bs in self.comm.write_readlines(b"TAD?\n", _tad_nb_lines)
bs.decode() for bs in self.sock.write_readlines(b"TAD?\n", _tad_nb_lines)
]
_txt = _txt + " %s \n%s\n" % (
"ADC value of analog input",
......@@ -510,3 +391,9 @@ class PI_E753(Controller):
# 131071 = pow(2,17)-1
return _txt
class Axis(axis_module.Axis):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.channel = 1
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment