Commit 1401259f authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'PI_517_518_init_and_tests' into 'master'

Pi 517 518 init and tests

Closes #2799

See merge request !3782
parents fb904a7c 0738f75e
Pipeline #48604 failed with stages
in 115 minutes and 50 seconds
......@@ -213,6 +213,7 @@ class Controller:
def _initialize_hardware(self):
# initialize controller hardware only once.
if not self.__initialized_hw.value:
try:
self.initialize_hardware()
......@@ -224,6 +225,7 @@ class Controller:
@check_disabled
def _initialize_axis(self, axis, *args, **kwargs):
"""
Called by axis.lazy_init
"""
with self.__lock:
if self.__initialized_axis[axis]:
......@@ -231,8 +233,8 @@ class Controller:
self._initialize_hardware()
# Consider axis is initialized => prevent re-entering
# _initialize_axis in lazy_init
# Consider axis is initialized
# => prevent re-entering _initialize_axis() in lazy_init
self.__initialized_axis[axis] = True
try:
......
......@@ -63,12 +63,11 @@ class PI_E518(PI_E51X):
- <state> : bool
Returns:
- None
Raises:
?
"""
# print "gate %s on axis channel %d %f" % (state , self.gate_axis.channel, time.time())
if state:
# TO CHECK: on id16b, signal is inverted.
# Is it hte case always or a config problem ?
if not state:
_cmd = "CTO %d 7 1" % (axis.channel)
else:
_cmd = "CTO %d 7 0" % (axis.channel)
......
......@@ -8,8 +8,6 @@
"""
Bliss controller for ethernet PI E51X piezo controller.
Base controller for E517 and E518
Cyril Guilloud ESRF BLISS
Thu 13 Feb 2014 15:51:41
"""
import time
......@@ -23,8 +21,7 @@ from bliss.common.axis import AxisState
from bliss.common.logtools import log_info, log_debug
from bliss import global_map
from bliss.comm.util import TCP
from bliss.common import event
from bliss.common.event import connect, disconnect
from . import pi_gcs
......@@ -52,9 +49,16 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
model = None # defined in inherited classes.
def __init__(self, *args, **kwargs):
# Called at session startup
# No hardware access
pi_gcs.Communication.__init__(self)
pi_gcs.Recorder.__init__(self)
Controller.__init__(self, *args, **kwargs)
# Keep cache of: online, closed_loop, auto_gate, low_limit, high_limit
# per axis (it cannot be global ones otherwise last axis initialized
# is the only winner !!)
# To be chganged for BLISS setting ?
self._axis_online = weakref.WeakKeyDictionary()
self._axis_closed_loop = weakref.WeakKeyDictionary()
self._axis_auto_gate = weakref.WeakKeyDictionary()
......@@ -84,20 +88,34 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
def initialize(self):
"""
Controller intialization.
*
Called at session startup.
Called Only once per controller even if more than
one axis is declared in config.
"""
self.com_initialize()
# acceleration is not mandatory in config
self.axis_settings.config_setting["acceleration"] = False
self.comm = pi_gcs.get_pi_comm(self.config, TCP)
def close(self):
"""
Called at session exit. 6 times ???
"""
self.com_close()
for axis in self.axes.values():
disconnect(axis, "move_done", self.move_done_event_received)
global_map.register(self, children_list=[self.comm])
def initialize_hardware(self):
"""
Called once per controller at first axis use.
"""
# Initialize socket communication.
self.com_initialize()
def close(self):
if self.comm is not None:
self.comm.close()
def initialize_hardware_axis(self, axis):
"""
Called once per axis at first use of the axis
"""
pass
def initialize_axis(self, axis):
"""
......@@ -111,6 +129,8 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
Returns:
- None
"""
# called at first p1 access (eg: __info__())
axis.channel = axis.config.get("channel", int)
if axis.channel not in (1, 2, 3):
raise ValueError("PI_E51X invalid motor channel : can only be 1, 2 or 3")
......@@ -138,7 +158,7 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
self._axis_auto_gate[axis] = False
# connect move_done for auto_gate mode
event.connect(axis, "move_done", self.move_done_event_received)
connect(axis, "move_done", self.move_done_event_received)
# keep limits for gate
self._axis_low_limit[axis] = self._get_low_limit(axis)
......@@ -364,15 +384,42 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
Voltage commands
"""
def _get_voltage(self, axis):
@object_method(types_info=("None", "float"))
def get_voltage(self, axis):
"""
Returns Voltage Of Output Signal Channel (VOL? command)
Returns Voltage Of Output Signal Channel (SVA? command)
"""
_ans = self.command(f"VOL? {axis.channel}")
_vol = float(_ans.split("=+")[-1])
_ans = self.command(f"SVA? {axis.chan_letter}")
_vol = float(_ans)
return _vol
"""
@object_method(types_info=("None", "float"))
def get_output_voltage(self, axis):
"""
Return output voltage
"""
return float(self.command(f"VOL? {axis.channel}"))
# Voltage Low Hard-Limit (ID 0x0B000007)
# Voltage High Hard-Limit (ID 0x0B000008)
# ("Voltage output high limit ", "VMA? %s" % axis.channel),
# ("Voltage output low limit ", "VMI? %s" % axis.channel),
@object_method(types_info=("None", "float"))
def get_voltage_high_limit(self, axis):
"""
Return voltage HIGH limit
"""
return float(self.command(f"VMA? {axis.channel}"))
@object_method(types_info=("None", "float"))
def get_voltage_low_limit(self, axis):
"""
Return voltage LOW limit
"""
return float(self.command(f"VMI? {axis.channel}"))
"""
Closed loop commands
"""
......@@ -453,6 +500,10 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
ID/INFO
"""
@object_attribute_get(type_info="str")
def get_model(self, axis):
return self.model
def __info__(self, axis=None):
if axis is None:
return self.get_controller_info()
......@@ -471,11 +522,11 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
_txt = "PI_E51X controller :\n"
# Reads pre-defined infos (1 line answers)
for (label, cmd) in _infos:
value = self.comm.write_readline(cmd.encode() + b"\n")
_txt = _txt + "%s %s\n" % (label, value.decode())
value = self.command(cmd)
_txt = _txt + "%s %s\n" % (label, value)
# Reads multi-lines infos.
_ans = [bs.decode() for bs in self.comm.write_readlines(b"IFC?\n", 6)]
_ans = self.command("IFC?", nb_line=6)
_txt = _txt + "\n%s :\n%s\n" % ("Communication parameters", "\n".join(_ans))
"""
......@@ -486,7 +537,7 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
1 = use DHCP to obtain IP address, if this fails, use IPADR (default);
"""
_ans = [bs.decode() for bs in self.comm.write_readlines(b"VER?\n", 3)]
_ans = self.command("VER?", nb_line=3)
_txt = _txt + "\n%s :\n%s\n" % ("Firmware version", "\n".join(_ans))
return _txt
......@@ -497,6 +548,17 @@ class PI_E51X(pi_gcs.Communication, pi_gcs.Recorder, Controller):
"""
return self.command("*IDN?")
def get_axis_info(self, axis):
"""
Return Controller specific info about <axis>
"""
info_str = "PI AXIS INFO:\n"
info_str += f" voltage (SVA) = {self.get_voltage(axis)}\n"
info_str += f" output voltage (VOL) = {self.get_output_voltage(axis)}\n"
info_str += f" closed loop = {self.get_closed_loop(axis)}\n"
return info_str
@object_method(types_info=("None", "string"))
def get_info(self, axis):
"""
......
......@@ -255,10 +255,6 @@ class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
print("set_closed_loop DISALBED FOR SECURITY ... ")
# self._set_closed_loop(axis, True)
@object_attribute_get(type_info="str")
def get_model(self, axis):
return self.model
def _stop(self, axis):
print("????????? PI_E753.py received _stop ???")
self.command("STP")
......@@ -267,6 +263,10 @@ class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
ID/INFO
"""
@object_attribute_get(type_info="str")
def get_model(self, axis):
return self.model
def get_id(self, axis):
"""
Return controller identifier.
......
......@@ -12,6 +12,7 @@ This module is a common base for PI controllers:
* fro Wave generator
"""
import time
import numpy
from bliss.comm.util import get_comm, get_comm_type, TCP
......@@ -435,17 +436,16 @@ class Communication:
# ???
connect(self.sock, "connect", self._clear_error)
def finalize(self):
def com_close(self):
"""
Closes the controller socket.
Disconnect error clearing.
"""
self.close()
disconnect(self.sock, "connect", self._clear_error)
def close(self):
if self.sock:
self.sock.close()
disconnect(self.sock, "connect", self._clear_error)
def get_error(self):
_error_number = int(self.sock.write_readline(b"ERR?\n"))
_error_str = get_error_str(_error_number)
......
......@@ -612,6 +612,8 @@ def pytest_addoption(parser):
parser.addoption("--pepu", help="pepu host name")
parser.addoption("--ct2", help="ct2 address")
parser.addoption("--axis-name", help="axis name")
parser.addoption("--axis-name2", help="axis name2")
parser.addoption("--axis-name3", help="axis name3")
parser.addoption("--mythen", action="store", help="mythen host name")
parser.addoption(
"--wago",
......
......@@ -9,24 +9,57 @@
PI E-51x: E-517 E-518 piezo controllers hardware test.
Run with:
$ pytest --axis-name <axis-name> ..../test_PI_E517.py
$ pytest --axis-name <axis-name> --axis-name2 <axis-name2>
--axis-name3 <axis-name3> ..../test_PI_E517.py
ex : pytest --axis-name pb2 --axis-name2 pt2 --axis-name3 pcam tests/controllers_hw/test_PI_E517.py
"""
import time
import pytest
from bliss.common import event
@pytest.fixture
def axis(request, beacon_beamline):
"""
Function to access axis given as parameter in test command line.
Function to access axes given as parameter in test command line.
"""
axis_name = request.config.getoption("--axis-name")
test_axis = beacon_beamline.get(axis_name)
axis_list = list()
try:
axis_name = request.config.getoption("--axis-name")
print("axis_name=", axis_name)
test_axis = beacon_beamline.get(axis_name)
# print("test_axis=", test_axis)
axis_list.append(test_axis)
except:
print("cannot get", axis_name)
try:
axis_name2 = request.config.getoption("--axis-name2")
print("axis_name2=", axis_name2)
test_axis2 = beacon_beamline.get(axis_name2)
axis_list.append(test_axis2)
except:
print("")
try:
yield test_axis
finally:
test_axis.close()
axis_name3 = request.config.getoption("--axis-name3")
print("axis_name3=", axis_name3)
test_axis3 = beacon_beamline.get(axis_name3)
axis_list.append(test_axis3)
except:
print("")
if len(axis_list) > 0:
# print("axis_list=", axis_list)
try:
yield axis_list
finally:
for axis in axis_list:
axis.controller.close()
def test_hw_axis_init(axis):
......@@ -35,20 +68,93 @@ def test_hw_axis_init(axis):
Device must be present.
Use axis fixture.
"""
axis.sync_hard()
axis.controller._initialize_axis(axis)
for axis_test in axis:
print(f"{axis_test.name} pos={axis_test.position}")
axis_test.sync_hard()
def test_hw_read_position(axis):
"""
Read position (cache ?)
Read measured position
"""
for axis in axis:
print(f"{axis.name}, measured_position={axis.measured_position}")
def test_hw_read_values(axis):
"""
Read various parameters
"""
pos = axis.position
assert pos
print("test_hw_read_values")
for axis in axis:
print(axis.name)
pos = axis.position
# print(f"{axis.name} POSITION={pos}")
# must be in closed-loop ?
assert pos >= 0
assert pos <= 100
pos1 = axis.measured_position
vol = axis.get_voltage()
out_vol = axis.get_output_voltage()
cl_st = axis.get_closed_loop()
model = axis.get_model()
# called at end of each test
def tearDown(self):
# Little wait time to let time to PI controller to
# close peacefully its sockets ???
time.sleep(0.1)
# ans = axis.controller.command("SPA? 1 0x07000000")
ans = axis.controller.command("*IDN?")
# Voltage Low Limit
# ans = axis.controller.get_voltage_low_limit(axis)
# Voltage High Limit
# ans = axis.controller.get_voltage_high_limit(axis)
pos2 = axis.controller._get_pos()
axis_id = axis.controller.get_id(axis)
axis_info = axis.controller.get_axis_info(axis)
vel = axis.controller.read_velocity(axis)
tg_pos = axis.controller._get_target_pos(axis)
ont_status = axis.controller._get_on_target_status(axis)
cl_status = axis.controller._get_closed_loop_status(axis)
# send 2 commands separated by "\n"
# pos, vel = axis.controller.command("POS? 1\nVEL? 1", nb_line=2)
def test_hw_get_info(axis):
for axis in axis:
info_str1 = axis.get_info()
time.sleep(0.1)
info_str2 = axis.get_info()
# Info strings have same length (with 1% accuracy).
# This means that 2 consecutive reading seem correct.
assert pytest.approx(len(info_str1), rel=0.01) == len(info_str2)
# def test_hw_halt(axis):
# pos = axis.measured_position
#
# # STP
# axis.controller.sock.write(b"STP\r")
# err_no, err_desc = axis.controller.get_error()
# assert int(err_no) == 10
#
# # HLT command does not exist in e753.
# # ans = axis.controller.command("HLT 1")
#
# # "#24"
# axis.controller.sock.write(chr(24).encode())
# err = axis.controller.get_error()
# # ERR=(10, 'Controller was stopped by command')
#
# # '#5' command requests motion status (only in closed-loop mode)
# # 1 2 4 : axis 1, 2, 3 is moving
# bit_status = axis.controller.sock.write_readline(chr(5).encode())
#
# pos = axis.measured_position
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment