Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Benoit Rousselle
bliss
Commits
8aaef0b9
Commit
8aaef0b9
authored
Oct 31, 2016
by
Jose Tiago Macara Coutinho
Browse files
motion: add support for units
parent
e78752f5
Changes
5
Hide whitespace changes
Inline
Side-by-side
bliss/common/axis.py
View file @
8aaef0b9
...
...
@@ -33,6 +33,7 @@ from bliss.common.task_utils import *
from
bliss.controllers.motor_settings
import
AxisSettings
from
bliss.common
import
event
from
bliss.common.utils
import
Null
from
bliss.common.units
import
ureg
as
ur
import
gevent
import
re
import
types
...
...
@@ -41,6 +42,8 @@ import functools
#: Default polling time
DEFAULT_POLLING_TIME
=
0.02
_DISPLACEMENT_SETTINGS
=
set
((
'position'
,
'dial_position'
,
'_set_position'
,
'offset'
,
'low_limit'
,
'high_limit'
))
class
Modulo
(
object
):
def
__init__
(
self
,
mod
=
360
):
...
...
@@ -75,6 +78,24 @@ class Motion(object):
return
self
.
__axis
def
to_quantity
(
value
,
unit
):
if
unit
is
None
or
value
is
None
or
isinstance
(
value
,
Null
):
return
value
if
isinstance
(
value
,
ur
.
Quantity
):
if
value
.
dimensionless
:
value
=
value
.
magnitude
else
:
return
value
.
to
(
unit
)
return
ur
.
Quantity
(
value
,
unit
)
def
to_magnitude
(
value
,
unit
):
value
=
to_quantity
(
value
,
unit
)
if
isinstance
(
value
,
ur
.
Quantity
):
value
=
value
.
magnitude
return
value
class
Axis
(
object
):
"""
Bliss motor axis
...
...
@@ -144,19 +165,36 @@ class Axis(object):
return
self
.
is_moving
return
False
@
property
def
unit
(
self
):
"""Current user units (:obj:`Unit` or None)"""
#unit = self.config.get("unit", default='dimensionless')
unit
=
self
.
config
.
get
(
'unit'
,
default
=
None
)
if
unit
is
not
None
:
return
ur
.
Unit
(
unit
)
@
property
def
velocity_unit
(
self
):
return
self
.
unit
/
ur
.
s
if
self
.
unit
else
None
@
property
def
acceleration_unit
(
self
):
return
self
.
velocity_unit
/
ur
.
s
if
self
.
velocity_unit
else
None
@
property
def
offset
(
self
):
"""Current offset in user units (:obj:`float`)"""
offset
=
self
.
setting
s
.
get
(
"offset"
)
"""Current offset in user units (
:obj:`Quantity` or
:obj:`float`)"""
offset
=
self
.
get_
setting
(
"offset"
)
if
offset
is
None
:
offset
=
0
self
.
setting
s
.
set
(
'offset'
,
0
)
self
.
_set_
setting
(
'offset'
,
0
)
return
offset
@
property
def
backlash
(
self
):
"""Current backlash in user units (:obj:`float`)"""
return
self
.
config
.
get
(
"backlash"
,
float
,
0
)
"""Current backlash in user units (:obj:`Quantity` or :obj:`float`)"""
backlash
=
self
.
config
.
get
(
"backlash"
,
float
,
0
)
return
to_quantity
(
backlash
,
self
.
unit
)
@
property
def
sign
(
self
):
...
...
@@ -178,8 +216,9 @@ class Axis(object):
self
.
steps_per_unit
)
@
property
def
tolerance
(
self
):
"""Current tolerance in dial units (:obj:`float`)"""
return
self
.
config
.
get
(
"tolerance"
,
float
,
1E-4
)
"""Current tolerance in dial units (:obj:`Quantity` or :obj:`float`)"""
tolerance
=
self
.
config
.
get
(
"tolerance"
,
float
,
1E-4
)
return
to_quantity
(
tolerance
,
self
.
unit
)
@
property
def
encoder
(
self
):
...
...
@@ -220,11 +259,29 @@ class Axis(object):
def
set_setting
(
self
,
*
args
):
"""Sets the given settings"""
self
.
settings
.
set
(
*
args
)
self
.
_set_setting
(
*
args
)
def
_set_setting
(
self
,
*
args
,
**
kwargs
):
(
name
,
value
),
args
=
args
[:
2
],
args
[
2
:]
if
name
in
_DISPLACEMENT_SETTINGS
:
value
=
to_magnitude
(
value
,
self
.
unit
)
elif
name
==
'velocity'
:
value
=
to_magnitude
(
value
,
self
.
velocity_unit
)
elif
name
==
'acceleration'
:
value
=
to_magnitude
(
value
,
self
.
acceleration_unit
)
self
.
settings
.
set
(
name
,
value
,
*
args
,
**
kwargs
)
def
get_setting
(
self
,
*
args
):
"""Returns the values for the given settings"""
return
self
.
settings
.
get
(
*
args
)
result
=
self
.
settings
.
get
(
*
args
)
name
=
args
[
0
]
if
name
in
_DISPLACEMENT_SETTINGS
:
result
=
to_quantity
(
result
,
self
.
unit
)
elif
name
==
'velocity'
:
result
=
to_quantity
(
result
,
self
.
velocity_unit
)
elif
name
==
'acceleration'
:
result
=
to_quantity
(
result
,
self
.
acceleration_unit
)
return
result
def
has_tag
(
self
,
tag
):
"""
...
...
@@ -277,11 +334,13 @@ class Axis(object):
@
lazy_init
def
_set_position
(
self
,
new_set_pos
=
None
):
if
new_set_pos
is
None
:
sp
=
self
.
setting
s
.
get
(
"_set_position"
)
sp
=
self
.
get_
setting
(
"_set_position"
)
if
sp
is
not
None
:
return
sp
new_set_pos
=
self
.
position
()
self
.
settings
.
set
(
"_set_position"
,
new_set_pos
)
else
:
new_set_pos
=
to_quantity
(
new_set_pos
,
self
.
unit
)
self
.
set_setting
(
"_set_position"
,
new_set_pos
)
return
new_set_pos
@
lazy_init
...
...
@@ -300,9 +359,11 @@ class Axis(object):
Returns the dial encoder position.
Returns:
float: dial encoder position
Quantity or
float: dial encoder position
"""
return
self
.
__controller
.
read_encoder
(
self
.
encoder
)
/
self
.
encoder
.
steps_per_unit
encoder
=
self
.
__controller
.
read_encoder
(
self
.
encoder
)
encoder
/=
self
.
encoder
.
steps_per_unit
return
to_quantity
(
encoder
,
self
.
unit
)
@
lazy_init
def
dial
(
self
,
new_dial
=
None
):
...
...
@@ -318,7 +379,7 @@ class Axis(object):
float: current dial position (dimensionless)
"""
if
new_dial
is
None
:
dial_pos
=
self
.
setting
s
.
get
(
"dial_position"
)
dial_pos
=
self
.
get_
setting
(
"dial_position"
)
if
dial_pos
is
None
:
dial_pos
=
self
.
_read_dial_and_update
()
return
dial_pos
...
...
@@ -327,15 +388,17 @@ class Axis(object):
raise
RuntimeError
(
"%s: can't set axis dial position "
"while moving"
%
self
.
name
)
unit
=
self
.
unit
user_pos
=
self
.
position
()
new_dial
=
to_quantity
(
new_dial
,
unit
)
try
:
# Send the new value in motor units to the controller
# and read back the (atomically) reported position
new_hw
=
new_dial
*
self
.
steps_per_unit
new_hw
=
to_magnitude
(
new_dial
*
self
.
steps_per_unit
,
unit
)
hw_pos
=
self
.
__controller
.
set_position
(
self
,
new_hw
)
dial_pos
=
hw_pos
/
self
.
steps_per_unit
self
.
setting
s
.
set
(
"dial_position"
,
dial_pos
)
dial_pos
=
to_quantity
(
hw_pos
/
self
.
steps_per_unit
,
unit
)
self
.
set
_set
ting
(
"dial_position"
,
dial_pos
)
except
NotImplementedError
:
dial_pos
=
self
.
_read_dial_and_update
(
update_user
=
False
)
...
...
@@ -352,15 +415,15 @@ class Axis(object):
argument is provided
Keyword Args:
new_pos: new user position (
in user units) [default: None,
\
meaning just return current user position]
new_pos: new user position (
Quantity or float in user units)
[default: None,
meaning just return current user position]
Returns:
float: current user position (user units)
Quantity or
float: current user position (user units)
"""
elog
.
debug
(
"axis.py : position(new_pos=
%r)"
%
new_pos
)
elog
.
debug
(
"axis.py : position(new_pos=
{0})"
.
format
(
new_pos
)
)
if
new_pos
is
None
:
pos
=
self
.
setting
s
.
get
(
"position"
)
pos
=
self
.
get_
setting
(
"position"
)
if
pos
is
None
:
pos
=
self
.
dial2user
(
self
.
dial
())
return
pos
...
...
@@ -369,6 +432,7 @@ class Axis(object):
raise
RuntimeError
(
"%s: can't set axis user position "
"while moving"
%
self
.
name
)
new_pos
=
to_quantity
(
new_pos
,
self
.
unit
)
if
self
.
no_offset
:
return
self
.
dial
(
new_pos
)
else
:
...
...
@@ -376,11 +440,12 @@ class Axis(object):
@
lazy_init
def
_read_dial_and_update
(
self
,
update_user
=
True
,
write
=
True
):
unit
=
self
.
unit
dial_pos
=
self
.
_hw_position
()
self
.
setting
s
.
set
(
"dial_position"
,
dial_pos
,
write
=
write
)
self
.
_set_
setting
(
"dial_position"
,
dial_pos
,
write
=
write
)
if
update_user
:
user_pos
=
self
.
dial2user
(
dial_pos
,
self
.
offset
)
self
.
setting
s
.
set
(
"position"
,
user_pos
,
write
=
write
)
self
.
_set_
setting
(
"position"
,
user_pos
,
write
=
write
)
return
dial_pos
@
lazy_init
...
...
@@ -391,22 +456,24 @@ class Axis(object):
# this controller does not have a 'position'
# (e.g like some piezo controllers)
curr_pos
=
0
return
curr_pos
return
to_quantity
(
curr_pos
,
self
.
unit
)
def
_calc_offset
(
self
,
new_pos
,
dial_pos
):
return
new_pos
-
self
.
sign
*
dial_pos
def
_set_position_and_offset
(
self
,
new_pos
):
unit
=
self
.
unit
dial_pos
=
self
.
dial
()
prev_offset
=
self
.
offset
self
.
_set_position
(
new_pos
)
self
.
settings
.
set
(
"offset"
,
self
.
_calc_offset
(
new_pos
,
dial_pos
))
offset
=
self
.
_calc_offset
(
new_pos
,
dial_pos
)
self
.
_set_setting
(
"offset"
,
offset
)
# update limits
ll
,
hl
=
self
.
limits
()
lim_delta
=
self
.
offset
-
prev_offset
self
.
limits
(
ll
+
lim_delta
if
ll
is
not
None
else
ll
,
hl
+
lim_delta
if
hl
is
not
None
else
hl
)
self
.
setting
s
.
set
(
"position"
,
new_pos
,
write
=
True
)
self
.
_set_
setting
(
"position"
,
new_pos
,
write
=
True
)
return
new_pos
@
lazy_init
...
...
@@ -459,20 +526,24 @@ class Axis(object):
Returns:
float: current velocity (user units/second)
"""
unit
=
self
.
velocity_unit
if
from_config
:
return
self
.
config
.
get
(
"velocity"
,
float
)
return
to_quantity
(
self
.
config
.
get
(
"velocity"
,
float
)
,
unit
)
new_velocity
=
to_quantity
(
new_velocity
,
unit
)
if
new_velocity
is
not
None
:
# Write -> Converts into motor units to change velocity of axis."
self
.
__controller
.
set_velocity
(
self
,
new_velocity
*
abs
(
self
.
steps_per_unit
))
self
,
to_magnitude
(
new_velocity
,
unit
)
*
abs
(
self
.
steps_per_unit
))
_user_vel
=
self
.
__controller
.
read_velocity
(
self
)
/
abs
(
self
.
steps_per_unit
)
self
.
settings
.
set
(
"velocity"
,
_user_vel
)
_user_vel
=
to_quantity
(
_user_vel
,
unit
)
self
.
set_setting
(
"velocity"
,
_user_vel
)
else
:
# Read -> Returns velocity read from motor axis.
_user_vel
=
self
.
setting
s
.
get
(
'velocity'
)
_user_vel
=
self
.
get_
setting
(
'velocity'
)
if
_user_vel
is
None
:
_user_vel
=
self
.
__controller
.
read_velocity
(
self
)
/
abs
(
self
.
steps_per_unit
)
_user_vel
=
to_quantity
(
_user_vel
,
unit
)
return
_user_vel
...
...
@@ -481,23 +552,26 @@ class Axis(object):
"""
<new_acc> is given in user_units/s2.
"""
unit
=
self
.
acceleration_unit
if
from_config
:
return
self
.
config
.
get
(
"acceleration"
,
float
)
return
to_quantity
(
self
.
config
.
get
(
"acceleration"
,
float
)
,
unit
)
new_acc
=
to_quantity
(
new_acc
,
unit
)
if
new_acc
is
not
None
:
if
self
.
is_moving
:
raise
RuntimeError
(
"Cannot set acceleration while axis '%s` is moving."
%
self
.
name
)
# Converts into motor units to change acceleration of axis.
self
.
__controller
.
set_acceleration
(
self
,
new_acc
*
abs
(
self
.
steps_per_unit
))
self
.
__controller
.
set_acceleration
(
self
,
to_magnitude
(
new_acc
,
unit
)
*
abs
(
self
.
steps_per_unit
))
_ctrl_acc
=
self
.
__controller
.
read_acceleration
(
self
)
_acceleration
=
_ctrl_acc
/
abs
(
self
.
steps_per_unit
)
self
.
setting
s
.
set
(
"acceleration"
,
_acceleration
)
_acceleration
=
to_quantity
(
_ctrl_acc
/
abs
(
self
.
steps_per_unit
)
,
unit
)
self
.
set
_set
ting
(
"acceleration"
,
_acceleration
)
else
:
_acceleration
=
self
.
setting
s
.
get
(
'acceleration'
)
_acceleration
=
self
.
get_
setting
(
'acceleration'
)
if
_acceleration
is
None
:
_ctrl_acc
=
self
.
__controller
.
read_acceleration
(
self
)
_acceleration
=
_ctrl_acc
/
abs
(
self
.
steps_per_unit
)
_acceleration
=
to_quantity
(
_ctrl_acc
/
abs
(
self
.
steps_per_unit
)
,
unit
)
return
_acceleration
...
...
@@ -517,10 +591,12 @@ class Axis(object):
Returns:
float: current acceleration time (second)
"""
unit
=
ur
.
s
if
self
.
unit
else
None
if
from_config
:
return
self
.
velocity
(
from_config
=
True
)
/
self
.
acceleration
(
from_config
=
True
)
if
new_acctime
is
not
None
:
new_acctime
=
to_quantity
(
new_acctime
,
unit
=
unit
)
# Converts acctime into acceleration.
acc
=
self
.
velocity
()
/
new_acctime
self
.
acceleration
(
acc
)
...
...
@@ -551,11 +627,14 @@ class Axis(object):
ll
=
self
.
config
.
get
(
"low_limit"
,
float
,
None
)
hl
=
self
.
config
.
get
(
"high_limit"
,
float
,
None
)
return
map
(
self
.
dial2user
,
(
ll
,
hl
))
unit
=
self
.
unit
low_limit
=
to_quantity
(
low_limit
,
unit
)
high_limit
=
to_quantity
(
high_limit
,
unit
)
if
not
isinstance
(
low_limit
,
Null
):
self
.
setting
s
.
set
(
"low_limit"
,
low_limit
)
self
.
_set_
setting
(
"low_limit"
,
low_limit
)
if
not
isinstance
(
high_limit
,
Null
):
self
.
setting
s
.
set
(
"high_limit"
,
high_limit
)
return
self
.
setting
s
.
get
(
'low_limit'
),
self
.
setting
s
.
get
(
'high_limit'
)
self
.
_set_
setting
(
"high_limit"
,
high_limit
)
return
self
.
get_
setting
(
'low_limit'
),
self
.
get_
setting
(
'high_limit'
)
def
_update_settings
(
self
,
state
=
None
):
self
.
settings
.
set
(
"state"
,
state
if
state
is
not
None
else
self
.
state
(),
write
=
self
.
_hw_control
)
...
...
@@ -569,7 +648,7 @@ class Axis(object):
self
.
_handle_move
(
backlash_motion
,
polling_time
)
def
_handle_move
(
self
,
motion
,
polling_time
):
state
=
self
.
_wait_move
(
polling_time
)
state
=
self
.
_wait_move
(
polling_time
,
update_settings
=
True
)
if
state
in
[
'LIMPOS'
,
'LIMNEG'
]:
raise
RuntimeError
(
str
(
state
))
...
...
@@ -587,7 +666,7 @@ class Axis(object):
backlash_start
=
dial_pos
*
self
.
steps_per_unit
# axis has moved to target pos - backlash (or shorter, if stopped);
# now do the final motion (backlash) relative to current/theo. pos
elog
.
debug
(
"doing backlash (
%g)"
%
motion
.
backlash
)
elog
.
debug
(
"doing backlash (
{0})"
.
format
(
motion
.
backlash
)
)
self
.
_backlash_move
(
backlash_start
,
motion
.
backlash
,
polling_time
)
elif
stopped
:
self
.
_set_position
(
user_pos
)
...
...
@@ -595,7 +674,7 @@ class Axis(object):
self
.
_do_encoder_reading
()
def
_jog_move
(
self
,
velocity
,
direction
,
polling_time
):
self
.
_wait_move
(
polling_time
)
self
.
_wait_move
(
polling_time
,
update_settings
=
True
)
dial_pos
=
self
.
_read_dial_and_update
()
user_pos
=
self
.
dial2user
(
dial_pos
)
...
...
@@ -622,11 +701,14 @@ class Axis(object):
Returns:
float: position in axis user units
"""
position
=
to_quantity
(
position
,
self
.
unit
)
if
position
is
None
:
# see limits()
return
None
if
offset
is
None
:
offset
=
self
.
offset
else
:
offset
=
to_quantity
(
offset
,
self
.
unit
)
return
(
self
.
sign
*
position
)
+
offset
def
user2dial
(
self
,
position
):
...
...
@@ -639,21 +721,25 @@ class Axis(object):
Returns:
float: position in axis dial units
"""
position
=
to_quantity
(
position
,
self
.
unit
)
return
(
position
-
self
.
offset
)
/
self
.
sign
@
lazy_init
def
prepare_move
(
self
,
user_target_pos
,
relative
=
False
):
"""Prepare a motion. Internal usage only"""
elog
.
debug
(
"user_target_pos=%g, relative=%r"
%
(
user_target_pos
,
relative
))
elog
.
debug
(
"user_target_pos={0}, relative={1!r}"
.
format
(
user_target_pos
,
relative
))
unit
=
self
.
unit
user_initial_dial_pos
=
self
.
dial
()
hw_pos
=
self
.
_read_dial_and_update
()
elog
.
debug
(
"hw_position=%g user_initial_dial_pos=%g"
%
(
hw_pos
,
user_initial_dial_pos
))
elog
.
debug
(
"hw_position={0} user_initial_dial_pos={1}"
.
format
(
hw_pos
,
user_initial_dial_pos
))
if
abs
(
user_initial_dial_pos
-
hw_pos
)
>
self
.
tolerance
:
raise
RuntimeError
(
"%s: discrepancy between dial (%f) and
controller position (
%f
), aborting"
%
(
self
.
name
,
user_initial_dial_pos
,
hw_pos
))
raise
RuntimeError
(
"{0}: discrepancy between dial ({1}) and "
"
controller position (
{2}
), aborting"
.
format
(
self
.
name
,
user_initial_dial_pos
,
hw_pos
))
if
relative
:
user_initial_pos
=
self
.
_set_position
()
...
...
@@ -664,13 +750,13 @@ class Axis(object):
dial_initial_pos
=
self
.
user2dial
(
user_initial_pos
)
dial_target_pos
=
self
.
user2dial
(
user_target_pos
)
self
.
_set_position
(
user_target_pos
)
if
abs
(
dial_target_pos
-
dial_initial_pos
)
<
1E-6
:
if
abs
(
dial_target_pos
-
dial_initial_pos
)
<
to_quantity
(
1E-6
,
unit
)
:
return
elog
.
debug
(
"prepare_move : user_initial_pos=
%g
user_target_pos=
%g"
%
(
user_initial_pos
,
user_target_pos
)
+
" dial_target_pos=
%g
dial_intial_pos=
%g
relative=
%s"
%
(
dial_target_pos
,
dial_initial_pos
,
relative
))
elog
.
debug
(
"prepare_move : user_initial_pos=
{0}
user_target_pos=
{1}"
.
format
(
user_initial_pos
,
user_target_pos
)
+
" dial_target_pos=
{0}
dial_intial_pos=
{1}
relative=
{2}"
.
format
(
dial_target_pos
,
dial_initial_pos
,
relative
))
# all positions are converted to controller units
backlash
=
self
.
backlash
/
self
.
sign
*
self
.
steps_per_unit
...
...
@@ -678,7 +764,8 @@ class Axis(object):
target_pos
=
dial_target_pos
*
self
.
steps_per_unit
if
backlash
:
if
cmp
(
delta
,
0
)
!=
cmp
(
backlash
,
0
):
zero_d
=
to_quantity
(
0
,
unit
)
if
cmp
(
delta
,
zero_d
)
!=
cmp
(
backlash
,
zero_d
):
# move and backlash are not in the same direction;
# apply backlash correction, the move will happen
# in 2 steps
...
...
@@ -702,18 +789,22 @@ class Axis(object):
high_limit
,
low_limit
=
low_limit
,
high_limit
user_high_limit
,
user_low_limit
=
user_low_limit
,
user_high_limit
backlash_str
=
" (with
%f
backlash)"
%
self
.
backlash
if
backlash
else
""
backlash_str
=
" (with
{0}
backlash)"
.
format
(
self
.
backlash
)
if
backlash
else
""
if
user_low_limit
is
not
None
:
if
target_pos
<
low_limit
:
raise
ValueError
(
"
%s
: move to `
%f'%s
would go below low limit (
%f)"
%
(
self
.
name
,
user_target_pos
,
backlash_str
,
user_low_limit
))
"
{0}
: move to `
{1}'{2}
would go below low limit (
{3})"
.
format
(
self
.
name
,
user_target_pos
,
backlash_str
,
user_low_limit
))
if
user_high_limit
is
not
None
:
if
target_pos
>
high_limit
:
raise
ValueError
(
"
%s
: move to `
%f' %s
would go beyond high limit (
%f)"
%
(
self
.
name
,
user_target_pos
,
backlash_str
,
user_high_limit
))
"
{0}
: move to `
{1}'{2}
would go beyond high limit (
{3})"
.
format
(
self
.
name
,
user_target_pos
,
backlash_str
,
user_high_limit
))
# need to strip units out (controller is not unit aware)
target_pos
=
to_magnitude
(
target_pos
,
unit
)
delta
=
to_magnitude
(
delta
,
unit
)
backlash
=
to_magnitude
(
backlash
,
unit
)
motion
=
Motion
(
self
,
target_pos
,
delta
)
motion
.
backlash
=
backlash
...
...
@@ -738,11 +829,18 @@ class Axis(object):
pass
except
:
sys
.
excepthook
(
*
sys
.
exc_info
())
# update settings;
# as update is done before move done is set,
# we need to read state from hardware to get it right
# (it would return 'MOVING' otherwise)
# this update is very important for position, to have
# final position ok for waiters on move done event
self
.
_update_settings
(
state
=
self
.
state
(
read_hw
=
True
))
self
.
__move_done
.
set
()
event
.
send
(
self
,
"move_done"
,
True
)
def
_check_ready
(
self
):
initial_state
=
self
.
state
()
initial_state
=
self
.
state
(
read_hw
=
True
)
if
initial_state
!=
"READY"
:
raise
RuntimeError
(
"axis %s state is
\
%r"
%
(
self
.
name
,
str
(
initial_state
)))
...
...
@@ -774,7 +872,8 @@ class Axis(object):
position or True if it is given in relative position
polling_time (float): motion loop polling time (seconds)
"""
elog
.
debug
(
"user_target_pos=%g wait=%r relative=%r"
%
(
user_target_pos
,
wait
,
relative
))
elog
.
debug
(
"user_target_pos={0} wait={1!r} relative={2!r}"
.
format
(
user_target_pos
,
wait
,
relative
))
if
self
.
__controller
.
is_busy
():
raise
RuntimeError
(
"axis %s: controller is busy"
%
self
.
name
)
self
.
_check_ready
()
...
...
@@ -822,8 +921,8 @@ class Axis(object):
enc_dial
=
self
.
encoder
.
read
()
curr_pos
=
self
.
_read_dial_and_update
()
if
abs
(
curr_pos
-
enc_dial
)
>
self
.
encoder
.
tolerance
:
raise
RuntimeError
(
"'
%s
' didn't reach final position.(enc_dial=
%g
, curr_pos=
%g)"
%
(
self
.
name
,
enc_dial
,
curr_pos
))
raise
RuntimeError
(
"'
{0}
' didn't reach final position.(enc_dial=
{1}
, curr_pos=
{2})"
.
format
(
self
.
name
,
enc_dial
,
curr_pos
))
def
_do_handle_move
(
self
,
motion
,
polling_time
):
with
error_cleanup
(
self
.
_cleanup_stop
):
...
...
@@ -859,7 +958,7 @@ class Axis(object):
wait (bool): wait or not for end of motion
polling_time (float): motion loop polling time (seconds)
"""
elog
.
debug
(
"user_delta_pos=
%g
wait=
%r"
%
(
user_delta_pos
,
wait
))
elog
.
debug
(
"user_delta_pos=
{0}
wait=
{1!r}"
.
format
(
user_delta_pos
,
wait
))
return
self
.
move
(
user_delta_pos
,
wait
,
relative
=
True
,
polling_time
=
polling_time
)
def
wait_move
(
self
):
...
...
@@ -883,13 +982,15 @@ class Axis(object):
except
gevent
.
GreenletExit
:
pass
def
_wait_move
(
self
,
polling_time
=
DEFAULT_POLLING_TIME
,
ctrl_state_funct
=
'state'
):
def
_wait_move
(
self
,
polling_time
=
DEFAULT_POLLING_TIME
,
update_settings
=
False
,
ctrl_state_funct
=
'state'
):
while
True
:
state_funct
=
getattr
(
self
.
__controller
,
ctrl_state_funct
)
state
=
state_funct
(
self
)
self
.
_update_settings
(
state
)
if
state
!=
"MOVING"
:
return
state
if
update_settings
:
self
.
_update_settings
(
state
)
gevent
.
sleep
(
polling_time
)
def
_cleanup_stop
(
self
,
jog
=
False
):
...
...
@@ -1213,7 +1314,7 @@ class AxisState(object):
# (copy constructor)
if
'('
in
state
:
full_states
=
[
s
.
strip
()
for
s
in
state
.
split
(
'|'
)]
p
=
re
.
compile
(
'^([A-Z
0-9
]+)\s\((.+)\)$'
)
p
=
re
.
compile
(
'^([A-Z]+)\s\((.+)\)$'
)
for
full_state
in
full_states
:
m
=
p
.
match
(
full_state
)
state
=
m
.
group
(
1
)
...
...
bliss/common/standard.py
View file @
8aaef0b9
...
...
@@ -33,6 +33,7 @@ from bliss.config.settings import QueueSetting
from
bliss.controllers.motor_group
import
Group
from
bliss.common.units
import
ureg
as
ur
_ERR
=
'!ERR'
_MAX_COLS
=
9
...
...
@@ -302,7 +303,14 @@ def __row_positions(positions, motors, fmt, sep=' '):
def
__row
(
cols
,
fmt
,
sep
=
' '
):
return
sep
.
join
([
format
(
col
,
fmt
)
for
col
in
cols
])
data
=
[]
for
col
in
cols
:
if
isinstance
(
col
,
ur
.
Quantity
):
col
=
str
(
col
)
else
:
col
=
format
(
col
,
fmt
)
data
.
append
(
col
)
return
sep
.
join
(
data
)
def
__umove
(
*
args
,
**
kwargs
):
...
...
bliss/common/units.py
0 → 100644
View file @
8aaef0b9
from
pint
import
UnitRegistry
ureg
=
UnitRegistry
()
# use short ASCII print format
ureg
.
default_format
=
'~'
bliss/controllers/motors/mockup.py
View file @
8aaef0b9
...
...
@@ -11,7 +11,7 @@ import random
from
bliss.controllers.motor
import
Controller
from
bliss.common
import
log
as
elog
from
bliss.common.axis
import
AxisState
from
bliss.common.axis
import
AxisState
,
to_magnitude
from
bliss.common
import
event
from
bliss.common.utils
import
object_method
...
...
@@ -90,13 +90,13 @@ class Mockup(Controller):
if
axis
.
config
.
get
(
"persistent_position"
,
bool
,
default
=
False
,
inherited
=
True
):
try
:
dial_pos
=
axis
.
settings
.
get
(
"dial_position"
)
dial_pos
=
axis
.
settings
.
get
(
"dial_position"
)
or
0
except
:
pass
def
set_pos
(
move_done
,
axis
=
axis
):
if
move_done
:
self
.
set_position
(
axis
,
axis
.
dial
()
*
axis
.
steps_per_unit
)
dial
=
to_magnitude
(
axis
.
dial
(),
axis
.
unit
)
self
.
set_position
(
axis
,
dial
*
axis
.
steps_per_unit
)
self
.
_axis_moves
[
axis
]
=
{
"measured_simul"
:
False
,
...
...
requirements.txt
View file @
8aaef0b9
...
...
@@ -20,6 +20,7 @@ six >= 1.10
tabulate
sphinx
>= 1.4
pyserial
>= 3.0
pint
# --- optional dependencies --------------------------------
...
...
Write
Preview