Commit 0e983a1e authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'pi_e727_unification' into 'master'

communication of PI E727 + HW tests + comments + ignore error on STP

Closes #2689

See merge request !3833
parents 087aec83 2cd10321
Pipeline #49833 passed with stages
in 130 minutes and 51 seconds
......@@ -126,6 +126,9 @@ class _Group(object):
@property
def position_with_reals(self):
"""
???
"""
return self._dial_or_position("position", with_reals=True)
@property
......@@ -133,6 +136,9 @@ class _Group(object):
return self._dial_or_position("dial", with_reals=True)
def _dial_or_position(self, attr, with_reals=False):
"""
???
"""
positions_dict = dict()
if with_reals:
......
......@@ -41,7 +41,8 @@ responses to them are).
class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
""" Base class for E517 and E518
"""
Base class for E517 and E518
"""
CHAN_LETTER = {1: "A", 2: "B", 3: "C"}
......@@ -96,6 +97,9 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
# acceleration is not mandatory in config
self.axis_settings.config_setting["acceleration"] = False
# should have been set at init of inherited classes.
log_debug(self, "model=%s", self.model)
def close(self):
"""
Called at session exit. 6 times ???
......@@ -106,7 +110,7 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
def initialize_hardware(self):
"""
Called once per controller at first axis use.
Called once per controller at first use of any of the axes.
"""
# Initialize socket communication.
self.com_initialize()
......@@ -119,6 +123,8 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
def initialize_axis(self, axis):
"""
Called at first access to <axis> (eg: __info__())
- Reads specific config
- Adds specific methods
- Switches piezo to ONLINE mode so that axis motion can be caused
......
......@@ -6,72 +6,82 @@
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.controllers.motor import Controller
from bliss.common.utils import object_method
from bliss.common.utils import object_attribute_get, object_attribute_set, object_method
from bliss.common.axis import AxisState
from bliss.common.logtools import log_debug, log_error
from bliss.common.logtools import log_debug, log_error, log_info
from bliss import global_map
from . import pi_gcs
from bliss.comm.util import TCP
"""
Bliss controller for ethernet PI E727 piezo controller.
"""
class PI_E727(pi_gcs.Communication, pi_gcs.Recorder, Controller):
"""
Bliss controller for ethernet PI E727 piezo controller.
"""
model = None
class PI_E727(Controller):
def __init__(self, *args, **kwargs):
# Called at session startup
# No hardware access
pi_gcs.Communication.__init__(self)
pi_gcs.Recorder.__init__(self)
Controller.__init__(self, *args, **kwargs)
self.cname = "E727"
# Init of controller.
def initialize(self):
"""
Controller intialization : open a single socket for all 3 axes.
Controller intialization.
Called at session startup.
Called Only once per controller even if more than
one axis is declared in config.
"""
# acceleration is not mandatory in config
self.axis_settings.config_setting["acceleration"] = False
self.trace("controller initialize")
self.host = self.config.get("host")
self.trace("opening socket")
self.sock = pi_gcs.get_pi_comm(self.config, TCP)
global_map.register(self, children_list=[self.sock])
# model of the controller.
self.model = "E727"
log_debug(self, "model=%s", self.model)
def finalize(self):
def close(self):
"""
Close the controller socket.
Called at session exit. many times ???
"""
# not called at end of device server ??? :(
# called on a new axis creation ???
self.trace("controller finalize")
try:
self.trace("closing socket")
self.sock.close()
except:
pass
def trace(self, str):
log_debug(self, "{s:{c}<{n}}".format(s=str, n=80, c="-"))
# Init of each axis.
def initialize_axis(self, axis):
self.trace("axis initialization")
self.com_close()
def initialize_hardware(self):
"""
Called once per controller at first use of any of the axes.
NB: not initialized if accessing to the controller before the axis:
ex: LILAB [1]: napix.controller.command("CCL 1 advanced")
!!! === AttributeError: 'NoneType' object has no attribute 'lock' === !!!
"""
# Initialize socket communication.
self.com_initialize()
def initialize_hardware_axis(self, axis):
"""
Called once per axis at first use of the axis
"""
pass
def initialize_axis(self, axis):
"""
Called at first access to <axis> (eg: __info__())
"""
axis.channel = axis.config.get("channel", int)
# check communication
try:
_ans = self.get_identifier(axis, 0.1)
except Exception as ex:
_str = '%r\nERROR on "%s": switch on the controller' % (ex, self.host)
# by default, an exception will be raised
log_error(self, _str)
_ans = self.get_identifier(axis)
# Enables the closed-loop.
# To be imporved.
self._set_closed_loop(axis, True)
# supposed that we are on target on init
axis._last_on_target = True
def initialize_encoder(self, encoder):
pass
......@@ -85,17 +95,31 @@ class PI_E727(Controller):
def set_off(self, axis):
pass
"""
POSITION
"""
def read_position(self, axis):
_ans = self._get_target_pos(axis)
log_debug(self, "read_position = %f" % _ans)
return _ans
"""
Return position of the axis.
After movement is finished, return the target position.
"""
if axis._last_on_target:
_pos = self._get_target_pos(axis)
log_debug(self, "position read : %g" % _pos)
else:
# if moving return real position
_pos = self._get_pos(axis)
return _pos
def read_encoder(self, encoder):
_ans = self._get_pos(axis)
_ans = self._get_pos(encoder.axis)
log_debug(self, "read_position measured = %f" % _ans)
return _ans
""" VELOCITY """
"""
VELOCITY
"""
def read_velocity(self, axis):
return self._get_velocity(axis)
......@@ -103,15 +127,16 @@ class PI_E727(Controller):
def set_velocity(self, axis, new_velocity):
log_debug(self, "set_velocity new_velocity = %f" % new_velocity)
_cmd = "VEL %s %f" % (axis.channel, new_velocity)
self.send_no_ans(_cmd)
self.check_error(_cmd)
self.command(_cmd)
return self.read_velocity(axis)
""" STATE """
"""
STATE
"""
def state(self, axis):
self.trace("axis state")
# self.trace("axis state")
if self._get_closed_loop_status(axis):
if self._get_on_target_status(axis):
return AxisState("READY")
......@@ -120,7 +145,9 @@ class PI_E727(Controller):
else:
raise RuntimeError("closed loop disabled")
""" MOVEMENTS """
"""
MOVEMENTS
"""
def prepare_move(self, motion):
pass
......@@ -128,106 +155,53 @@ class PI_E727(Controller):
def start_one(self, motion):
log_debug(self, "start_one target_pos = %f" % motion.target_pos)
# the controller latches the previous error
self.clear_error()
axis = motion.axis
_cmd = "MOV %s %g" % (axis.channel, motion.target_pos)
self.send_no_ans(_cmd)
self.check_error(_cmd)
_cmd = f"MOV {axis.channel} {motion.target_pos}"
self.command(_cmd)
def stop(self, axis):
log_debug(self, "stop requested")
self.send_no_ans("STP %s" % (axis.channel))
""" COMMUNICATIONS"""
def send(self, cmd, timeout=None):
_cmd = self._append_eoc(cmd)
_ans = self.sock.write_readline(_cmd.encode(), timeout=timeout).decode()
_ans = self._remove_eoc(_ans)
return _ans
def clear_error(self):
self._get_error()
def check_error(self, cmd):
# Check error code
(_err_nb, _err_str) = self._get_error()
if _err_nb != 0:
_str = 'ERROR on cmd "%s": #%d(%s)' % (cmd, _err_nb, _err_str)
# by default, an exception will be raised
log_error(self, _str)
raise RuntimeError(_str)
def send_no_ans(self, cmd):
_cmd = self._append_eoc(cmd)
self.sock.write(_cmd.encode())
@object_method(types_info=("None", "None"))
def raw_flush(self, axis):
self.sock.flush()
""" RAW COMMANDS """
def raw_write(self, cmd):
_cmd = self._append_eoc(cmd)
self.sock.write(_cmd.encode())
def raw_write_read(self, cmd):
_cmd = self._append_eoc(cmd)
return self.sock.write_readline(_cmd.encode()).decode()
def raw_write_readlines(self, cmd, lines):
_cmd = self._append_eoc(cmd)
ans = "\n".join(
[
r.decode().strip()
for r in self.sock.write_readlines(_cmd.encode(), lines)
]
)
_ans = self._remove_eoc(ans)
return _ans
def _append_eoc(self, cmd):
_cmd = cmd.strip()
if not _cmd.endswith("\n"):
_cmd = cmd + "\n"
log_debug(self, ">>>> %s" % (_cmd.strip("\n")))
return _cmd
def _remove_eoc(self, ans):
_ans = ans.strip()
log_debug(self, "<<<< %s" % _ans)
return _ans
"""
* STP -> stop asap
* 24 -> stop asap
* HLT -> stop smoothly
* Copy of current position into target position at stop
* all 3 commands generate (by default) a 'Disable Error 10'.
(This can be disabled via parameter 0x0E000301)
"""
log_debug(self, "stop (HLT) requested")
self.command("HLT %s" % (axis.channel))
"""
E727 specific
"""
@object_method(types_info=("None", "str"))
def get_identifier(self, axis, timeout=None):
return self.send("IDN?", timeout)
def get_identifier(self, axis):
return self.command("IDN?")
@object_method(types_info=("None", "float"))
def get_voltage(self, axis):
""" Return voltage read from controller."""
_ans = self.send("SVA?")
_voltage = float(_ans.split("=")[1])
_ans = self.command(f"SVA? {axis.channel}")
_voltage = float(_ans)
return _voltage
@object_method(types_info=("None", "float"))
def get_output_voltage(self, axis):
""" Return output voltage read from controller. """
return float(self.command(f"VOL? {axis.channel}"))
def set_voltage(self, axis, new_voltage):
""" Set Voltage to the controller."""
_cmd = "SVA %s %g" % (axis.channel, new_voltage)
self.send_no_ans(_cmd)
self.check_error(_cmd)
self.command(_cmd)
def _get_velocity(self, axis):
"""
Return velocity taken from controller.
"""
_ans = self.send("VEL? %s" % (axis.channel))
_velocity = float(_ans.split("=")[1])
_ans = self.command(f"VEL? {axis.channel}")
_velocity = float(_ans)
return _velocity
......@@ -235,57 +209,50 @@ class PI_E727(Controller):
"""
Return real position read by capacitive sensor.
"""
_ans = self.send("POS? %s" % (axis.channel))
_pos = float(_ans.split("=")[1])
_ans = self.command(f"POS? {axis.channel}")
_pos = float(_ans)
return _pos
""" ON TARGET """
"""
CLOSED LOOP
"""
def _get_target_pos(self, axis):
"""
Return last target position (setpoint value).
"""
_ans = self.send("MOV? %s" % (axis.channel))
# _ans should looks like "1=-8.45709419e+01"
_pos = float(_ans.split("=")[1])
_ans = self.command(f"MOV? {axis.channel}")
_pos = float(_ans)
return _pos
def _get_on_target_status(self, axis):
_ans = self.send("ONT? %s" % (axis.channel))
_status = _ans.split("=")[1]
if _status == "1":
return True
elif _status == "0":
return False
else:
log_error(self, "ERROR on _get_on_target_status, _ans=%r" % _ans)
return -1
""" CLOSED LOOP"""
"""
Return 'On-target status' (ONT? command) indicating
if movement is finished and measured position is within
on-target window.
"""
last_on_target = bool(int(self.command(f"ONT? {axis.channel}")))
axis._last_on_target = last_on_target
return last_on_target
def _get_closed_loop_status(self, axis):
_ans = self.send("SVO? %s" % (axis.channel))
_status = _ans.split("=")[1]
_status = self.command(f"SVO? {axis.channel}")
if _status == "1":
return True
elif _status == "0":
if _status == "0":
return False
else:
log_error(self, "ERROR on _get_closed_loop_status, _ans=%r" % _ans)
return -1
log_error(self, "ERROR on _get_closed_loop_status, _status=%r" % _status)
return -1
def _set_closed_loop(self, axis, state):
if state:
_cmd = "SVO %s 1" % (axis.channel)
_cmd = f"SVO {axis.channel} 1"
else:
_cmd = "SVO %s 0" % (axis.channel)
self.send_no_ans(_cmd)
self.check_error(_cmd)
_cmd = f"SVO {axis.channel} 0"
self.command(_cmd)
@object_method(types_info=("None", "None"))
def open_loop(self, axis):
......@@ -295,45 +262,49 @@ class PI_E727(Controller):
def close_loop(self, axis):
self._set_closed_loop(axis, True)
def _get_error(self):
# Does not use send() to be able to call _get_error in send().
_error_number = int(self.raw_write_read("ERR?"))
_error_str = pi_gcs.get_error_str(int(_error_number))
@object_attribute_get(type_info="bool")
def get_closed_loop(self, axis):
return self._get_closed_loop_status(axis)
return (_error_number, _error_str)
"""
INFO
"""
def __info__(self):
_tab = 30
@object_attribute_get(type_info="str")
def get_model(self, axis):
return self.model
(_err_nb, _err_str) = self._get_error()
_txt = "%*s: %d (%s)\n" % (_tab, "Last Error", _err_nb, _err_str)
def get_id(self, axis):
"""
Return controller identifier.
"""
return self.command("*IDN?")
_txt = _txt + "%*s:\n%s\n" % (
_tab,
"Communication parameters",
",".join(self.raw_write_readlines("IFC?", 5).split("\n")),
)
_infos = [
("Identifier", "IDN?"),
("Com level", "CCL?"),
("Firmware developer", "SEP? 1 0xffff000f"),
("Firmware name", "SEP? 1 0xffff0007"),
("Firmware version", "SEP? 1 0xffff0008"),
("Firmware description", "SEP? 1 0xffff000d"),
("Firmware date", "SEP? 1 0xffff000e"),
("Firmware developer", "SEP? 1 0xffff000f"),
]
def get_axis_info(self, axis):
"""
Return Controller specific info about <axis> for user.
Detailed infos are in get_info().
"""
info_str = "PI AXIS INFO:\n"
info_str += f" voltage (SVA) = {self.get_voltage(axis)}\n"
info_str += f" output voltage (VOL) = {self.get_output_voltage(axis)}\n"
info_str += f" closed loop = {self.get_closed_loop(axis)}\n"
for i in _infos:
_cmd = i[1]
_ans = self.send(_cmd)
_txt = _txt + "%*s: %s\n" % (_tab, i[0], _ans)
self.check_error(_cmd)
return _txt
return info_str
def __info__(self):
info_str = "CONTROLLER:\n"
(_err_nb, _err_str) = self.get_error()
info_str = f"Last Error: {_err_nb} ({_err_str})\n"
info_str += "COMMUNICATION CONFIG:\n "
info_str += self.sock.__info__()
return info_str
@object_method(types_info=("None", "string"))
def get_info(self, axis):
"""
Return information about controller.
Return detailed information about controller.
Helpful to tune the device.
"""
axis.position # force axis initialization
......@@ -342,6 +313,10 @@ class PI_E727(Controller):
_txt = ""
# use command "HPA?" to get parameters address + description
# NB: 0x7000000 used and not 0x07000000
# because PI 727 remove first 0 in the command answer
# -> cause problem in answer check.
_infos = [
("Real Position", "POS? %s"),
("Setpoint Position", "MOV? %s"),
......@@ -352,23 +327,19 @@ class PI_E727(Controller):
("Analog input setpoint", "AOS? %s"),
("ADC value of analog input", "TAD? %s"),
("Analog setpoints", "TSP? %s"),
("AutoZero Low Voltage", "SPA? 1 0x07000A00"),
("AutoZero High Voltage", "SPA? 1 0x07000A01"),
("Range Limit min", "SPA? 1 0x07000000"),
("Range Limit max", "SPA? 1 0x07000001"),
("ON Target Tolerance", "SPA? 1 0x07000900"),
("Settling time", "SPA? 1 0X07000901"),
("AutoZero Low Voltage", "SPA? %s 0x7000A00"),
("AutoZero High Voltage", "SPA? %s 0x7000A01"),
("Range Limit min", "SPA? %s 0x7000000"),
("Range Limit max", "SPA? %s 0x7000001"),
("ON Target Tolerance", "SPA? %s 0x7000900"),
("Settling time", "SPA? %s 0X7000901"),
]
for i in _infos:
_cmd = i[1]
if "%s" in _cmd:
_cmd = _cmd % (axis.channel)
_ans = self.send(_cmd)
_ans = self.command(_cmd)
_txt = _txt + "%*s: %s\n" % (_tab, i[0], _ans)
self.check_error(_cmd)
return _txt
def get_axis_info(self, axis):
return self.get_info(axis)
......@@ -11,10 +11,11 @@ from bliss.controllers.motor import Controller
from bliss.common.utils import object_attribute_get, object_attribute_set, object_method
from bliss.common.axis import AxisState
from bliss.common import axis as axis_module
from bliss.common.logtools import log_debug, log_info
from bliss.common.logtools import log_debug, log_error, log_info
import gevent
from . import pi_gcs
import gevent.lock
"""
......@@ -35,16 +36,24 @@ responses to them are).
class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
"""
Bliss controller for ethernet PI E753 / E754 piezo controllers.
"""
def __init__(self, *args, **kwargs):
pi_gcs.Communication.__init__(self)
pi_gcs.Recorder.__init__(self)
Controller.__init__(self, *args, **kwargs)