Commit 9151231b authored by Yoann Sallaz Damaz's avatar Yoann Sallaz Damaz
Browse files

TestWago and scan optic

parent 92dafd2f
Pipeline #43321 failed with stages
from tkinter import *
from functools import partial
from pyModbusTCP.client import ModbusClient
import time
def set_text(e, text):
e["text"]=text
class WAGOTEST(Frame):
def __init__(self):
super().__init__()
self.initUI()
self.connected=False
self.c=None
self.poll()
def poll(self):
self.read_from_wago()
self.master.update_idletasks()
self.master.after_idle(self.poll)
def initUI(self):
self.master.title("Test WAGO Pneumatique")
frame1 = Frame(self)
frame1.pack(fill=X)
lblIP = Label(frame1, text="IP ou Hostname")
lblIP.pack(side=LEFT, padx=5, pady=5)
self.connectbtn = Button(frame1,text="Connecter", command=self.connection)
self.connectbtn.pack(side=RIGHT, padx=5)
self.entryPORT = Entry(frame1)
self.entryPORT.insert(END,"502")
self.entryPORT.pack(padx=5, side=RIGHT)
lblPORT = Label(frame1, text="Port")
lblPORT.pack(side=RIGHT, padx=5, pady=5)
self.entryIP = Entry(frame1)
self.entryIP.insert(END,"160.103.127.27")
self.entryIP.pack(fill=X, padx=5, expand=True)
self.frame2 = Frame(self, relief=RAISED, borderwidth=1)
for i in range(0,8+1+4):
if i==8:
weight=5
else:
weight=1
self.frame2.columnconfigure(i, pad=0, weight=weight)
for i in range(0,1+2+2):
self.frame2.rowconfigure(i, pad=3)
self.stateOn=[]
self.stateOff=[]
for i in range(0,8+1+4):
if i!=8:
lblChan = Label(self.frame2, text=str(i))
lblChan.grid(row=0,column=i, sticky="n")
self.stateOn.append(Canvas(self.frame2, bg="black", height=10, width=10))
self.stateOn[-1].grid(row=1,column=i, sticky="n")
self.stateOff.append(Canvas(self.frame2, bg="black", height=10, width=10))
self.stateOff[-1].grid(row=2,column=i, sticky="n")
on_with_arg = partial(self.enable, i)
btnOn = Button(self.frame2, text="ON", width=5, command=on_with_arg)
btnOn.grid(row=3,column=i, sticky="n")
off_with_arg = partial(self.disable, i)
btnOff = Button(self.frame2, text="OFF", width=5, command=off_with_arg)
btnOff.grid(row=4,column=i,sticky="n")
self.frame2.pack(fill=X)
for child in self.frame2.winfo_children():
child.configure(state='disable')
frame = Frame(self)
frame.pack(fill=BOTH, expand=True)
self.pack(fill=BOTH, expand=True)
self.status = Label(self)
self.status.pack(side=LEFT, padx=5, pady=5, fill=X, expand=True)
def display_enabled(self,channel):
self.stateOn[channel].config(bg="green")
self.stateOff[channel].config(bg="black")
def display_disabled(self,channel):
self.stateOn[channel].config(bg="black")
self.stateOff[channel].config(bg="red")
def enable(self,channel):
try:
if(self.connected):
self.c.write_single_coil(channel,1)
except Exception as e:
set_text(self.status, e)
def disable(self,channel):
try:
if(self.connected):
self.c.write_single_coil(channel,0)
except Exception as e:
set_text(self.status, e)
def read_from_wago(self):
try:
if self.connected:
values=self.c.read_coils(0,12)
for i in range(0,len(values)):
if values[i]:
self.display_enabled(i)
else:
self.display_disabled(i)
except Exception as e:
set_text(self.status, e)
def connection(self):
IP=self.entryIP.get()
PORT=self.entryPORT.get()
if not self.connected:
try:
self.c = ModbusClient(IP,int(PORT), timeout=2)
if not self.c.is_open():
if not self.c.open():
set_text(self.status, "unable to connect to "+IP+":"+PORT)
else:
self.connected=True
else:
self.connected=True
except Exception as e:
set_text(self.status, e)
if self.connected:
set_text(self.status, "Connection OK")
self.connectbtn['text']="Deconnecter"
self.entryIP.config(state='disabled')
self.entryPORT.config(state='disabled')
for child in self.frame2.winfo_children():
child.configure(state='normal')
else:
self.c.close()
set_text(self.status, "Deconnection")
self.connectbtn['text']="Connecter"
self.entryIP.config(state='normal')
self.entryPORT.config(state='normal')
self.connected=False
for child in self.frame2.winfo_children():
child.configure(state='disabled')
for child in self.stateOn:
child.config(bg="black")
for child in self.stateOff:
child.config(bg="black")
#read_coils
def main():
root = Tk()
root.geometry("800x170")
app = WAGOTEST()
root.mainloop()
if __name__ == '__main__':
main()
\ No newline at end of file
......@@ -12,6 +12,37 @@ from bliss.common import axis
from bliss.common.motor_group import TrajectoryGroup
import time
import math
import datetime
from PIL import Image
import requests
import termplotlib as tpl
import urllib.request
import scipy.optimize as opt
import matplotlib.pyplot as plt
def gauss(x, p): # p[0]==mean, p[1]==stdev
return 1.0/(p[1]*np.sqrt(2*np.pi))*np.exp(-(x-p[0])**2/(2*p[1]**2))
def analyseImg():
name="/tmp/pictures/test.jpg"
X = np.arange(0,800,1)
img = Image.open(name, "r")
imgcrp = img.crop((620, 0, 660, 800))
pix = np.asarray(img)
projection = np.sum(np.sum(imgcrp, axis = 1), axis=1)
projection = np.true_divide(projection, np.sum(projection))
# Fit a gaussian
p0 = [0,1] # Inital guess is a normal distribution
errfunc = lambda p, x, y: gauss(x, p) - y # Distance to the target function
p1, success = opt.leastsq(errfunc, p0[:], args=(X, projection))
if success :
return p1[0]
else:
return -1
def change_energy(value):
mv(energy,value)
......@@ -100,6 +131,8 @@ def beam_set(nrj_target, mono_only=False):
for motor in motor_need_moving_list:
motor.stop()
print(print_str, end="\r")
except KeyboardInterrupt:
break
except:
print(" !!! Emergency stop - error or keyboard interupt !!!")
mono_stop()
......@@ -293,4 +326,265 @@ def absCarbonIn():
print("Beam Monitor go to -33,8mm")
umv(beammonitor,-33.8)
def scanM1Bend():
with open("/tmp_14_days/fip/scanM1Bend.csv","a+") as f:
f.write("time,actam1,actav1,diode1,diode2,Mach_current\n")
for bend in range(300000,50000,-20000):
maintenant = datetime.datetime.now().strftime("%H:%M:%S")
# if idx%200==0:
# fastscan.gamma2_short.scan()
actam1.move(bend, wait=False)
actav1.move(bend, wait=False)
while actam1.is_moving or actav1.is_moving:
sleep(1)
bend1pos = actam1.position
bend2pos = actav1.position
# print("startT")
# fastscan._proxy.command_inout("StartT", 10000)
# sleep(12)
# print("ReadIbuffer")
# dioderead = fastscan._proxy.command_inout("ReadIBuffer")
# f.write("%s,%d,%d,%f,%f\n"%(maintenant,bend1pos,bend2pos,dioderead[2]/dioderead[0],dioderead[3]/dioderead[0]))
for i in range(0,30):
current = SR_Current.value
f.write("%s,%d,%d,%f,%f,%f\n"%(maintenant,bend1pos,bend2pos,OH_I1.value,OH_I2.value,current))
sleep(0.5)
f.flush()
def scanSl0(relMin, relMax, step, pics=False):
sl0vtposArr = []
PixelSumArr = []
sl0vtposInit = sl0vt.position
path = "/tmp/pictures/" #"/users/opd07/Pictures/"
PixelSumArr = []
for relpos in np.arange(relMin, relMax, step):
sl0vt.move(sl0vtposInit+relpos, wait=True)
sleep(0.2)
im = Image.open(requests.get("http://bm07recorder.esrf.fr:7011/jpg/image.jpg", stream=True).raw)
a=np.asarray(im)
pixelsvalue = a.sum() / 1E5
PixelSumArr.append(pixelsvalue)
sl0vtposArr.append(sl0vt.position)
if pics == True:
saveMonitorsImage("1000","sl0vt" + str(sl0vt.position))
sl0vt.move(sl0vtposInit, wait=True)
fig = tpl.figure()
fig.plot(sl0vtposArr,PixelSumArr, width=300, height=40)
fig.show()
def scanMovehM1(relMin, relMax, step):
M1posInit = m1up.position
MovehposInit = moveh.position
M1posArr = []
PixelSumArr = []
for relpos in np.append(np.arange(relMin,relMax,step),[relMax]):
m1up.move(M1posInit+relpos, wait=False)
moveh.move(MovehposInit+relpos, wait=False)
while moveh.is_moving or m1up.is_moving:
sleep(1)
sleep(1)
im = Image.open(requests.get("http://bm07recorder.esrf.fr:7010/jpg/image.jpg", stream=True).raw)
a=np.asarray(im)
pixelsvalue = a.sum() / 1E5
print(m1up.position, moveh.position, pixelsvalue)
M1posArr.append(m1up.position)
PixelSumArr.append(pixelsvalue)
m1up.move(M1posInit, wait=False)
moveh.move(MovehposInit, wait=False)
while moveh.is_moving or m1up.is_moving:
sleep(1)
fig = tpl.figure()
fig.plot(M1posArr,PixelSumArr, width=300, height=40)
fig.show()
def mesure_bend_M2(Min, Max, nbPoint):
saveMonitorsImage("0001","ref")
for pos in np.linspace(Min, Max, nbPoint):
m2up.move(pos, wait=True)
saveMonitorsImage("0001","m2up_%f"%pos)
def mesure_bend_M2_sans_M2(Min, Max, nbPoint):
saveMonitorsImage("0001","ref")
utzPos=[]
position=[]
for pos in np.linspace(Min, Max, nbPoint):
mv(utz,pos)
#fastscan.gamma2_short.scan()
value=0
nbpoint=0
for i in range(0,10):
saveMonitorsImage("0001","utz_%f"%pos)
urllib.request.urlretrieve("http://bm07recorder.esrf.fr:7006/jpg/image.jpg", "/tmp/pictures/test.jpg")
resu = analyseImg()
if resu != -1:
print(resu)
value+=resu
nbpoint+=1
time.sleep(0.3)
if nbpoint!=0:
resu = value/float(nbpoint)
print(pos, resu)
utzPos.append(pos)
position.append(resu)
print(utzPos)
print(position)
#def mesure_bend_M2_sans_M2_alpha(Min, Max, nbPoint):
#saveMonitorsImage("0001","ref")
#utzPos=[]
#position=[]
#for pos in np.linspace(Min, Max, nbPoint):
#mv(alpha2,pos)
##fastscan.gamma2_short.scan()
#value=0
#nbpoint=0
#for i in range(0,10):
#saveMonitorsImage("0001","utz_%f"%pos)
#urllib.request.urlretrieve("http://bm07recorder.esrf.fr:7006/jpg/image.jpg", "/tmp/pictures/test.jpg")
#resu = analyseImg()
#if resu != -1:
#print(resu)
#value+=resu
#nbpoint+=1
#time.sleep(0.3)
#if nbpoint!=0:
#resu = value/float(nbpoint)
#print(pos, resu)
#utzPos.append(pos)
#position.append(resu)
#print(utzPos)
#print(position)
def mesure_beam_oscill_fluo3(nbimages,Name):
positions = []
for i in range(0,nbimages):
#saveMonitorsImage("0001","utz_%f"%pos)
urllib.request.urlretrieve("http://bm07recorder.esrf.fr:7006/jpg/image.jpg", "/tmp/pictures/test.jpg")
picpos = analyseImg()
positions.append(picpos)
print (picpos)
#time.sleep(0.3)
var = np.var(positions)
print("Vertical Pic Position Variance = %f"%var)
def saveMonitorsImage(cam,string): #cam in binary string 0000 to 1111, each character for a camera: wires, fluo1, fluo2, fluo3
path = "/tmp/pictures/"
now=datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
if cam[0:1] == "1": # download wire camera picture
urllib.request.urlretrieve("http://bm07recorder.esrf.fr:7011/jpg/image.jpg", path + now + "_wires_" + string + ".jpg")
if cam[1:2] == "1": # download fluo1 camera picture
urllib.request.urlretrieve("http://bm07recorder.esrf.fr:7010/jpg/image.jpg", path + now + "_fluo1_" + string + ".jpg")
if cam[2:3] == "1": # download fluo2 camera picture
urllib.request.urlretrieve("http://bm07recorder.esrf.fr:7009/jpg/image.jpg", path + now + "_fluo2_" + string + ".jpg")
if cam[3:4] == "1": # download fluo3 camera picture
urllib.request.urlretrieve("http://bm07recorder.esrf.fr:7006/jpg/image.jpg", path + now + "_fluo3_" + string + ".jpg")
sleep(1)
def scanMovehM1Sl0(relMin, relMax, step, pics=False, sourceAngle_uRad=0):
M1posInit = m1up.position
MovehposInit = moveh.position
sl0vtposInit = sl0vt.position
UtxInit = utx.position
UtzInit = utz.position
path = "/tmp/pictures/" #"/users/opd07/Pictures/"
M1posArr = []
PixelSumArr = []
for relpos in np.append(np.arange(relMin,relMax,step),[relMax]):
deltaUtxUtz = delta_utx_utz_fixed_height(relpos, sourceAngle_uRad)
utx.move(UtxInit+deltaUtxUtz[0], wait=False)
utz.move(UtzInit+deltaUtxUtz[1], wait=False)
m1up.move(M1posInit+relpos, wait=False)
moveh.move(MovehposInit+relpos, wait=False)
sl0vt.move(sl0vtposInit+relpos, wait=False)
while moveh.is_moving or m1up.is_moving or sl0vt.is_moving or utx.is_moving or utz.is_moving:
sleep(0.2)
sleep(0.2)
fastscan.gamma2_short.scan()
PixelSumArr.append(max(fastscan.scanresult))
# if analyse fluo picture (and/or record it)
#im = Image.open(requests.get("http://bm07recorder.esrf.fr:7010/jpg/image.jpg", stream=True).raw)
if pics == True:
saveMonitorsImage("1100","Z" + str(m1up.position))
#a=np.asarray(im)
#pixelsvalue = a.sum() / 1E5
#print(m1up.position, moveh.position, pixelsvalue)
M1posArr.append(m1up.position)
#PixelSumArr.append(pixelsvalue)
#totalDiode = 0
#for j in range(0,5):
# totalDiode += OH_I1.value+OH_I2.value
# sleep(0.05)
#PixelSumArr.append(totalDiode/5.)
m1up.move(M1posInit, wait=False)
moveh.move(MovehposInit, wait=False)
sl0vt.move(sl0vtposInit, wait=False)
utx.move(UtxInit, wait=False)
utz.move(UtzInit, wait=False)
while moveh.is_moving or m1up.is_moving or sl0vt.is_moving or utx.is_moving or utz.is_moving:
sleep(0.5)
fig = tpl.figure()
fig.plot(M1posArr,PixelSumArr, width=300, height=40)
fig.show()
def scanMoveh(relMin, relMax, step):
MovehposInit = moveh.position
UtxInit = utx.position
UtzInit = utz.position
MovehposArr = []
PixelSumArr = []
for relpos in np.append(np.arange(relMin,relMax,step),[relMax]):
deltaUtxUtz = delta_utx_utz_fixed_height(relpos)
utx.move(UtxInit+deltaUtxUtz[0], wait=False)
utz.move(UtzInit+deltaUtxUtz[1], wait=False)
moveh.move(MovehposInit+relpos, wait=False)
while moveh.is_moving or utx.is_moving or utz.is_moving:
sleep(0.5)
sleep(0.5)
#im = Image.open(requests.get("http://bm07recorder.esrf.fr:7010/jpg/image.jpg", stream=True).raw)
#a=np.asarray(im)
#pixelsvalue = a.sum() / 1E5
#print(moveh.position, pixelsvalue)
MovehposArr.append(moveh.position)
#PixelSumArr.append(pixelsvalue)
totalDiode = 0
for j in range(0,10):
totalDiode += OH_I1.value+OH_I2.value
sleep(0.05)
PixelSumArr.append(totalDiode/10.)
moveh.move(MovehposInit, wait=False)
utx.move(UtxInit, wait=False)
utz.move(UtzInit, wait=False)
while moveh.is_moving or utx.is_moving or utz.is_moving:
sleep(0.5)
fig = tpl.figure()
fig.plot(MovehposArr,PixelSumArr, width=300, height=40)
fig.show()
# to keep the beam on the same position on dmono during moveh scanning (->C2 position is adjusted)
def delta_utx_utz_fixed_height(relMoveh, sourceAngle_uRad=0):
braggrad = deg2rad(bragg.position)+sourceAngle_uRad*1E-6
betarad = deg2rad(beta.position)
L2 = relMoveh / ( ( tan (braggrad - betarad ) + tan (braggrad + betarad ) ) * cos ( braggrad + betarad ) )
delta_utz = L2 *sin(braggrad)
delta_utx = L2 *cos(braggrad)
return (delta_utx, delta_utz)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment