Commit cfd17bb3 authored by Yoann Sallaz Damaz's avatar Yoann Sallaz Damaz
Browse files

add fastscan and change optics code

parent 2aba80e4
Pipeline #38909 failed with stages
from bliss.common.counter import SamplingCounter, SamplingMode
from bliss.config.static import get_config
from bliss.controllers.tango_attr_as_counter import tango_attr_as_counter
from bliss.common import tango
import termplotlib as tpl
import numpy as np
from bliss.shell.standard import umvr, umv
from time import sleep
from math import ceil
class scan_definition:
def __init__(self,values):
self.name = values.get("scan_name")
self.motor = values.get("motor")
self.counters = list(map(int, values.get("counters").split(",")))
self.channel = values.get("channel")
self.mvr_from = values.get("mvr_from")
self.mvr_to = values.get("mvr_to")
self.itm_ms = values.get("integration_time_ms")
def scan(self):
self._scan(self)
class fastscan:
def __init__(self, name, config):
self._proxy = tango.DeviceProxy(config["pico"])
self.wago = config["channel_wago"]
self.relays = config["channel_attr_name"]
for scan in config["scans"]:
tempscanobj=scan_definition(scan)
setattr(tempscanobj, "_scan", self.scan_intmax)
setattr(self, scan["scan_name"],tempscanobj)
def start_continuous_mode(self):
if self._proxy.status() != "In continuous reading..." :
self._proxy.command_inout("StartContinuous")
sleep(5)
def set_channel(self, idx):
values=[0]*6
values[idx]=1
self.wago.set(self.relays,values)
def get_channel(self):
return self.wago.get(self.relays)
def scan_intmax(self, scanobj):
print("Launch scan for",scanobj.name)
initialpos = scanobj.motor.position
print("trigger channel set to",scanobj.channel)
self.set_channel(scanobj.channel)
print("Prepositionning")
scanobj.motor.apply_config(reload=True)
umv(scanobj.motor, initialpos+scanobj.mvr_from)
max_step_per_sec = 1000/(scanobj.itm_ms+2)
max_velocity = max_step_per_sec / scanobj.motor.steps_per_unit
velocity = min(max_velocity, scanobj.motor.velocity)
nbsteptoaccel = ceil(scanobj.motor.steps_per_unit*velocity*velocity/scanobj.motor.acceleration)
print("Velocity set to", velocity)
print(nbsteptoaccel, "point at the beging and end will be erase due to accel.")
scanobj.motor.velocity = velocity
print("Set pico in intmax mode")
self._proxy.command_inout("StartIntmax", scanobj.itm_ms*10) #300 = 30ms
sleep(1)
print("Scanning")
umv(scanobj.motor, initialpos+scanobj.mvr_to)
sleep(1)
self._proxy.command_inout("Stop")
sleep(1)
scanobj.motor.apply_config(reload=True)
buffer = self._proxy.command_inout("ReadIBuffer")
self.scanresult = []
for i in range(nbsteptoaccel,len(buffer)-nbsteptoaccel,4):
value=0
for j in scanobj.counters:
value+= buffer[i+j]
self.scanresult.append(value)
# print(self.scanresult)
maximum = max(self.scanresult)
index_maximum = np.argmax(self.scanresult)
avg = np.mean(self.scanresult)
x = np.linspace(scanobj.mvr_from+initialpos, scanobj.mvr_to+initialpos, len(self.scanresult)+nbsteptoaccel*2)
x = x[nbsteptoaccel:len(x)-nbsteptoaccel]
# print("maximum =",maximum)
# print("avg =",avg)
# print("indexmax =", index_maximum)
# print("len x =", len(x))
# print("len scanresult =", len(self.scanresult))
# print(x)
if (maximum-avg)/avg > 0 : #marche pas
print("Go to maximum",x[index_maximum])
maximum_to_go = x[index_maximum]
else:
print("Return to initial position and speed")
maximum_to_go = x[int(len(x)/2)]
fig = tpl.figure()
fig.plot(x, self.scanresult, width=300, height=40)
fig.show()
umv(scanobj.motor, maximum_to_go)
......@@ -17,7 +17,7 @@ import sys
##########################################################################
# lambda = 12397.64/eV
# eV = 12397.64/lambda
# http://www.bmsc.washington.edu/scatter/data/V.html
# http://www.bmsc.washington.edu/scatter/AS_periodic.html
tab_nrj=[
("Ag", "K", 0.4859, 25.5140),
("Pd", "K", 0.5091, 24.3503),
......@@ -171,10 +171,14 @@ for valuelist in tab_nrj:
# LAMBDA in angstroem
# VERBOSE >= 2 => full display, 1 => one line output, 0 => silent
##########################################################################
def geofip(LAMBDA=0.97974,VERBOSE=2):
def geofip(LAMBDA_OR_NRJ=0.97974,VERBOSE=2, SLOPE_SOURCE_urad=0):
import sys
import math as math
if LAMBDA_OR_NRJ<2.7: #Angstom
LAMBDA = LAMBDA_OR_NRJ
else: #kev
LAMBDA = 12.39764/LAMBDA_OR_NRJ
#--------------------------------------------------------------------
#VARIABLES DEFINITION
#--------------------------------------------------------------------
......@@ -211,8 +215,11 @@ def geofip(LAMBDA=0.97974,VERBOSE=2):
#-------------------
Z_SOURCE = 0.0 # altitude first mirror
SLOPE_SOURCE = 0.0 # assuming z magnet = 0
#SLOPE_SOURCE urad is positive when rising from source but in geofip SLOPE_SOURCE sign is reverse
SLOPE_SOURCE = -SLOPE_SOURCE_urad*180/math.pi*1E-6 # assuming z magnet = 0
FAN_SOURCE = 0.001663 # div horiz du faisceau, en rad
DIV_V_SOURCE = 0.0002 # div vert du faisceau a 10 keV, en rad FWMH
APERT_V_MASK = 8.0 # ouverture verticale du mask (mm)
......@@ -332,6 +339,7 @@ def geofip(LAMBDA=0.97974,VERBOSE=2):
# ------
if(VERBOSE>=2): print("(max resol with Mar345 at 0 deg: %f Ang)" % (resol))
if(VERBOSE>=2): print("Slope source =%.1f urad (%f deg)" % (SLOPE_SOURCE_urad,SLOPE_SOURCE))
if(VERBOSE>=2): print("bragg angle c1 = %6.4g deg" % (br_angle_c1*180./math.pi))
if(VERBOSE>=2): print("bragg angle c2 = %6.4g deg" % (br_angle_c2*180./math.pi))
if(VERBOSE>=2): print("mono angle = %6.4g deg" % (mono_angle*180./math.pi))
......
......@@ -11,12 +11,12 @@ from bliss.physics import trajectory
from bliss.common import axis
from bliss.common.motor_group import TrajectoryGroup
import time
import math
def change_energy(value):
mv(energy,value)
def trajectoire_mono(nrj_target):
#get all yml optical position
def find_nrj_in_conf(nrj_target):
cc = get_config()
cc.reload()
optical_conf = cc.get("optical_setup")
......@@ -25,33 +25,56 @@ def trajectoire_mono(nrj_target):
list_selected_float = [float(i) for i in list_selected_str]
if nrj_target in list_selected_float:
idx = list_selected_float.index(nrj_target)
final_pos_dict=dict(optical_conf.get(list_selected_str[idx]).get(date_selected[idx]))
confdata = dict(optical_conf.get(list_selected_str[idx]).get(date_selected[idx]))
geofipdata = geofip(nrj_target, VERBOSE=0)
if "alpha1" not in confdata:
confdata.update({"alpha1":round((geofipdata.get("angle_m1")*180./math.pi), 4)})
if "beta" not in confdata:
confdata.update({"beta":round((geofipdata.get("mono_angle")*180./math.pi), 4)})
return confdata
else:
print("nrj not found")
return None
def beam_set(nrj_target, mono_only=False):
final_pos_dict = find_nrj_in_conf(nrj_target)
if final_pos_dict is None:
return
mono_motor_name_list = ["beta", "utx", "utz", "moveh"]
#get usefull object motor
motor_name_list=["beta", "utx", "utz", "moveh"]
if mono_only:
motor_name_list = mono_motor_name_list
else:
motor_name_list = mono_motor_name_list + ["alpha1", "alpha2"]
motor_list_traj = []
motor_list = []
for axis in global_map.get_axes_iter():
if axis.name in mono_motor_name_list:
motor_list_traj.append(axis)
if axis.name in motor_name_list:
motor_list.append(axis)
#comput the time need for the slowest motor to reach target
time_slowest = 0
for motor in motor_list:
for motor in motor_list_traj:
motor.apply_config(reload=True)
distance = abs(motor.position-final_pos_dict.get(motor.name))
moving_time = 2*motor.acctime+distance/motor.velocity
if moving_time > time_slowest:
time_slowest=moving_time
#move motor to reach their target all at the same time (good speed)
motor_need_moving_list = []
for motor in motor_list:
#change velocity for motor in traj
for motor in motor_list_traj:
distance = abs(motor.position-final_pos_dict.get(motor.name))
velocity = max(distance/time_slowest, motor.velocity*0.01)
motor.velocity=velocity
#move motor to reach their target all at the same time (good speed)
motor_need_moving_list = []
for motor in motor_list:
motor.move(final_pos_dict.get(motor.name), wait=False)
motor_need_moving_list.append(motor)
......@@ -129,12 +152,25 @@ def beam_save():
if mot.name=="energy":
motenergy=mot
break
cc = get_config()
cc.reload()
optical_conf = cc.get("optical_setup")
list_nrj_setup = list(optical_conf.keys())
list_nrj_setup.remove("name")
if "selected" in list_nrj_setup:
list_nrj_setup.remove("selected")
list_nrj_setup = np.array([float(i) for i in list_nrj_setup])
list_nrj_setup.sort()
tab_nrj_kev = np.array([i[3] for i in tab_nrj])
#Search for the closest idx
tab_nrj_kev = np.array([i[3] for i in tab_nrj])
idx = (np.abs(tab_nrj_kev-(motenergy.position/1000.))).argmin()
tab_nrj_choice = [(str(i[3]), "%s(%s) %.2fkeV"%(i[0],i[1],i[2])) for i in tab_nrj]
#tab_nrj_choice = [(str(i[3]), "%s(%s) %.2fkeV"%(i[0],i[1],i[2])) for i in tab_nrj]
tab_nrj_choice = [(str(i), "%6s %6s"%(str(i),"-".join(tab_nrj[np.argwhere(tab_nrj_kev==i)[0][0]][0:2]))) for i in list_nrj_setup]
dlg3 = UserChoice(label="Your are currently at %.2fkeV - Choose the energy for saving the current motors positions"%(motenergy.position/1000.), values=tab_nrj_choice, defval=idx)
new_nrg_to_save = BlissDialog( [ [ dlg3 ] ], title='Beam save').show()
if new_nrg_to_save is not False:
......@@ -143,12 +179,13 @@ def beam_save():
cc = get_config()
cc.reload()
optical_conf = cc.get("optical_setup")
dict_pos_mot = {"omega2":omega.position,
dict_pos_mot = {"omega2":omega2.position,
"gamma2":gamma2.position,
"khi2":khi2.position,
"utx":utx.position,
"utz":utz.position,
"moveh":moveh.position
"moveh":moveh.position,
"alpha2":alpha2.position
}
today_str = datetime.datetime.today().strftime('%Y-%m-%d_%H-%M')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment