Commit a4635eb6 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'motor_state' into 'master'

axis: state test like **state == "READY"** are now deprecated.

See merge request !625
parents d087d5c8 3fe32576
......@@ -44,6 +44,8 @@ import math
import types
import functools
import numpy
import warnings
warnings.simplefilter('once')
#: Default polling time
DEFAULT_POLLING_TIME = 0.02
......@@ -652,7 +654,7 @@ class Axis(object):
def _handle_move(self, motion, polling_time):
state = self._move_loop(polling_time)
if state in ['LIMPOS', 'LIMNEG']:
if state.LIMPOS or state.LIMNEG:
raise RuntimeError(str(state))
# gevent-atomic
......@@ -849,13 +851,13 @@ class Axis(object):
self.__move_done_callback.set()
def _check_ready(self):
if not self.state() in ("READY", "MOVING"):
# read state from hardware
self._update_settings(state=self.state(read_hw=True))
initial_state = self.state()
if not initial_state.READY and not initial_state.MOVING:
# read state from hardware
initial_state = self.state(read_hw=True)
self._update_settings(state=initial_state)
if initial_state != "READY":
if not initial_state.READY:
raise RuntimeError("axis %s state is \
%r" % (self.name, str(initial_state)))
......@@ -992,7 +994,7 @@ class Axis(object):
while True:
state = state_funct(self)
self._update_settings(state)
if state != "MOVING":
if not state.MOVING:
return state
gevent.sleep(polling_time)
......@@ -1366,19 +1368,32 @@ class AxisState(object):
def __str__(self):
return self.current_states()
def __eq__(self, other):
def __repr__(self):
return "AxisState: %s" % self.__str__()
def __contains__(self, other):
if isinstance(other, str):
if not self._current_states:
return other == "UNKNOWN"
return other in self._current_states
elif isinstance(other, AxisState):
s1 = set(self._current_states)
s2 = set(other._current_states)
return s1 == s2
else:
return NotImplemented
def __eq__(self, other):
if isinstance(other, str):
warnings.warn('Use: **%s in state** instead' % other,
DeprecationWarning)
return self.__contains__(other)
elif isinstance(other, AxisState):
return set(self._current_states) == \
set(other._current_states)
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, str):
warnings.warn('Use: **%s in state** instead' % other,
DeprecationWarning)
x = self.__eq__(other)
if x is not NotImplemented:
return not x
......
......@@ -169,7 +169,7 @@ class _Group(object):
def _check_ready(self):
initial_state = self.state()
if initial_state != "READY":
if not initial_state.READY:
raise RuntimeError("all motors are not ready")
def move(self, *args, **kwargs):
......
......@@ -474,7 +474,7 @@ class CalcController(Controller):
def state(self, axis, new_state=None):
st = self._reals_group.state()
if st == 'READY':
if st.READY:
self._calc_from_real()
return st
......
......@@ -43,7 +43,7 @@ class cs8tango(Controller):
name = axis.config.get("cs8name")
val3_varname = "n_Read%s" % name
r = setup_globals.robodiff.robot
if self.state(axis) == "READY":
if self.state(axis).READY:
with self._lock:
r.executeTask('Read%s' % name)
return float(r.getVal3GlobalVariableDouble(val3_varname))
......
......@@ -303,7 +303,7 @@ class Icepap(Controller):
def home_state(self,axis):
s = self.state(axis)
if s != 'READY' and s != 'POWEROFF':
if 'READY' not in s and 'POWEROFF' not in s:
s.set('MOVING')
return s
......
......@@ -281,7 +281,7 @@ class Mockup(Controller):
return AxisState("READY", "LIMNEG")
elif pos >= hl:
return AxisState("READY", "LIMPOS")
if self._hw_state == "OFF":
if self._hw_state.OFF:
return AxisState("OFF")
else:
s = AxisState(self._hw_state)
......
......@@ -21,14 +21,14 @@ def test_controller_from_axis(robz):
def test_state_callback(robz):
ready_event = gevent.event.AsyncResult()
def callback(state):
ready_event.set(state == 'READY')
ready_event.set(state.READY)
event.connect(robz, "state", callback)
robz.rmove(1)
assert ready_event.get(timeout=0.1)
assert robz.state() == "READY"
assert robz.state().READY
def test_move_done_callback(robz):
......@@ -94,15 +94,15 @@ def test_axis_set_acctime(roby):
assert roby.acctime(acc) == acc
def test_axis_move(robz):
assert robz.state() == "READY"
assert robz.state().READY
robz.move(180, wait=False)
assert robz.state() == "MOVING"
assert robz.state().MOVING
robz.wait_move()
assert robz.state() == "READY"
assert robz.state().READY
assert robz.position() == 180
assert robz._set_position() == 180
......@@ -111,36 +111,36 @@ def test_axis_multiple_move(robz):
robz.velocity(1000)
robz.acceleration(10000)
for i in range(10):
assert robz.state() == "READY"
assert robz.state().READY
robz.move((i+1)*2, wait=False)
assert robz.state() == "MOVING"
assert robz.state().MOVING
robz.wait_move()
assert robz.state() == "READY"
assert robz.state().READY
def test_axis_init(robz):
assert robz.state() == "READY"
assert robz.state().READY
assert robz.settings.get("init_count") == 1
def test_stop(robz):
assert robz.state() == "READY"
assert robz.state().READY
robz.move(180, wait=False)
assert robz._set_position() == 180
assert robz.state() == "MOVING"
assert robz.state().MOVING
robz.stop()
assert robz.state() == "READY"
assert robz.state().READY
def test_asynchronous_stop(robz):
robz.velocity(1)
robz.move(180, wait=False)
assert robz.state() == "MOVING"
assert robz.state().MOVING
started_time = time.time()
time.sleep(1+robz.acctime())
......@@ -148,11 +148,11 @@ def test_asynchronous_stop(robz):
robz.stop(wait=False)
elapsed_time = time.time() - started_time
assert robz.state() == "MOVING"
assert robz.state().MOVING
robz.wait_move()
assert robz.state() == "READY"
assert robz.state().READY
assert robz.position() == pytest.approx(elapsed_time+robz.acceleration()*robz.acctime()**2, 1e-2)
......@@ -161,13 +161,13 @@ def test_home_stop(robz):
time.sleep(0.1)
assert robz.state() == "MOVING"
assert robz.state().MOVING
robz.stop()
robz.wait_move()
assert robz.state() == "READY"
assert robz.state().READY
def test_limit_search_stop(robz):
robz.controller.set_hw_limits(robz, -5, 5)
......@@ -175,13 +175,13 @@ def test_limit_search_stop(robz):
time.sleep(0.1)
assert robz.state() == "MOVING"
assert robz.state().MOVING
robz.stop()
robz.wait_move()
assert robz.state() == "READY"
assert robz.state().READY
def test_limits(robz):
iset_pos = robz._set_position()
......@@ -196,7 +196,7 @@ def test_limits(robz):
robz.limits(-2.1, 1.1)
robz.rmove(1)
robz.rmove(-2)
assert robz.state() == 'READY'
assert robz.state().READY
def test_limits2(robz, roby):
iset_pos = robz._set_position()
......@@ -248,7 +248,7 @@ def test_backlash3(roby):
assert roby.backlash_move == 0
assert roby.state() == 'READY'
assert roby.state().READY
def test_axis_steps_per_unit(roby):
roby.move(180, wait=False)
......@@ -282,9 +282,9 @@ def test_custom_method(roby):
def test_home_search(roby):
roby.home(wait=False)
assert roby.state() == 'MOVING'
assert roby.state().MOVING
roby.wait_move()
assert roby.state() == 'READY'
assert roby.state().READY
roby.dial(38930)
roby.position(38930)
assert roby.offset == 0
......@@ -292,13 +292,13 @@ def test_home_search(roby):
def test_ctrlc(robz):
robz.move(100, wait=False)
assert robz.state() == "MOVING"
assert robz.state().MOVING
time.sleep(0.1)
robz._Axis__move_task.kill(KeyboardInterrupt, block=False)
with pytest.raises(KeyboardInterrupt):
robz.wait_move()
assert not robz.is_moving
assert robz.state() == "READY"
assert robz.state().READY
assert robz.position() < 100
def test_simultaneous_move(robz):
......@@ -314,7 +314,7 @@ def test_simultaneous_move(robz):
move_started.wait()
assert robz.state() == 'MOVING'
assert robz.state().MOVING
try:
robz.move(-10)
except Exception, e:
......@@ -331,19 +331,19 @@ def test_simultaneous_waitmove_exception(robz):
with pytest.raises(RuntimeError):
w2.get()
robz.off()
assert robz.state() == 'OFF'
assert 'OFF' in robz.state()
robz.on()
assert robz.state() == 'READY'
assert 'READY' in robz.state()
def test_on_off(robz):
try:
robz.off()
assert robz.state() == 'OFF'
assert robz.state().OFF
with pytest.raises(RuntimeError):
robz.move(1)
robz.on()
assert robz.state() == 'READY'
assert robz.state().READY
robz.move(1)
assert robz.position() == pytest.approx(1)
robz.move(2, wait=False)
......@@ -351,7 +351,7 @@ def test_on_off(robz):
robz.off()
robz.wait_move()
robz.off()
assert robz.state() == 'OFF'
assert robz.state().OFF
finally:
robz.on()
......@@ -405,7 +405,7 @@ def test_interrupted_waitmove(m0):
time.sleep(0.01)
with pytest.raises(KeyboardInterrupt):
waitmove.kill(KeyboardInterrupt)
assert m0.state() == "READY"
assert m0.state().READY
def test_hardware_limits(roby):
try:
......@@ -436,7 +436,7 @@ def test_bad_start(roby):
with pytest.raises(RuntimeError):
roby.move(1)
assert roby.state() == 'READY'
assert roby.state().READY
assert roby.position() == 0
finally:
roby.controller.set_error(False)
......@@ -484,9 +484,9 @@ def test_jog(robz):
hw_position = robz._hw_position()
elapsed_time = (time.time()-start_time) - robz.acctime()
assert hw_position == pytest.approx(300*elapsed_time+robz.acceleration()*0.5*robz.acctime()**2, 1e-2)
assert robz.state() == "MOVING"
assert robz.state().MOVING
robz.stop()
assert robz.state() == "READY"
assert robz.state().READY
assert robz._set_position() == robz.position()
robz.dial(0); robz.position(0)
assert robz.velocity() == 10
......
......@@ -43,7 +43,7 @@ def test_exported_pseudo_axes(s1vg, s1vo, s1hg, s1ho):
assert all((s1vg, s1vo, s1hg, s1ho))
controller = s1vg.controller
assert all((axis.controller == controller for axis in (s1vg, s1vo, s1hg, s1ho)))
assert all([axis.state() == 'READY' for axis in controller.pseudos])
assert all(['READY' in axis.state() for axis in controller.pseudos])
def test_real_axis_is_right_object(s1f, s1ho, m1):
controller = s1ho.controller
......@@ -65,8 +65,8 @@ def test_pseudo_axes_move(s1b, s1f, s1hg, s1ho):
assert s1hg.position() == pytest.approx(.5)
hgap = s1hg.position()
s1ho.move(2)
assert s1b.state() == "READY"
assert s1f.state() == "READY"
assert s1b.state().READY
assert s1f.state().READY
assert s1hg.position() == pytest.approx(hgap)
assert s1ho.position() == pytest.approx(2)
assert s1b.position() == pytest.approx(2 + (hgap / 2.0))
......
......@@ -15,45 +15,45 @@ def test_group_move(robz, roby):
roby_pos = roby.position()
grp = Group(robz, roby)
assert grp.state() == "READY"
assert grp.state().READY
target_robz = robz_pos + 50
target_roby = roby_pos + 50
grp.move(robz, target_robz, roby, target_roby, wait=False)
assert grp.state() == "MOVING"
#assert robz.state() == "MOVING"
#assert roby.state() == "MOVING"
assert grp.state().MOVING
#assert robz.state().MOVING
#assert roby.state().MOVING
grp.wait_move()
assert robz.state() == "READY"
assert roby.state() == "READY"
assert grp.state() == "READY"
assert robz.state().READY
assert roby.state().READY
assert grp.state().READY
def test_stop(roby, robz):
grp = Group(robz, roby)
grp.move(robz, 1, roby, 1)
assert robz.state() == "READY"
assert roby.state() == "READY"
assert grp.state() == "READY"
assert robz.state().READY
assert roby.state().READY
assert grp.state().READY
grp.move({robz: 0, roby: 0}, wait=False)
assert grp.state() == "MOVING"
assert grp.state().MOVING
grp.stop()
assert grp.state() == "READY"
assert robz.state() == "READY"
assert roby.state() == "READY"
assert grp.state().READY
assert robz.state().READY
assert roby.state().READY
def test_ctrlc(roby, robz):
grp = Group(robz, roby)
assert robz.state() == "READY"
assert roby.state() == "READY"
assert grp.state() == "READY"
assert robz.state().READY
assert roby.state().READY
assert grp.state().READY
grp.move({robz: -10, roby: -10}, wait=False)
......@@ -64,9 +64,9 @@ def test_ctrlc(roby, robz):
with pytest.raises(KeyboardInterrupt):
grp.wait_move()
assert grp.state() == "READY"
assert robz.state() == "READY"
assert grp.state() == "READY"
assert grp.state().READY
assert robz.state().READY
assert grp.state().READY
def test_position_reading(beacon, robz, roby):
grp = Group(robz, roby)
......@@ -104,7 +104,7 @@ def test_bad_startall(robz, robz2):
robz.controller.set_error(True)
with pytest.raises(RuntimeError):
grp.move({ robz: 1, robz2: 2 })
assert grp.state() == 'READY'
assert grp.state().READY
assert robz.position() == 0
assert robz2.position() == 0
finally:
......
......@@ -39,7 +39,7 @@ def test_config(hooked_m0, hooked_m1):
def test_axis_move(hooked_m0):
"""test single motion hook works in single axis motion"""
assert hooked_m0.state() == "READY"
assert hooked_m0.state().READY
hook0 = hooked_m0.motion_hooks[0]
assert hook0.nb_pre_move == 0
......@@ -49,14 +49,14 @@ def test_axis_move(hooked_m0):
assert hook0.nb_pre_move == 1
assert hook0.nb_post_move == 0
assert hooked_m0.state() == "MOVING"
assert hooked_m0.state().MOVING
hooked_m0.wait_move()
assert hook0.last_post_move_args[-1].type == 'move'
assert hook0.nb_pre_move == 1
assert hook0.nb_post_move == 1
assert hooked_m0.state() == "READY"
assert hooked_m0.state().READY
assert hooked_m0.position() == 180
assert hooked_m0._set_position() == 180
......@@ -77,7 +77,7 @@ def test_axis_limit(hooked_m0):
def test_axis_move2(hooked_m1):
"""test multiple motion hooks works in single axis motion"""
assert hooked_m1.state() == "READY"
assert hooked_m1.state().READY
hook0 = hooked_m1.motion_hooks[0]
hook1 = hooked_m1.motion_hooks[1]
......@@ -92,7 +92,7 @@ def test_axis_move2(hooked_m1):
assert hook0.nb_post_move == 0
assert hook1.nb_pre_move == 1
assert hook1.nb_post_move == 0
assert hooked_m1.state() == "MOVING"
assert hooked_m1.state().MOVING
hooked_m1.wait_move()
......@@ -100,7 +100,7 @@ def test_axis_move2(hooked_m1):
assert hook0.nb_post_move == 1
assert hook1.nb_pre_move == 1
assert hook1.nb_post_move == 1
assert hooked_m1.state() == "READY"
assert hooked_m1.state().READY
assert hooked_m1.position() == 180
assert hooked_m1._set_position() == 180
......@@ -110,22 +110,22 @@ def test_axis_multiple_move(hooked_m0):
hook0 = hooked_m0.motion_hooks[0]
for i in range(100):
assert hooked_m0.state() == "READY"
assert hooked_m0.state().READY
assert hook0.nb_pre_move == i
assert hook0.nb_post_move == i
hooked_m0.move((i+1)*2, wait=False)
assert hook0.nb_pre_move == i+1
assert hook0.nb_post_move == i
assert hooked_m0.state() == "MOVING"
assert hooked_m0.state().MOVING
hooked_m0.wait_move()
assert hook0.nb_pre_move == i+1
assert hook0.nb_post_move == i+1
assert hooked_m0.state() == "READY"
assert hooked_m0.state().READY
def test_stop(hooked_m0):
"""test motion hooks work when motor is stopped during motion"""
assert hooked_m0.state() == "READY"
assert hooked_m0.state().READY
hook0 = hooked_m0.motion_hooks[0]
assert hook0.nb_pre_move == 0
......@@ -137,18 +137,18 @@ def test_stop(hooked_m0):
assert hook0.nb_pre_move == 1
assert hook0.nb_post_move == 0
assert hooked_m0.state() == "MOVING"
assert hooked_m0.state().MOVING
hooked_m0.stop()
assert hook0.nb_pre_move == 1
assert hook0.nb_post_move == 1
assert hooked_m0.state() == "READY"
assert hooked_m0.state().READY
def test_error_hook(hooked_error_m0):
"""test a hook which generates error on pre_move"""
assert hooked_error_m0.state() == "READY"
assert hooked_error_m0.state().READY
hook0 = hooked_error_m0.motion_hooks[0]
hook1 = hooked_error_m0.motion_hooks[1]
......@@ -164,7 +164,7 @@ def test_error_hook(hooked_error_m0):
assert hook0.nb_post_move == 0
assert hook1.nb_pre_move == 0
assert hook1.nb_post_move == 0
assert hooked_error_m0.state() == "READY"
assert hooked_error_m0.state().READY
def test_group_move(hooked_m0, hooked_m1):
......@@ -180,7 +180,7 @@ def test_group_move(hooked_m0, hooked_m1):
assert hook0.nb_post_move == 0
assert hook1.nb_pre_move == 0
assert hook1.nb_post_move == 0
assert grp.state() == "READY"
assert grp.state().READY
target_hooked_m0 = hooked_m0_pos + 50
target_hooked_m1 = hooked_m1_pos + 50
......@@ -193,7 +193,7 @@ def test_group_move(hooked_m0, hooked_m1):
assert hook0.nb_post_move == 0
assert hook1.nb_pre_move == 1
assert hook1.nb_post_move == 0
assert grp.state() == "MOVING"
assert grp.state().MOVING
grp.wait_move()
......@@ -201,9 +201,9 @@ def test_group_move(hooked_m0, hooked_m1):
assert hook0.nb_post_move == 2
assert hook1.nb_pre_move == 1
assert hook1.nb_post_move == 1
assert hooked_m0.state() == "READY"