Commit fad723be authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'PI_and_axisds' into 'master'

Pi and axisds

See merge request bliss/bliss!3750
parents eca98ae9 4c06c244
Pipeline #48013 failed with stages
in 100 minutes and 45 seconds
......@@ -1123,9 +1123,6 @@ class Axis(Scannable):
"""
Return the axis state
Keyword Args:
read_hw (bool): read from hardware [default: False]
Return:
AxisState: axis state
"""
......
......@@ -98,4 +98,4 @@ class PI_E517(PI_E51X):
)
log_debug(self, "set_gate : _cmd = %s" % _cmd)
self.send_no_ans(axis, _cmd)
self.command(_cmd)
......@@ -69,8 +69,8 @@ class PI_E518(PI_E51X):
# print "gate %s on axis channel %d %f" % (state , self.gate_axis.channel, time.time())
if state:
_cmd = "CTO %d 7 1" % (self.gate_axis.channel)
_cmd = "CTO %d 7 1" % (axis.channel)
else:
_cmd = "CTO %d 7 0" % (self.gate_axis.channel)
_cmd = "CTO %d 7 0" % (axis.channel)
self.send_no_ans(self.gate_axis, _cmd)
self.command(_cmd)
......@@ -43,7 +43,7 @@ responses to them are).
"""
class PI_E51X(Controller):
class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
""" Base class for E517 and E518
"""
......@@ -52,6 +52,8 @@ class PI_E51X(Controller):
model = None # defined in inherited classes.
def __init__(self, *args, **kwargs):
pi_gcs.Communication.__init__(self)
pi_gcs.Recorder.__init__(self)
Controller.__init__(self, *args, **kwargs)
self._axis_online = weakref.WeakKeyDictionary()
self._axis_closed_loop = weakref.WeakKeyDictionary()
......@@ -80,6 +82,12 @@ class PI_E51X(Controller):
log_debug(self, "mvt started, gate set to 1")
def initialize(self):
"""
Controller intialization.
*
"""
self.com_initialize()
# acceleration is not mandatory in config
self.axis_settings.config_setting["acceleration"] = False
......@@ -112,7 +120,7 @@ class PI_E51X(Controller):
self.set_on(axis)
# set velocity control mode
self.send_no_ans(axis, "VCO %s 1" % axis.chan_letter)
self.command(f"VCO {axis.chan_letter} 1")
# Closed loop
self._axis_closed_loop[axis] = self._get_closed_loop_status(axis)
......@@ -128,6 +136,7 @@ class PI_E51X(Controller):
# automatic gate (OFF by default)
self._axis_auto_gate[axis] = False
# connect move_done for auto_gate mode
event.connect(axis, "move_done", self.move_done_event_received)
......@@ -147,12 +156,12 @@ class PI_E51X(Controller):
def set_on(self, axis):
log_debug(self, "set %s ONLINE" % axis.name)
self.send_no_ans(axis, "ONL %d 1" % axis.channel)
self.command(f"ONL {axis.channel} 1")
self._axis_online[axis] = 1
def set_off(self, axis):
log_debug(self, "set %s OFFLINE" % axis.name)
self.send_no_ans(axis, "ONL %d 0" % axis.channel)
self.command(f"ONL {axis.channel} 0")
self._axis_online[axis] = 0
def read_position(
......@@ -204,27 +213,35 @@ class PI_E51X(Controller):
Returns:
- <velocity> : float
"""
_ans = self.send(axis, "VEL? %s" % axis.chan_letter)
_ans = self.command(f"VEL? {axis.chan_letter}")
# _ans should looks like "A=+0012.0000"
# removes 'X=' prefix
_velocity = float(_ans[2:])
_velocity = float(_ans)
log_debug(self, "read %s velocity : %g " % (axis.name, _velocity))
return _velocity
def set_velocity(self, axis, new_velocity):
self.send_no_ans(axis, "VEL %s %f" % (axis.chan_letter, new_velocity))
self.command(f"VEL {axis.chan_letter} {new_velocity}")
log_debug(self, "%s velocity set : %g" % (axis.name, new_velocity))
return self.read_velocity(axis)
def state(self, axis):
"""
"""
# print(f"pi_e51x.py -- state({axis.name})")
if not self._axis_online[axis]:
# print("Axis is OFF")
return AxisState("OFF")
if self._axis_closed_loop[axis]:
log_debug(self, "%s state: CLOSED-LOOP active" % axis.name)
_pos = self._get_pos()
if self._get_on_target_status(axis):
# print("ONT")
return AxisState("READY")
else:
# print("not ONT")
return AxisState("MOVING")
else:
log_debug(self, "%s state: CLOSED-LOOP is not active" % axis.name)
......@@ -246,23 +263,17 @@ class PI_E51X(Controller):
Returns:
- None
"""
chan_letter = motion.axis.chan_letter
tg_pos = motion.target_pos
if self._axis_closed_loop[motion.axis]:
log_debug(
self,
"Move %s in position to %g" % (motion.axis.name, motion.target_pos),
)
# Command in position.
self.send_no_ans(
motion.axis, "MOV %s %g" % (motion.axis.chan_letter, motion.target_pos)
)
log_debug(self, "Move %s in position to %g" % (motion.axis.name, tg_pos))
self.command(f"MOV {chan_letter} {tg_pos}")
else:
log_debug(
self, "Move %s in voltage to %g" % (motion.axis.name, motion.target_pos)
)
# Command in voltage.
self.send_no_ans(
motion.axis, "SVA %s %g" % (motion.axis.chan_letter, motion.target_pos)
)
log_debug(self, f"Move {motion.axis.name} in voltage to {tg_pos}")
self.command(motion.axis, f"SVA {chan_letter} {tg_pos}")
def stop(self, axis):
"""
......@@ -271,99 +282,13 @@ class PI_E51X(Controller):
* 24 -> stop asap
* to check : copy of current position into target position ???
"""
self.send_no_ans(axis, "HLT %s" % axis.chan_letter)
# self.comm.write(b"STP\n")
self.command("HLT %s" % axis.chan_letter)
def _get_cto(self, axis):
_ans = [bs.decode() for bs in self.comm.write_readlines(b"CTO?\n", 24)]
return _ans
# Raw communication commands
def raw_write(self, com):
"""
<com> must be 'str'
"""
self.comm.write(com.encode())
def raw_read(self):
"""
Read one line from device.
"""
self.comm.readline()
def raw_write_read(self, com):
"""
* <com> must be 'str'
* Return 'str'
"""
return self.comm.write_readline(com.encode()).decode()
def raw_write_readlines(self, cmd: str, lines: int):
"""
- Adds '\\n' terminator to `cmd` string
- Sends `cmd` string to the controller and read back <lines> lines
"""
_cmd = cmd.encode() + b"\n"
return "\n".join(self.comm.write_readlines(_cmd, lines).decode())
"""
E51x communications
"""
def send(self, axis, cmd):
"""
- Converts <cmd> into 'bytes' and sends it to controller.
- Adds terminator to <cmd> string.
- <axis> is passed for debugging purposes.
- Channel must be defined in <cmd>.
- Returns the answer from controller.
- Type of <cmd> must be 'str'.
- Type of returned string is 'str'.
"""
_cmd = cmd.encode() + b"\n"
_t0 = time.time()
_ans = self.comm.write_readline(_cmd).decode()
# "\n" in answer has been removed by tcp lib.
_duration = time.time() - _t0
if _duration > 0.005:
log_info(
self,
"PI_E51X.py : Received %r from Send %s (duration : %g ms)",
_ans,
_cmd,
_duration * 1000,
)
self.check_error(_cmd)
_cto_ans = self.command("CTO?", nb_line=24)
_ans = [bs.decode() for bs in _cto_ans]
return _ans
def send_no_ans(self, axis, cmd):
"""
- Adds the 'newline' terminator character : "\\\\n"
- Sends command <cmd> to the PI E51X controller.
- Channel is already defined in <cmd>.
- <axis> is passed for debugging purposes.
- Used for answer-less commands, thus returns nothing.
"""
_cmd = cmd.encode() + b"\n"
self.comm.write(_cmd)
self.check_error(_cmd)
def check_error(self, command):
"""
- Checks error code
- <command> : 'str' : displayed in case of error
"""
(_err_nb, _err_str) = self._get_error()
if _err_nb != 0:
print(":( error #%d (%s) in send_no_ans(%r)" % (_err_nb, _err_str, command))
"""
E51X specific
"""
......@@ -373,14 +298,12 @@ class PI_E51X(Controller):
Args:
- <axis> :
Returns:
- <position> Returns real position (POS? command) read by capacitive sensor.
- <position>: real positions (POS? command) read by capacitive sensor.
Raises:
?
"""
_bs_ans = self.comm.write_readlines(b"POS?\n", 3)
_ans = [bs.decode() for bs in _bs_ans]
_ans = self.command("POS?", nb_line=3)
_pos = list(map(float, [x[2:] for x in _ans]))
return _pos
......@@ -396,25 +319,20 @@ class PI_E51X(Controller):
- list of float
"""
if self._axis_closed_loop[axis]:
_bs_ans = self.comm.write_readlines(b"MOV?\n", 3)
_ans = self.command("MOV?", nb_line=3)
else:
_bs_ans = self.comm.write_readlines(b"SVA?\n", 3)
_ans = [bs.decode() for bs in _bs_ans]
# _pos = float(_ans[2:])
_ans = self.command("SVA?", nb_line=3)
_pos = list(map(float, [x[2:] for x in _ans]))
return _pos
def _get_low_limit(self, axis):
_ans = self.send(axis, "NLM? %s" % axis.chan_letter)
# A=+0000.0000
return float(_ans[2:])
_ans = self.command(f"NLM? {axis.chan_letter}")
return float(_ans)
def _get_high_limit(self, axis):
_ans = self.send(axis, "PLM? %s" % axis.chan_letter)
# A=+0035.0000
return float(_ans[2:])
_ans = self.command(f"PLM? {axis.chan_letter}")
return float(_ans)
"""
DCO : Drift Compensation Offset.
......@@ -430,9 +348,9 @@ class PI_E51X(Controller):
@object_attribute_get(type_info="bool")
def get_dco(self, axis):
dco = self.send(axis, "DCO? %s" % axis.chan_letter)
val = int(dco[2:])
return bool(val)
_dco = self.command(f"DCO? {axis.chan_letter}")
val = bool(int(_dco))
return val
@object_method(types_info=("bool", "None"))
def set_dco(self, axis, onoff):
......@@ -440,7 +358,7 @@ class PI_E51X(Controller):
self._set_dco(axis, onoff)
def _set_dco(self, axis, onoff):
self.send_no_ans(axis, "DCO %s %d" % (axis.chan_letter, onoff))
self.command(f"DCO {axis.chan_letter}, {onoff}")
"""
Voltage commands
......@@ -450,7 +368,7 @@ class PI_E51X(Controller):
"""
Returns Voltage Of Output Signal Channel (VOL? command)
"""
_ans = self.send(axis, "VOL? %s" % axis.channel)
_ans = self.command(f"VOL? {axis.channel}")
_vol = float(_ans.split("=+")[-1])
return _vol
......@@ -463,13 +381,13 @@ class PI_E51X(Controller):
Returns Closed loop status (Servo state) (SVO? command)
-> True/False
"""
_ans = self.send(axis, "SVO? %s" % axis.chan_letter)
_status = bool(int(_ans[2:]))
_ans = self.command(f"SVO? {axis.chan_letter}")
_status = bool(int(_ans))
return _status
def _set_closed_loop(self, axis, onoff):
log_debug(self, "set %s closed_loop to %s" % (axis.name, onoff))
self.send_no_ans(axis, "SVO %s %d" % (axis.chan_letter, onoff))
self.command(f"SVO {axis.chan_letter} {onoff}")
self._axis_closed_loop[axis] = self._get_closed_loop_status(axis)
log_debug(
self, "effective closed_loop is now %s" % self._axis_closed_loop[axis]
......@@ -485,8 +403,8 @@ class PI_E51X(Controller):
Returns << On Target >> status (ONT? command).
True/False
"""
_ans = self.send(axis, "ONT? %s" % axis.chan_letter)
_status = bool(int(_ans[2:]))
_ans = self.command(f"ONT? {axis.chan_letter}")
_status = bool(int(_ans))
return _status
@object_method(types_info=("None", "None"))
......@@ -531,17 +449,6 @@ class PI_E51X(Controller):
"""
raise NotImplementedError
def _get_error(self):
"""
- Checks error code
- <command> : 'bytes' : Previous command string displayed in case of error
"""
# Does not use send() to be able to call _get_error in send().
_error_number = int(self.comm.write_readline(b"ERR?\n").decode())
_error_str = pi_gcs.get_error_str(int(_error_number))
return (_error_number, _error_str)
"""
ID/INFO
"""
......@@ -588,7 +495,7 @@ class PI_E51X(Controller):
"""
- Returns a 'str' string.
"""
return self.send(axis, "*IDN?")
return self.command("*IDN?")
@object_method(types_info=("None", "string"))
def get_info(self, axis):
......@@ -613,7 +520,7 @@ class PI_E51X(Controller):
("Drift compensation Offset ", "DCO? %s" % axis.chan_letter),
("Online ", "ONL? %s" % axis.channel),
("On target ", "ONT? %s" % axis.chan_letter),
("On target window ", "SPA? %s 0x07000900" % axis.channel),
("On target window ", "SPA? %s 0x07000900" % axis.chan_letter),
("ADC Value of input signal ", "TAD? %s" % axis.channel),
("Input Signal Position value", "TSP? %s" % axis.channel),
("Velocity control mode ", "VCO? %s" % axis.chan_letter),
......@@ -629,36 +536,35 @@ class PI_E51X(Controller):
# Read pre-defined infos (1 line answers)
for i in _infos:
_txt = _txt + " %s %s\n" % (i[0], self.send(axis, (i[1])))
_txt = _txt + " %s %s\n" % (i[0], self.command((i[1])))
if self.model == "E517":
# Communication parameters.
_txt = _txt + " %s \n%s\n" % (
"Communication parameters",
"\n".join(self.sock.write_readlines("IFC?\n", 6)),
"\n".join(self.command("IFC?", nb_line=6)),
)
# Firmware version
_txt = _txt + " %s \n%s\n" % (
"Firmware version",
"\n".join(self.sock.write_readlines("VER?\n", 3)),
"\n".join(self.command("VER?", nb_line=3)),
)
if self.model == "E518":
# Communication parameters.
_txt = _txt + " %s \n%s\n" % (
"Communication parameters",
"\n".join(self.sock.write_readlines("IFC?\n", 5)),
"\n".join(self.command("IFC?", nb_line=5)),
)
# Firmware version
_txt = _txt + " %s \n%s\n" % (
"Firmware version",
"\n".join(self.sock.write_readlines("VER?\n", 5)),
"\n".join(self.command("VER?", nb_line=5)),
)
# error ?
(error_nb, err_str) = self._get_error()
_txt += " Last error code %d= %s" % (error_nb, err_str)
(error_nb, err_str) = self.get_error()
_txt += ' ERR nb=%d : "%s"\n' % (error_nb, err_str)
return _txt
......@@ -43,7 +43,8 @@ class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
# Init of controller.
def initialize(self):
"""
Controller intialization: open a single socket for all 3 axes.
Controller intialization.
*
"""
self.com_initialize()
......@@ -71,6 +72,8 @@ class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
# Enables the closed-loop.
# Can be dangerous ??? test diff between target and position before ???
# self._set_closed_loop(axis, True)
# ?
self._add_recoder_enum_on_axis(axis)
def initialize_encoder(self, encoder):
......@@ -190,9 +193,9 @@ class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
def _get_pos(self):
"""
- no axis parameter as _get_pos() is also used by encoder object.
- no <axis> parameter as _get_pos() is also used by encoder object.
Returns : float'
Returns : float
Real position of axis read by capacitive sensor.
"""
return float(self.command("POS? 1"))
......
......@@ -485,6 +485,7 @@ class Communication:
"""
with self.sock.lock:
# print(" CMD=", cmd)
cmd = cmd.strip()
need_reply = cmd.find("?") > -1
cmd = cmd.encode()
......@@ -501,11 +502,11 @@ class Communication:
)
if nb_line > 1:
# Multi-lines answer or multiple commands
# print("Multi-lines answer or multiple commands")
parsed_reply = list()
commands = cmd.split(b"\n")
if len(commands) == nb_line:
# Many queries, one reply per query
# print("# Many queries, one reply per query")
# Return a tuple of str
for cmd, rep in zip(commands, reply):
space_pos = cmd.find(b" ")
......@@ -516,18 +517,27 @@ class Communication:
# No space in cmd => no param to parse. ex: "*IDN?" "CCL?"
parsed_reply.append(rep)
else:
# One command with reply in several lines
# print("# One command with reply in several lines")
# Return a list of str
space_pos = cmd.find(b" ")
if space_pos > -1:
# print("space_pos > -1")
args = cmd[space_pos + 1 :]
for arg, rep in zip(args.split(), reply):
parsed_reply.append(self._parse_reply(rep, arg, cmd))
parsed_reply.append(
self._parse_reply(rep, arg, cmd).strip()
)
else:
# TSP? TAD? IFC? etc.
# print("# TSP? TAD? IFC? POS? etc.")
# !!!! return non-parsed lines !!!
# ex:
# pp.controller.command("POS?", nb_line=3)
# return:
# ['A=32.9347', 'B=9.9985', 'C=15.3014']
for ans in reply:
parsed_reply.append(ans.decode())
parsed_reply.append(ans.decode().strip())
reply = parsed_reply
# print(" REPLY=", reply)
else:
# Single line answer.
......
......@@ -446,8 +446,11 @@ class BlissAxis(Device):
return get_worker().execute(self._dev_state)
def _dev_state(self):
""" This command gets the device state (stored in its device_state
data member) and returns it to the caller.
"""
* get the device state from self.axis.state
* map it into a Tango STATE
* store tango state
* return tango state
:param : none
:type: tango.DevVoid
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment