GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit 5a52ad2d authored by bliss administrator's avatar bliss administrator

Cyclic trajectory test for icepap motor with bliss

parent 0126e3e0
import sys
import gevent
import numpy as np
import time
import PyTango
import bliss
from bliss import setup_globals, global_map
from bliss.common.scans import ascan
from bliss.common.logtools import log_info,log_debug
from bliss.common import session
from bliss.scanning.acquisition.motor import MotorMaster
from bliss.scanning.acquisition.lima import LimaAcquisitionMaster
from bliss.scanning.acquisition.timer import SoftwareTimerMaster
from bliss.scanning.chain import AcquisitionChain, AcquisitionMaster
from bliss.scanning.scan import Scan, StepScanDataWatch
from bliss.physics import trajectory
from bliss.common.axis import CyclicTrajectory
from bliss.common.motor_group import TrajectoryGroup
def pos_mot():
deg = np.linspace(0,360,360)
pos = np.round(np.sin(np.deg2rad(deg)),4)
return pos
def cyclictrajectoryscan(motor, positions, nbcycles, frequency, expotime, scan_info=None, sleep_time=None):
chain = AcquisitionChain(parallel_prepare=True)
time_per_cycle = 1/frequency
# Motor master
motor_master = AxisCyclicTrajectoryMaster(motor, positions, nbcycles, time_per_cycle)
motor_master.trajectory.prepare()
motor_master.trajectory.move_to_start()
motor_traj_task = motor_master.trajectory.move_to_end
if scan_info is None:
scan_info = dict()
tasks=[gevent.spawn(motor_traj_task),gevent.spawn(timescan,expotime,sleep_time=sleep_time,scan_info=scan_info)]
try:
tasks[0].join()
gevent.killall(tasks)
except:
gevent.killall(tasks)
print("\n")
raise
class AxisCyclicTrajectoryMaster(AcquisitionMaster):
def __init__(
self,
axis,
axis_positions,
nb_cycles=1,
time_per_cycle=1.0,
trigger_type=AcquisitionMaster.SOFTWARE,
**keys,
):
AcquisitionMaster.__init__(
self, axis, trigger_type=trigger_type, **keys
)
self.movable = axis
nb_points = len(axis_positions)
time_array = numpy.linspace(0.0, time_per_cycle, nb_points)
# Position time array
pt = trajectory.PointTrajectory()
pt.build(time_array,{axis.name:axis_positions})
# check max velocity and acceleration
if pt.max_velocity[f'{axis.name}'] > axis.velocity:
raise RuntimeError(f"axis {axis.name} can not reach this speed,"
f" is limited to {axis.velocity} and should reach {pt.max_velocity}")
if pt.max_acceleration[f'{axis.name}'] > axis.acceleration:
raise RuntimeError(f"axis {axis.name} can not reach this acceleration,"
f" is limited to {axis.acceleration} and should reach {pt.max_acceleration}")
# to check limits -> limits = pt.limits
# ....
#transform PT array into PVT (Position Velocity Time) array
# using defined axis acceleration
pt._velocity[f'{axis.name}'] = abs(pt._velocity[f'{axis.name}'])
pvt = pt.pvt(acceleration_start_end={axis.name:axis.acceleration})
axis_trajectory = CyclicTrajectory(axis, pvt[axis.name], nb_cycles)
self.trajectory = TrajectoryGroup(axis_trajectory)
def prepare(self):
self.trajectory.prepare()
self.trajectory.move_to_start()
def start(self):
self.trigger()
def trigger(self):
if self.trigger_type == AcquisitionMaster.SOFTWARE:
self.trigger_slaves()
self.trajectory.move_to_end()
def trigger_ready(self):
return not self.trajectory.is_moving
def wait_ready(self):
self.trajectory.wait_move()
def stop(self):
self.trajectory.stop()
#def cyclictrajectoryscan(motor, positions, nbcycles, frequency, expotime, scan_info=None, title=None, sleep_time=None, save=True, run=True, return_scan=True):
#chain = AcquisitionChain(parallel_prepare=True)
#time_per_cycle = 1/frequency
## Motor master
#motor_master = AxisCyclicTrajectoryMaster(motor, positions, nbcycles, time_per_cycle)
#if scan_info is None:
#scan_info = dict()
#scan_info.update(
#{
#"type": 'cyclictrajectoryscan',
#"save": save,
#"sleep_time": sleep_time,
#}
#)
#scan_type = 'cyclictrajectoryscan'
#if title is None:
#args = scan_type, expotime
#template = " ".join(["{{{0}}}".format(i) for i in range(len(args))])
#scan_info["title"] = template.format(*args)
#scan_info.update({"npoints": 0, "count_time": expotime})
##_log.info("Doing %s", scan_type)
#scan_params = {"npoints": 0, "count_time": expotime, "type": scan_type}
#chain = setup_globals.DEFAULT_CHAIN.get(scan_info, [], motor_master)
#scan = Scan(
#chain,
#scan_info=scan_info,
#name='cyclictrajectoryscan',
#save=save,
#data_watch_callback=StepScanDataWatch(),
#)
#if run:
#scan.run()
#if return_scan:
#return scan
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment