Commit 07175ffb authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by bliss administrator
Browse files

Emotion Mockup Controller: improve acceleration/deceleration management

* Consider deceleration in stop and HW limit hit
* Implement home and limit search as standard movements
* Include "t" time argument for synchronised multi-axes operations
parent 95b8c7da
......@@ -5,13 +5,13 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import math
from math import fabs, sqrt, copysign
import time
import random
from bliss.controllers.motor import Controller
from bliss.common import log as elog
from bliss.common.axis import AxisState
from bliss.common.axis import AxisState, Motion
from bliss.common import event
from bliss.common.utils import object_method
......@@ -141,44 +141,72 @@ class Mockup(Controller):
else:
self.__hw_limit = (ll, hl)
def calc_acc_time_disp_vel(self, axis, delta=None, move=None):
a = self.read_acceleration(axis)
v = self.read_velocity(axis)
if move is not None and "start_pos" in move:
start_pos, end_pos = move["start_pos"], move["end_pos"]
delta = None if end_pos is None else end_pos - start_pos
if delta is not None:
v = min(v, sqrt(a * fabs(delta)))
t = v / a
d = v**2 / (2 * a)
return t, d, v
def start_all(self, *motion_list):
if self.__error_mode:
raise RuntimeError("Cannot start because error mode is set")
t0 = time.time()
t = time.time()
for motion in motion_list:
self.start_one(motion, t0=t0)
self.start_one(motion, t=t)
def start_one(self, motion, t0=None):
def start_one(self, motion, t=None):
if self.__error_mode:
raise RuntimeError("Cannot start because error mode is set")
axis = motion.axis
t0 = t0 or time.time()
pos = self.read_position(axis)
v = self.read_velocity(axis)
ll, hl = self.__hw_limit
t = t or time.time()
pos = self.read_position(axis, t=t)
end_pos = motion.target_pos
if hl is not None and end_pos > hl:
end_pos = hl
if ll is not None and end_pos < ll:
end_pos = ll
delta = motion.delta + end_pos - motion.target_pos
delta = end_pos - pos
if fabs(delta - motion.delta) > 1.5:
msg = ("Wrong motion delta: curr_pos=%s, target_pos=%s, delta=%s" %
(pos, end_pos, motion.delta))
raise RuntimeError(msg)
state = self._check_hw_limits(axis, t=t)
if ((state == 'LIMPOS' and delta > 0) or
(state == 'LIMNEG' and delta < 0)):
raise RuntimeError(str(state))
acc_t, acc_d, acc_v = self.calc_acc_time_disp_vel(axis, delta=delta)
s = copysign(1, delta)
l = self.__hw_limit[0 if s < 0 else 1]
if l is not None and s * (end_pos - l) > acc_d:
acc_d = min(acc_d, s * (l - pos))
end_pos = l + s * acc_d
delta = end_pos - pos
acc_t, acc_d, acc_v = self.calc_acc_time_disp_vel(axis, delta=delta)
move_t = 2 * acc_t + (fabs(delta) - 2 * acc_d) / acc_v
self._axis_moves[axis].update({
"start_pos": pos,
"delta": delta,
"dir": s,
"end_pos": end_pos,
"end_t": t0 + math.fabs(delta) / float(v),
"t0": t0})
"end_t": t + move_t,
"t0": t})
def start_jog(self, axis, velocity, direction):
t0 = time.time()
pos = self.read_position(axis)
def start_jog(self, axis, velocity, direction, t=None):
t = t or time.time()
pos = self.read_position(axis, t=t)
move_t = 1E9
self.set_velocity(axis, velocity)
self._axis_moves[axis].update({
"start_pos": pos,
"delta": direction,
"dir": direction,
"end_pos": None,
"end_t": t0+1E9,
"t0": t0})
"end_t": t + move_t,
"t0": t})
def read_position(self, axis, t=None):
"""
......@@ -188,20 +216,21 @@ class Mockup(Controller):
# handle read out during a motion
t = t or time.time()
end_t = self._axis_moves[axis]["end_t"]
move = self._axis_moves[axis]
end_t = move["end_t"]
if end_t and t < end_t:
# motor is moving
v = self.read_velocity(axis)
acc_t, acc_d, acc_v = self.calc_acc_time_disp_vel(axis, move=move)
t -= move["t0"]
const_t = end_t - move["t0"] - 2 * acc_t
t1 = min(t, acc_t)
t2 = min(t - t1, const_t)
t3 = t - t1 - t2
a = self.read_acceleration(axis)
d = math.copysign(1, self._axis_moves[axis]["delta"])
dt = t - self._axis_moves[axis]["t0"] # t0=time at start_one.
acctime = min(dt, v/a)
dt -= acctime
pos = self._axis_moves[axis]["start_pos"] + d*a*0.5*acctime**2
if dt > 0:
pos += d * dt * v
d = a * (t1**2 - t3**2) / 2 + acc_v * (t2 + t3)
pos = move["start_pos"] + move["dir"] * d
else:
pos = self._axis_moves[axis]["end_pos"]
pos = move["end_pos"]
return int(round(pos))
......@@ -248,7 +277,7 @@ class Mockup(Controller):
"""
<new_velocity> is in motor units
"""
axis.__vel = new_velocity
axis.__vel = float(new_velocity)
"""
ACCELERATION
......@@ -263,7 +292,7 @@ class Mockup(Controller):
"""
<new_acceleration> is in controller units / s2
"""
axis.__acc = new_acceleration
axis.__acc = float(new_acceleration)
"""
ON / OFF
......@@ -277,9 +306,9 @@ class Mockup(Controller):
"""
Hard limits
"""
def _check_hw_limits(self, axis):
def _check_hw_limits(self, axis, t=None):
ll, hl = self.__hw_limit
pos = self.read_position(axis)
pos = self.read_position(axis, t=t)
if ll is not None and pos <= ll:
return AxisState("READY", "LIMNEG")
elif hl is not None and pos >= hl:
......@@ -289,26 +318,37 @@ class Mockup(Controller):
"""
STATE
"""
def state(self, axis):
def state(self, axis, t=None):
t = t or time.time()
if self._hw_status == "PARKED":
return AxisState("PARKED")
if self._hw_status == "OFF":
return AxisState("OFF")
if self._axis_moves[axis]["end_t"] > time.time():
move = self._axis_moves[axis]
if move["end_t"] > t:
return AxisState("MOVING")
else:
self._axis_moves[axis]["end_t"]=0
return self._check_hw_limits(axis)
move["end_t"] = 0
return self._check_hw_limits(axis, t=t)
"""
Must send a command to the controller to abort the motion of given axis.
"""
def stop(self, axis, t=None):
if self._axis_moves[axis]["end_t"]:
self._axis_moves[axis]["end_pos"] = self.read_position(axis, t=t)
self._axis_moves[axis]["end_t"] = 0
t = t or time.time()
move = self._axis_moves[axis]
end_t = move["end_t"]
acc_t, acc_d, acc_v = self.calc_acc_time_disp_vel(axis, move=move)
if not end_t or t >= end_t - acc_t:
return
pos = self.read_position(axis, t=t)
acc_t = min(acc_t, t - move["t0"])
acc_d = min(acc_d, fabs(pos - move["start_pos"]))
move["end_pos"] = pos + move["dir"] * acc_d
move["end_t"] = t + acc_t
def stop_all(self, *motion_list):
t = time.time()
......@@ -318,31 +358,32 @@ class Mockup(Controller):
"""
HOME and limits search
"""
def home_search(self, axis, switch):
self._axis_moves[axis]["start_pos"] = self._axis_moves[axis]["end_pos"]
self._axis_moves[axis]["end_pos"] = 0
self._axis_moves[axis]["delta"] = 0
self._axis_moves[axis]["end_t"] = 0
self._axis_moves[axis]["t0"] = time.time()
self._axis_moves[axis]["home_search_start_time"] = time.time()
def home_search(self, axis, switch, t=None):
t = t or time.time()
end_pos = 0
pos = self.read_position(axis, t=t)
delta = end_pos - pos
if not delta:
return
acc_t, acc_d, acc_v = self.calc_acc_time_disp_vel(axis)
acc_d = min(acc_d, fabs(delta))
end_pos += copysign(acc_d, delta)
motion = Motion(axis, end_pos, end_pos - pos)
self.start_one(motion, t=t)
# def home_set_hardware_position(self, axis, home_pos):
# raise NotImplementedError
def home_state(self, axis):
if(time.time() - self._axis_moves[axis]["home_search_start_time"]) > 2:
return AxisState("READY")
else:
return AxisState("MOVING")
def limit_search(self, axis, limit):
self._axis_moves[axis]["start_pos"] = self._axis_moves[axis]["end_pos"]
self._axis_moves[axis]["end_pos"] = 1E6 if limit > 0 else -1E6
self._axis_moves[axis]["delta"] = self._axis_moves[axis]["end_pos"] #this is just for direction sign
self._axis_moves[axis]["end_pos"] *= axis.steps_per_unit
self._axis_moves[axis]["end_t"] = time.time() + 2
self._axis_moves[axis]["t0"] = time.time()
def home_state(self, axis, t=None):
return self.state(axis, t=t)
def limit_search(self, axis, limit, t=None):
t = t or time.time()
end_pos = copysign(1E6, limit)
motion = Motion(axis, end_pos, end_pos - self.read_position(axis, t=t))
self.start_one(motion, t=t)
def get_info(self, axis):
return "turlututu chapo pointu : %s" % (axis.name)
......@@ -356,12 +397,14 @@ class Mockup(Controller):
return com + ">-<" + com
def set_position(self, axis, pos):
self._axis_moves[axis]["end_pos"] = pos
self._axis_moves[axis]["end_t"] = 0
move = self._axis_moves[axis]
move["end_pos"] = pos
move["end_t"] = 0
return pos
def put_discrepancy(self, axis, disc):
self._axis_moves[axis]["end_pos"] += disc
move = self._axis_moves[axis]
move["end_pos"] += disc
"""
Custom axis methods
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment