Commit 67126f99 authored by bliss administrator's avatar bliss administrator

custom tests

parent 2d1b5a8c
......@@ -61,6 +61,64 @@ hg_config_headers("tango_mot", "TANGO_MOT", "TANGO_MOT", "TANGO_MOT")
hg_generate("tango_mot")
def bliss_get_axes() '{
local TTT[]
tango_io("id26/bliss/cyrtest", "GetAxisList", TTT)
}'
def bliss_get_custom_commands(mot_name) '{
local ccList[]
tango_io(motor_par(mot_name, "dev_name"), "GetCustomCommandList", ccList)
local _fct_name _cmd_string _tin _tout _reste _length
printf("-------- custom commands for %s --------\n", mot_name)
for (ii in ccList) {
_cmd_string = ccList[ii]
# example :
# ["custom_set_measured_noise", ["float", "None"]]
_length = length(_cmd_string)
_fct_name = substr(_cmd_string, 3, index(_cmd_string, ",")-4)
# _fct_name = custom_set_measured_noise
# print _fct_name
_fct_length = index(_cmd_string, ",")+2
_reste = substr(_cmd_string, _fct_length , _length-_fct_length )
# _reste = ["custom_set_measured_noise", ["float", "None"]]
# print _reste
local uuu[]
split(_reste, uuu, ", ")
_tin = string_remove_last_N_chars(substr(uuu[0], 3), 1)
_tout = string_remove_last_N_chars(substr(uuu[1], 2), 2)
printf("%s(%s) --> %s \n", _fct_name, _tin, _tout)
}
}'
def bliss_put_traj() '{
float array tp[5][100]
array_op("fill", tp, 1,2)
tango_put("id26/bliss_cyrtest/roba/trajpar", tp)
#36.CYRIL> p motor_par("roba", "dev_name")
#id26/bliss_cyrtest/roba
}'
#%IU%
#%MDESC%
# Called on reconfig.
......
......@@ -12,7 +12,7 @@ import sys
import time
import traceback
import types
import json
class bcolors:
PINK = '\033[95m'
......@@ -24,13 +24,14 @@ class bcolors:
class BlissAxisManager(PyTango.Device_4Impl):
axis_dev_list = None
axis_dev_names = None
def __init__(self, cl, name):
PyTango.Device_4Impl.__init__(self, cl, name)
self.debug_stream("In __init__() of controller")
self.init_device()
self.axis_dev_list = None
def delete_device(self):
self.debug_stream("In delete_device() of controller")
......@@ -39,6 +40,7 @@ class BlissAxisManager(PyTango.Device_4Impl):
self.debug_stream("In init_device() of controller")
self.get_device_properties(self.get_device_class())
def dev_state(self):
""" This command gets the device state (stored in its device_state
data member) and returns it to the caller.
......@@ -97,6 +99,28 @@ class BlissAxisManager(PyTango.Device_4Impl):
return self.get_state()
def GetAxisList(self):
"""
Returns the list of BlissAxisManager axes of this device.
"""
argout = list()
U = PyTango.Util.instance()
dev_list = U.get_device_list("*")
# Creates the list of BlissAxis devices names.
if self.axis_dev_names is None:
self.axis_dev_names = list()
for dev in dev_list:
dev_name = dev.get_name()
if "bliss_" in dev_name:
self.axis_dev_names.append(dev_name)
print "axes list : ", self.axis_dev_names
for _axis in self.axis_dev_names:
argout.append(_axis)
return argout
class BlissAxisManagerClass(PyTango.DeviceClass):
# Class Properties
......@@ -111,6 +135,12 @@ class BlissAxisManagerClass(PyTango.DeviceClass):
["/users/blissadm/local/userconf/bliss/XXX.xml"]],
}
# Command definitions
cmd_list = {
'GetAxisList':
[[PyTango.DevVoid, "none"],
[PyTango.DevVarStringArray, "List of axis"]]
}
# Device States Description
# ON : The motor powered on and is ready to move.
......@@ -180,6 +210,8 @@ class BlissAxis(PyTango.Device_4Impl):
self.attr_Home_side_read = False
"""
self.attr_trajpar_read = [[0.0]]
# elog.info(" %s" % self.axis.get_info())
elog.info(" BlissAxisManager.py Axis " + bcolors.PINK + self._ds_name + bcolors.ENDC + " initialized")
......@@ -549,6 +581,30 @@ class BlissAxis(PyTango.Device_4Impl):
return self.axis.set_gate(argin)
def GetCustomCommandList(self):
"""
Returns the list of custom commands.
JSON format.
"""
_cmd_list = self.axis.custom_methods_list()
argout = list()
for _cmd in _cmd_list:
argout.append( json.dumps(_cmd))
return argout
def read_trajpar(self, attr):
self.debug_stream("In read_trajpar()")
attr.set_value(self.attr_trajpar_read)
def write_trajpar(self, attr):
self.debug_stream("In write_trajpar()")
data = attr.get_write_value()
class BlissAxisClass(PyTango.DeviceClass):
# Class Properties
......@@ -605,7 +661,10 @@ class BlissAxisClass(PyTango.DeviceClass):
[PyTango.DevString, "Configuration value"]],
'SetGate':
[[PyTango.DevLong, "state of the gate 0/1"],
[PyTango.DevVoid, ""]]
[PyTango.DevVoid, ""]],
'GetCustomCommandList':
[[PyTango.DevVoid, ""],
[PyTango.DevVarStringArray, "List of axis custom commands"]]
}
# Attribute definitions
......@@ -762,6 +821,10 @@ class BlissAxisClass(PyTango.DeviceClass):
is expressed in physical unit.",
'Display level': PyTango.DispLevel.EXPERT,
}],
'trajpar':
[[PyTango.DevFloat,
PyTango.IMAGE,
PyTango.READ_WRITE, 1000, 5]],
}
......@@ -894,7 +957,12 @@ def main():
elog.debug("axis names list : %s" % axis_names)
for axis_name in axis_names:
_axis = TgGevent.get_proxy(bliss.get_axis, axis_name)
elog.debug("BlissAxisManager.py : _____________ axis %s _____________" % axis_name)
try:
_axis = TgGevent.get_proxy(bliss.get_axis, axis_name)
except:
print traceback.format_exc()
sys.exit(-1)
new_axis_class_class = types.ClassType("BlissAxisClass_%s" % axis_name, (BlissAxisClass,), {})
new_axis_class = types.ClassType("BlissAxis_%s" % axis_name, (BlissAxis,), {})
......@@ -938,7 +1006,7 @@ def main():
"""
SETTINGS AS ATTRIBUTES.
"""
elog.debug(" BlissAxisManager.py ---------------- SETTINGS -------------------------")
elog.debug(" BlissAxisManager.py : %s : -------------- SETTINGS -----------------" % axis_name)
new_axis_class_class.attr_list = dict(BlissAxisClass.attr_list)
......@@ -983,9 +1051,11 @@ def main():
setattr(new_axis_class, "write_%s" % _attr_name, new_write_attr_method)
# End of custom command and settings
# Adds new Axis specific class.
elog.debug("BlissAxisManager.py : Adds new Axis specific class.")
py.add_class(new_axis_class_class, new_axis_class)
elog.debug("BlissAxisManager.py : Class added.")
elog.debug("BlissAxisManager.py : intitialize server.")
U.server_init()
except PyTango.DevFailed:
......
......@@ -16,6 +16,10 @@ config_xml = """
<config>
<controller class="PI_E517" name="testid16a">
<host value="192.168.167.10"/>
<encoder name="encA">
<steps_per_unit value="1"/>
<tolerance value="0.001"/>
</encoder>
<axis name="px">
<channel value="1"/>
<chan_letter value="A"/>
......@@ -39,37 +43,18 @@ class TestPI_E517Controller(unittest.TestCase):
def setUp(self):
bliss.load_cfg_fromstring(config_xml)
def test_get_id(self):
def test_get_informations(self):
pz = bliss.get_axis("pz")
print "PI_E517 IDN :", pz.get_id()
def test_get_chan(self):
pz = bliss.get_axis("pz")
print "PI_E517 channel :", pz.channel
print "PI_E517 chan_letter :", pz.chan_letter
print "PI_E517 pz state:", pz.state()
print "PI_E517 INFOS :\n", pz.get_info()
def test_get_position(self):
pz = bliss.get_axis("pz")
print "PI_E517 pz position :", pz.position()
def test_get_measured_position(self):
pz = bliss.get_axis("pz")
print "PI_E517 pz measured position :", pz.measured_position()
def test_get_axis(self):
pz = bliss.get_axis("pz")
self.assertTrue(pz)
def test_get_state(self):
pz = bliss.get_axis("pz")
print "PI_E517 pz state:", pz.state()
def test_get_info(self):
pz = bliss.get_axis("pz")
print "PI_E517 INFOS :\n", pz.get_info()
def test_get_voltage(self):
pz = bliss.get_axis("pz")
print "PI_E517 pz output voltage :", pz.controller._get_voltage(pz)
def test_get_closed_loop_status(self):
......@@ -84,117 +69,122 @@ class TestPI_E517Controller(unittest.TestCase):
print "PI_E517 move to ", _new_pos
pz.move(_new_pos)
def test_multiple_move(self):
pz = bliss.get_axis("pz")
_pos = pz.position()
for i in range(1000):
print i,
pz.move(_pos, wait=True)
def test_multiple_raw_move(self):
pz = bliss.get_axis("pz")
_pos = pz.position()
_cmd = "SVA %s %g \n" % (pz.chan_letter, _pos)
for i in range(1000):
print i,
pz.controller.sock.write(_cmd)
# time.sleep(0.001)
_pos = pz.position()
print "PI_E517 pos=", _pos
def test_multiple_raw_read_pos(self):
'''
25 mar.2014 :
nb_cmd=10000, mean=1.217250ms, max=2.267838 Ran 1 test in 12.414s
nb_cmd=100000,mean=1.242071ms, max=919.186115
val> 4ms : 627.511 919.186 887.565
'''
pz = bliss.get_axis("pz")
_cmd = "SVA? %s\n" % pz.chan_letter
_sum_time = 0
_nb_cmd = 100000
_max_duration = 0
_errors = 0
for i in range(_nb_cmd):
_t0 = time.time()
try:
_pos = pz.controller.sock.write_readline(_cmd, timeout=0.1),
except:
import pdb
pdb.set_trace()
print "TIMEOUT"
_duration = (time.time() - _t0) *1000
_sum_time = _sum_time + _duration
if _duration > _max_duration:
_max_duration = _duration
if _duration > 5:
print "i=%d %6.3f" % (i, _duration)
_errors = _errors + 1
if (i % 100) == 0:
print ".",
print "nb_cmd=%d, mean=%fms, max=%f" % (_nb_cmd, _sum_time / _nb_cmd, _max_duration)
def test_multiple_raw_read_pos_and_move(self):
print " \n\n"
pz = bliss.get_axis("pz")
_pos = pz.position()
_cmd_move = "SVA %s %g \n" % (pz.chan_letter, _pos)
_cmd_pos = "SVA? %s\n" % pz.chan_letter
for i in range(100):
print i,
print pz.controller.sock.write_readline(_cmd_pos),
# time.sleep(0.01)
pz.controller.sock.write(_cmd_move)
def test_multiple_raw_read_pos_and_move_3chan(self):
print " \n\n"
px = bliss.get_axis("px")
py = bliss.get_axis("py")
pz = bliss.get_axis("pz")
_pos_x = px.position()
_pos_y = py.position()
_pos_z = pz.position()
_cmd_move_x = "SVA %s %g \n" % (px.chan_letter, _pos_x)
_cmd_pos_x = "SVA? %s\n" % px.chan_letter
_cmd_move_y = "SVA %s %g \n" % (py.chan_letter, _pos_y)
_cmd_pos_y = "SVA? %s\n" % py.chan_letter
_cmd_move_z = "SVA %s %g \n" % (pz.chan_letter, _pos_z)
_cmd_pos_z = "SVA? %s\n" % pz.chan_letter
_respire = 0
for i in range(100):
print i,
_t0 = time.time()
px.controller.sock.write(_cmd_move_x)
_ans = px.controller.sock.write_readline(_cmd_pos_x)
_duration = 1000 * (time.time() - _t0)
print _ans,
if _duration > 5:
print "oups duration : ", _duration
time.sleep(_respire)
_t0 = time.time()
py.controller.sock.write(_cmd_move_y)
_ans = py.controller.sock.write_readline(_cmd_pos_y)
_duration = 1000 * (time.time() - _t0)
print _ans,
if _duration > 5:
print "oups duration : ", _duration
time.sleep(_respire)
_t0 = time.time()
pz.controller.sock.write(_cmd_move_z)
_ans = pz.controller.sock.write_readline(_cmd_pos_z)
_duration = 1000 * (time.time() - _t0)
print _ans,
if _duration > 5:
print "oups duration : ", _duration
time.sleep(_respire)
def test_encoder(self):
encA = bliss.get_encoder("encA")
self.assertTrue(encA)
print "PI_E517 encoder : ", encA.read()
# def test_multiple_move(self):
# pz = bliss.get_axis("pz")
# _pos = pz.position()
# for i in range(1000):
# print i,
# pz.move(_pos, wait=True)
#
# def test_multiple_raw_move(self):
# pz = bliss.get_axis("pz")
# _pos = pz.position()
# _cmd = "SVA %s %g \n" % (pz.chan_letter, _pos)
# for i in range(1000):
# print i,
# pz.controller.sock.write(_cmd)
# # time.sleep(0.001)
# _pos = pz.position()
# print "PI_E517 pos=", _pos
#
#
# def test_multiple_raw_read_pos(self):
# '''
# 25 mar.2014 :
# nb_cmd=10000, mean=1.217250ms, max=2.267838 Ran 1 test in 12.414s
# nb_cmd=100000,mean=1.242071ms, max=919.186115
# val> 4ms : 627.511 919.186 887.565
# '''
# pz = bliss.get_axis("pz")
# _cmd = "SVA? %s\n" % pz.chan_letter
# _sum_time = 0
# _nb_cmd = 100000
# _max_duration = 0
# _errors = 0
# for i in range(_nb_cmd):
# _t0 = time.time()
# try:
# _pos = pz.controller.sock.write_readline(_cmd, timeout=0.1),
# except:
# import pdb
# pdb.set_trace()
# print "TIMEOUT"
# _duration = (time.time() - _t0) *1000
# _sum_time = _sum_time + _duration
# if _duration > _max_duration:
# _max_duration = _duration
# if _duration > 5:
# print "i=%d %6.3f" % (i, _duration)
# _errors = _errors + 1
# if (i % 100) == 0:
# print ".",
# print "nb_cmd=%d, mean=%fms, max=%f" % (_nb_cmd, _sum_time / _nb_cmd, _max_duration)
#
#
# def test_multiple_raw_read_pos_and_move(self):
# print " \n\n"
# pz = bliss.get_axis("pz")
# _pos = pz.position()
# _cmd_move = "SVA %s %g \n" % (pz.chan_letter, _pos)
# _cmd_pos = "SVA? %s\n" % pz.chan_letter
# for i in range(100):
# print i,
# print pz.controller.sock.write_readline(_cmd_pos),
# # time.sleep(0.01)
# pz.controller.sock.write(_cmd_move)
#
#
# def test_multiple_raw_read_pos_and_move_3chan(self):
# print " \n\n"
# px = bliss.get_axis("px")
# py = bliss.get_axis("py")
# pz = bliss.get_axis("pz")
# _pos_x = px.position()
# _pos_y = py.position()
# _pos_z = pz.position()
# _cmd_move_x = "SVA %s %g \n" % (px.chan_letter, _pos_x)
# _cmd_pos_x = "SVA? %s\n" % px.chan_letter
# _cmd_move_y = "SVA %s %g \n" % (py.chan_letter, _pos_y)
# _cmd_pos_y = "SVA? %s\n" % py.chan_letter
# _cmd_move_z = "SVA %s %g \n" % (pz.chan_letter, _pos_z)
# _cmd_pos_z = "SVA? %s\n" % pz.chan_letter
#
# _respire = 0
#
# for i in range(100):
# print i,
#
# _t0 = time.time()
# px.controller.sock.write(_cmd_move_x)
# _ans = px.controller.sock.write_readline(_cmd_pos_x)
# _duration = 1000 * (time.time() - _t0)
# print _ans,
# if _duration > 5:
# print "oups duration : ", _duration
# time.sleep(_respire)
#
# _t0 = time.time()
# py.controller.sock.write(_cmd_move_y)
# _ans = py.controller.sock.write_readline(_cmd_pos_y)
# _duration = 1000 * (time.time() - _t0)
# print _ans,
# if _duration > 5:
# print "oups duration : ", _duration
# time.sleep(_respire)
#
# _t0 = time.time()
# pz.controller.sock.write(_cmd_move_z)
# _ans = pz.controller.sock.write_readline(_cmd_pos_z)
# _duration = 1000 * (time.time() - _t0)
# print _ans,
# if _duration > 5:
# print "oups duration : ", _duration
# time.sleep(_respire)
def test_get_on_target_status(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment