Commit ab30619a authored by swhite's avatar swhite
Browse files

added python folder

parent 905414bd
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="projectConfiguration" value="Nosetests" />
<option name="PROJECT_TEST_RUNNER" value="Nosetests" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 2.7 (BBA)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/BBA.iml" filepath="$PROJECT_DIR$/.idea/BBA.iml" />
</modules>
</component>
</project>
\ No newline at end of file
This diff is collapsed.
import scipy.io as sio
class Bumps():
def __init__(self,debug=True):
self._bpms = []
self.path = '/mntdirect/_machfs/swhite/Commissioning/BBA/'
for i in range(10):
self._bpms.append(sio.loadmat(self.path+'bpm'+str(i+1)+'.mat')['bump'])
def get_bump(self,bpm_nb,plane):
bump = self._bpms[bpm_nb-1]
if plane =='H':
bump = bump[0]
elif plane=='V':
bump = bump[1]
return bump
import numpy as np
import os
import OrbitController as oc
import time
from scipy.optimize import curve_fit
from scipy.interpolate import interp1d
import Bumps as bumps
class Loops():
def __init__(self,debug=True):
if debug:
os.environ['TANGO_HOST'] = 'ebs-simu:10000'
self.oc = oc.OrbitController()
self.bumps = bumps.Bumps()
self.xdatax = []
self.xdatay = []
self.xdatay1 = []
self.ydatax = []
self.ydatay = []
self.ydatay1 = []
self.offsets=[]
self.k1_init=0
self.k0_init=[]
self.quad_name=''
self.cor_name=''
self.dorun = False
def parabola(self,x,a,b,x0):
return b+a*(x-x0)**2
def linear(self,x,a,b):
return b+a*x
def abs_line(self,x,a,b,x0):
return np.abs(b+a*(x-x0))
def fit(self,x,y,p0=None):
try:
p,q = curve_fit(self.parabola,x,y,p0=p0)
except RuntimeError as err:
print 'Fit Failed: {0}'.format(err)
return x,np.ones(len(y))*np.amin(y),p0[2],0.0
offset = p[2]
res = np.sum((self.parabola(x,*p)-y)**2)
x_new = np.linspace(np.amin(x),np.amax(x))
y_new = self.parabola(x_new,*p)
return x_new,y_new,offset,res
def analyze(self,bpm_pos,std_orb,steps):
x_new1,y_new1,offset1,res1 = self.fit(bpm_pos,std_orb,p0=[1.0,0,bpm_pos[std_orb.index(np.amin(std_orb))]])
p1,q1 = curve_fit(self.linear,bpm_pos,steps)
x_new1 = self.linear(x_new1,*p1)
x_new2, y_new2, offset2,res2 = self.fit(steps, std_orb, p0=[1.0,0,steps[std_orb.index(np.amin(std_orb))]])
p2, q2 = curve_fit(self.linear,steps, bpm_pos)
offset2 = self.linear(offset2,*p2)
return [[x_new1, y_new1],[x_new2, y_new2]], [offset1, self.linear(offset1,*p1), offset2, self.linear(offset2,*p1)]
def compute_offset(self,bpm_posx,std_orbx,bpm_posy,std_orby,stepsx,stepsy):
xcoord, xoffset = self.analyze(bpm_posx,std_orbx,stepsx)
ycoord, yoffset = self.analyze(bpm_posy,std_orby,stepsy)
return xcoord,xoffset,ycoord,yoffset
def one_step_cor(self,cor_name,quad_name,plane,dk0,k0_init,dk1,k1_init,bpmi,ts):
if plane == 'HV':
self.oc.trim_cor(cor_name[0], dk0+k0_init[0])
self.oc.trim_cor(cor_name[1], dk0+k0_init[1])
else:
self.oc.trim_cor(cor_name, dk0+k0_init)
time.sleep(ts)
orb0 = self.oc.get_orbit()
self.oc.trim_quad(quad_name, dk1, increment=True)
time.sleep(ts)
orbp1 = self.oc.get_orbit()
self.oc.trim_quad(quad_name, -2.0 * dk1, increment=True)
time.sleep(ts)
orbm1 = self.oc.get_orbit()
self.oc.trim_quad(quad_name, k1_init)
avorbx = (np.array(orbp1[0]) - np.array(orbm1[0])) * 0.5
sx2 = np.sum(np.square(avorbx))
x = orb0[0][bpmi]
avorby = (np.array(orbp1[1]) - np.array(orbm1[1])) * 0.5
sy2 = np.sum(np.square(avorby))
y = orb0[1][bpmi]
return x,sx2,y,sy2
def set_data(self,xx1,xy1,xy2,yx1,yy1,yy2):
self.xdatax = xx1
self.xdatay = xy1
self.xdatay1 = xy2
self.ydatax = yx1
self.ydatay = yy1
self.ydatay1 = yy2
def set_offset(self,xoffset,yoffset):
self.offsets = [xoffset,yoffset]
def apply_offset(self,bpmi,plane):
self.oc.set_offsets(bpmi,self.offsets,plane)
def get_xdata(self):
return self.xdatax,self.xdatay,self.xdatay1
def get_offsets(self):
return self.offsets
def get_ydata(self):
return self.ydatax,self.ydatay,self.ydatay1
def reset(self,plane,AppendtextCom):
AppendtextCom.data_signal.emit('Scan cancelled moving back to initial position')
if plane == 'HV':
self.oc.trim_cor(self.cor_name[0], self.k0_init[0])
self.oc.trim_cor(self.cor_name[1], self.k0_init[1])
else:
self.oc.trim_cor(self.cor_name, self.k0_init)
self.oc.trim_quad(self.quad_name, self.k1_init)
def scan_single_cor(self,bpmi,plane,k0min,k0max,nsteps,dk1,ts,AppendtextCom):
bpm_name = self.oc.get_bpm_name(bpmi)
meci = self.oc.get_mec(bpmi, plane)
self.cor_name = self.oc.get_cor_name(meci, plane)
self.quad_name, quadi = self.oc.get_quad_to_align(bpm_name)
self.dorun = True
AppendtextCom.data_signal.emit('BPM to align:'+'\n'
+str(bpmi)+' '+str(bpm_name)+'\n'
+'Most effective corrector:'+'\n'
+str(meci)+' '+str(self.cor_name)+'\n'
+'Quadrupole to align:'+'\n'
+str(quadi)+' '+str(self.quad_name))
corvals = self.oc.get_cor_values(plane)
if plane =='HV':
self.k0_init = [corvals[0][meci[0]],corvals[1][meci[1]]]
else:
self.k0_init = corvals[meci]
self.k1_init = self.oc.get_quad_values()[quadi]
steps=np.linspace(k0min,k0max,nsteps)
std_orbx = []
bpm_posx = []
std_orby = []
bpm_posy = []
self.set_data([],[],[],[],[],[])
for step in steps:
if self.dorun:
AppendtextCom.data_signal.emit("step:"+' '+str(step))
x, sx2, y, sy2 = self.one_step_cor(self.cor_name, self.quad_name, plane, step,
self.k0_init, dk1, self.k1_init, bpmi, ts)
std_orbx.append(sx2)
bpm_posx.append(x)
std_orby.append(sy2)
bpm_posy.append(y)
self.set_data([steps[0:len(bpm_posx)]],[std_orbx],[bpm_posx],
[steps[0:len(bpm_posy)]],[std_orby],[bpm_posy])
else:
self.reset(plane,AppendtextCom)
return
if plane == 'HV':
self.oc.trim_cor(self.cor_name[0], self.k0_init[0])
self.oc.trim_cor(self.cor_name[1], self.k0_init[1])
xcoord,xoffset,ycoord,yoffset = self.compute_offset(bpm_posx,std_orbx,bpm_posy,
std_orby, steps+self.k0_init[0], steps+self.k0_init[1])
self.set_offset(xoffset,yoffset)
self.set_data([steps+self.k0_init[0],xcoord[0][0],xcoord[1][0]],[std_orbx,xcoord[0][1],xcoord[1][1]], [bpm_posx],
[steps+self.k0_init[1],ycoord[0][0],ycoord[1][0]],[std_orby,ycoord[0][1],ycoord[1][1]], [bpm_posy])
else:
self.oc.trim_cor(self.cor_name, self.k0_init)
xcoord,xoffset,ycoord,yoffset = self.compute_offset(bpm_posx, std_orbx, bpm_posy,
std_orby, steps+self.k0_init, steps+self.k0_init)
self.set_offset(xoffset,yoffset)
self.set_data([steps+self.k0_init,xcoord[0][0],xcoord[1][0]],[std_orbx,xcoord[0][1],xcoord[1][1]], [bpm_posx],
[steps+self.k0_init,ycoord[0][0],ycoord[1][0]],[std_orby,ycoord[0][1],ycoord[1][1]], [bpm_posy])
def find_mini_cor(self,bpmi,plane,dk0,dk1,ts,AppendtextCom):
if plane=='HV':
print AppendtextCom.data_signal.emit('HV not allowed in this mode (find_mini)')
return
bpm_name = self.oc.get_bpm_name(bpmi)
meci = self.oc.get_mec(bpmi, plane)
self.cor_name = self.oc.get_cor_name(meci, plane)
self.quad_name, quadi = self.oc.get_quad_to_align(bpm_name)
self.dorun = True
AppendtextCom.data_signal.emit('BPM to align:'+'\n'
+str(bpmi)+' '+str(bpm_name)+'\n'
+'Most effective corrector:'+'\n'
+str(meci)+' '+str(self.cor_name)+'\n'
+'Quadrupole to align:'+'\n'
+str(quadi)+' '+str(self.quad_name))
corvals = self.oc.get_cor_values(plane)
self.k0_init = corvals[meci]
self.k1_init = self.oc.get_quad_values()[quadi]
std_orbx = []
bpm_posx = []
std_orby = []
bpm_posy = []
steps = []
self.set_data([], [], [], [], [], [])
AppendtextCom.data_signal.emit("Initial value")
delta = -1
direct= 0.0
while delta<0 and self.dorun==True:
step = self.oc.get_cor_values(plane)[meci]+direct*dk0
x, sx2, y, sy2 = self.one_step_cor(self.cor_name, self.quad_name, plane, step,
0.0, dk1, self.k1_init, bpmi, ts)
std_orbx.append(sx2)
bpm_posx.append(x)
std_orby.append(sy2)
bpm_posy.append(y)
steps.append(step)
self.set_data([steps], [std_orbx], [bpm_posx],
[steps], [std_orby], [bpm_posy])
if plane =='H':
delta = sx2-std_orbx[len(std_orbx)-2]
else:
delta = sy2 - std_orby[len(std_orby) - 2]
if direct ==0:
AppendtextCom.data_signal.emit("Moving in the positive direction")
direct = 1.0
delta = -1.0
elif delta > 0 and direct ==1.0:
AppendtextCom.data_signal.emit("Moving in the negative direction")
direct = -1.0
delta = -1.0
elif delta > 0 and direct == -1.0:
AppendtextCom.data_signal.emit("Last step in the negative direction")
step = self.oc.get_cor_values(plane)[meci] + direct*dk0
x, sx2, y, sy2 = self.one_step_cor(self.cor_name, self.quad_name, plane, step,
0.0, dk1, self.k1_init, bpmi, ts)
std_orbx.append(sx2)
bpm_posx.append(x)
std_orby.append(sy2)
bpm_posy.append(y)
steps.append(step)
self.set_data([steps], [std_orbx], [bpm_posx],
[steps], [std_orby], [bpm_posy])
if self.dorun:
self.oc.trim_cor(self.cor_name,self.k0_init)
xcoord,xoffset,ycoord,yoffset = self.compute_offset(bpm_posx, std_orbx, bpm_posy,
std_orby, steps, steps)
self.set_offset(xoffset,yoffset)
self.set_data([steps,xcoord[0][0],xcoord[1][0]],[std_orbx,xcoord[0][1],xcoord[1][1]], [bpm_posx],
[steps,ycoord[0][0],ycoord[1][0]],[std_orby,ycoord[0][1],ycoord[1][1]], [bpm_posy])
else:
self.reset(plane, AppendtextCom)
return
def one_step_bump(self,bump,quad_name,plane,amp,dk1,k1_init,bpmi,ts):
self.oc.set_bump(plane,bump,bpmi,amp)
time.sleep(ts)
orb0 = self.oc.get_orbit()
self.oc.trim_quad(quad_name, dk1, increment=True)
time.sleep(ts)
orbp1 = self.oc.get_orbit()
self.oc.trim_quad(quad_name, -2.0 * dk1, increment=True)
time.sleep(ts)
orbm1 = self.oc.get_orbit()
self.oc.trim_quad(quad_name, k1_init)
avorbx = (np.array(orbp1[0]) - np.array(orbm1[0])) * 0.5
sx2 = np.sum(np.square(avorbx))
x = orb0[0][bpmi]
avorby = (np.array(orbp1[1]) - np.array(orbm1[1])) * 0.5
sy2 = np.sum(np.square(avorby))
y = orb0[1][bpmi]
return x,sx2,y,sy2
def scan_single_bump(self,bpmi,plane,amp_min,amp_max,nsteps,dk1,ts):
bpm_name = self.oc.get_bpm_name(bpmi)
quad_name, quadi = self.oc.get_quad_to_align(bpm_name)
print 'BPM to align:'
print bpmi, bpm_name
print 'Quadrupole to align:'
print quadi, quad_name
bpm_nb,cell = self.oc.get_bpm_number_cell(bpm_name)
bump = self.bumps.get_bump(bpm_nb,plane)
ref_orbit = self.oc.get_ref_orbit(plane)
k1_init = self.oc.get_quad_values()[quadi]
steps=np.linspace(amp_min,amp_max,nsteps)
std_orbx = []
bpm_posx = []
std_orby = []
bpm_posy = []
for step in steps:
print "step:", step
x, sx2, y, sy2 = self.one_step_bump(bump,quad_name,plane,step,
dk1,k1_init,bpmi,ts)
std_orbx.append(sx2)
bpm_posx.append(x)
std_orby.append(sy2)
bpm_posy.append(y)
self.oc.set_ref_orbit(self, plane, ref_orbit)
self.plot(bpm_posx, std_orbx, bpm_posy, std_orby,
steps,steps, bpm_name, quad_name)
import tango
import csv
import numpy as np
import os
from enum import Enum
class EbsMode(Enum):
TbT = 0
Atlinopt = 1
Atx = 2
Mixed = 3
DetailedAtx = 4
class OrbitController():
# Class to get data from BPM
# Read/correct/set closed orbit
#drive bumps, correctors, quadrupoles
def __init__(self,debug=True):
if debug:
os.environ['TANGO_HOST'] = 'ebs-simu:10000'
simulator = tango.DeviceProxy('sys/ringsimulator/ebs')
simulator.write_attribute('Mode',EbsMode.Atlinopt.value)
self._BPMsdev = tango.DeviceProxy('srdiag/beam-position/all')
self._Quadsdev = tango.DeviceProxy('srmag/m-q/all')
self._corHdev = tango.DeviceProxy('srmag/hst/all')
self._corVdev = tango.DeviceProxy('srmag/vst/all')
self._autocor = tango.DeviceProxy('sr/beam-orbitcor/svd-auto')
self._svd_h = tango.DeviceProxy('sr/beam-orbitcor/svd-h')
self._svd_v = tango.DeviceProxy('sr/beam-orbitcor/svd-v')
self._quad_map = {1: ['qf1', 'a'],
2: ['qf4', 'a'],
3: ['qf4', 'b'],
4: ['qf6', 'b'],
5: ['qf8', 'b'],
6: ['qf8', 'd'],
7: ['qf6', 'd'],
8: ['qf4', 'd'],
9: ['qf4', 'e'],
10: ['qf1', 'e']}
hmfile = self._svd_h.get_property('SVDFileName')
vmfile = self._svd_v.get_property('SVDFileName')
hmfile = hmfile['SVDFileName'][0]
vmfile = vmfile['SVDFileName'][0]
self._hmat = self.load_respmat(hmfile)
self._vmat = self.load_respmat(vmfile)
def load_respmat(self, mfile):
mat = []
f = open(mfile,'r')
r = csv.reader(f)
for line in r:
ld = [float(x) for x in line]
mat.append(ld)
f.close()
return mat
def get_cor_name(self, index, plane):
names = []
if plane == 'H':
names = self._corHdev.read_attribute('CorrectorNames').value
elif plane == 'V':
names = self._corVdev.read_attribute('CorrectorNames').value
elif plane == 'HV':
names = [self._corHdev.read_attribute('CorrectorNames').value,
self._corVdev.read_attribute('CorrectorNames').value]
return [names[0][index[0]],names[1][index[1]]]
else:
print 'Plane is H,V or HV'
return names[index]
def get_cor_index(self, name):
if 'hst' in name:
names = self._corHdev.read_attribute('CorrectorNames').value
elif 'vst' in name:
names = self._corVdev.read_attribute('CorrectorNames').value
else:
print 'Error Corrector name: '+name
return names.index(name)
def get_cor_values(self, plane):
values=np.nan
if plane == 'H':
values = self._corHdev.read_attribute('Strengths').w_value
elif plane == 'V':
values = self._corVdev.read_attribute('Strengths').w_value
elif plane == 'HV':
values = [self._corHdev.read_attribute('Strengths').w_value,
self._corVdev.read_attribute('Strengths').w_value]
else:
print 'Plane is H,V or HV'
return values
def set_cor_values(self, plane,values):
if 'H' in plane:
self._corHdev.write_attribute('Strengths',values)
if 'V' in plane:
self._corVdev.write_attribute('Strengths',values)
def trim_cor(self,name,value,increment=False):
cor = tango.DeviceProxy(name)
corval = cor.read_attribute('Strength').w_value
if increment:
corval = corval + value
else:
corval = value
cor.write_attribute('Strength',corval)
def trim_cor_all(self,name,plane,value,increment=False):
cori = self.get_cor_index(name)
vals = self.get_cor_values(plane)
if increment:
vals[cori] = vals[cori]+value
else:
vals[cori] = value
self.set_cor_values(plane,vals)
def get_bpm_name(self, index):
return self._BPMsdev.command_inout('GetBPMName',index)
def get_bpm_index(self, name):
return self._BPMsdev.command_inout('GetBPMIndex',name)
def get_orbit(self):
orbit = [self._BPMsdev.read_attribute('SA_HPositions').value,
self._BPMsdev.read_attribute('SA_VPositions').value]
return orbit
def get_offets(self):
offsets = [self._BPMsdev.read_attribute('HOffsets').value,
self._BPMsdev.read_attribute('VOffsets').value]
return offsets
def reset_offets(self):
self._BPMsdev.write_attribute('HOffsets',np.zeros(320))
self._BPMsdev.write_attribute('VOffsets',np.zeros(320))
def set_offsets(self,bpmi,values,plane):
offsets = self.get_offets()
offsets[0][bpmi] += -values[0][0]
offsets[1][bpmi] += -values[1][0]
if 'H' in plane:
self._BPMsdev.write_attribute('HOffsets', offsets[0])
if 'V' in plane:
self._BPMsdev.write_attribute('VOffsets', offsets[1])
def get_ref_orbit(self,plane):
orbit = []
if plane == 'H':
orbit = self._svd_h.read_attribute('RefOrbit').value
elif plane == 'V':
orbit = self._svd_v.read_attribute('RefOrbit').value
elif plane == 'HV':
orbit = [self._svd_h.read_attribute('RefOrbit').value,
self._svd_v.read_attribute('RefOrbit').value]
else:
print 'Plane is H, V or HV'
return orbit
def set_ref_orbit(self,plane,orbit):
if 'H' in plane:
self._svd_h.write_attribute('RefOrbit',orbit)
if 'V' in plane:
self._svd_v.write_attribute('RefOrbit',orbit)
def reset_ref_orbit(self):
self.set_ref_orbit('HV',np.zeros((2,320)))
def set_bump(self,plane,bump,bpmi,amp):
amp = amp*1.0e3
indm = len(bump)/2+1
ref_orbit = self.get_ref_orbit(plane)
if plane == 'H' or plane == 'V':
norm = bump[indm]
new_ref = ref_orbit
new_ref[bpmi-indm+1:bpmi+indm] = bump[:]*amp/norm
elif plane == 'HV':
norm = bump[:][indm]
new_ref = ref_orbit
new_ref[0][bpmi - indm+1:bpmi + indm] = bump[0][:] * amp / norm[0]
new_ref[1][bpmi - indm+1:bpmi + indm] = bump[1][:] * amp / norm[1]
self.set_ref_orbit(plane,new_ref)
def get_quad_name(self, index):
names = self._Quadsdev.read_attribute('MagnetNames').value
return names[index]