Commit 3a3b2c3a authored by Sebastien Petitdemange's avatar Sebastien Petitdemange Committed by Cyril Guilloud
Browse files

pi-e753: manage low level wave.

parent b78183ad
......@@ -4,11 +4,14 @@
#
# Copyright (c) 2015-2020 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import contextlib
import numpy
from bliss.controllers.motor import Controller
from bliss.common.utils import object_method
from bliss.common.utils import object_attribute_get, object_attribute_set
from bliss.common.utils import object_attribute_get, object_attribute_set, object_method
from bliss.common.axis import AxisState
from bliss.common import axis as axis_module
from bliss.common.logtools import log_debug, log_debug_data, log_info, log_warning
from . import pi_gcs
......@@ -392,8 +395,115 @@ class PI_E753(pi_gcs.Communication, pi_gcs.Recorder, Controller):
return _txt
@object_method
def start_wave(
self, axis, wavetype, offset, amplitude, nb_cycles, wavelen, wait=True
):
"""
Start a simple wav trajectory,
-- wavetype can be LIN for a Linear or
SIN for a sinusoidal.
-- amplitude motor displacement
-- nb_cycles the number of time the motion is repeated.
-- wavelen the time in second that should last the motion
-- wait if you want to wait the end of the motion
"""
# check wavetype can be "LIN" or "SIN"
if wavetype not in ("LIN", "SIN"):
raise ValueError('wavetype can only be "SIN" or "LIN"')
offset *= axis.steps_per_unit
amplitude *= axis.steps_per_unit
servo_cycle = float(self.command("SPA? 1 0xe000200"))
number_of_points = int(self.command("SPA? 1 0x13000004"))
freq_faction = int(numpy.ceil((wavelen / number_of_points) / servo_cycle))
wavelen = round(wavelen / (servo_cycle * freq_faction))
if wavetype == "SIN":
if amplitude < 0: # cycle starting from max
cmd = b"WAV 1 X SIN_P %d %d %d %d %d %d" % (
wavelen,
-amplitude,
offset - amplitude,
wavelen,
0,
(0.5 * wavelen),
)
else:
cmd = b"WAV 1 X SIN_P %d %d %d %d %d %d" % (
wavelen,
+amplitude,
offset,
wavelen,
0,
(0.5 * wavelen),
)
else:
cmd = b"WAV 1 X LIN %d %d %d %d %d %d" % (
wavelen,
amplitude,
offset,
wavelen,
0,
2,
)
if not axis._is_cache_position_disable:
# This to be able to read the position
# during the trajectory
axis.settings.disable_cache("position")
if not axis._is_cache_state_disable:
axis.settings.disable_cache("state")
commands = [
b"WSL 1 1\n",
cmd + b"\n",
b"WTR 0 %d 1\n" % freq_faction,
b"WGC 1 %d\n" % nb_cycles,
b"WGO 1 1\n",
b"ERR?\n",
]
err = self.sock.write_readline(b"".join(commands))
log_debug(self, "Error code %s", err)
if wait:
try:
while self.state(axis) == AxisState("MOVING"):
gevent.sleep(0.1)
except:
self.stop_wave(axis)
raise
@object_method
def stop_wave(self, axis):
try:
self.sock.write(b"WGO 1 0\n")
finally:
if not axis._is_cache_position_disable:
axis.settings.disable_cache("position", flag=False)
if not axis._is_cache_state_disable:
axis.settings.disable_cache("state", flag=False)
class Axis(axis_module.Axis):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.channel = 1
self._is_cache_position_disable = "position" in self.settings._disabled_settings
self._is_cache_state_disable = "state" in self.settings._disabled_settings
@contextlib.contextmanager
@axis_module.lazy_init
def run_wave(self, wavetype, offset, amplitude, nb_cycles, wavelen):
"""
Helper to run a wave (trajectory) during a scan or something like this.
And stop the trajectory at the end
"""
self.controller.start_wave(
self, wavetype, offset, amplitude, nb_cycles, wavelen, wait=False
)
try:
yield
finally:
self.controller.stop_wave(self)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment