Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
F-CRG
BM07
bm07
Commits
bf70105e
Commit
bf70105e
authored
Sep 06, 2021
by
Yoann Sallaz Damaz
Browse files
bla
parent
31e3f37f
Pipeline
#54155
failed with stages
Changes
4
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
bm07/datacollection.py
View file @
bf70105e
...
...
@@ -72,14 +72,15 @@ def acq_externalSingle(expo_s, step_deg, start_angle_deg, end_angle_deg):
pilatus_6m
.
prepareAcq
()
pilatus_6m
.
startAcq
()
mvr
(
omega
,
end_angle_deg
-
start_angle_deg
+
1
)
time
.
sleep
(
0.5
)
mv
(
omega
,
end_angle_deg
+
1
)
bac24
.
zero
()
omega
.
velocity
=
20
umv
(
omega
,
0
)
def
opensh
():
fast
_
shut
ter_pseudo
.
open_manual
()
fastshut
.
open_manual
()
def
closesh
():
fast_shutter_pseudo
.
close_manual
()
\ No newline at end of file
fastshut
.
close_manual
()
\ No newline at end of file
bm07/find_max_att.py
0 → 100644
View file @
bf70105e
"""
Procedure to find the max attenuation on a sample, while the signal from
the flourescence detector is within specified min and max integrated counts,
for a ROI between a configurable min and max energy, for a given counting time.
yml configuration example:
name: find_max_attenuation
plugin: default
diffractometer: $diffractometer
mca: $mca
transmission: $transmission
safshut: $safshut
integrated_counts: [2000, 50000]
transmission_table: [0.1, 0.2, 0.3, 0.9, 1.3, 1.9, 2.6, 4.3, 6, 8, 12, 24, 36, 50, 71]
"""
import
logging
import
gevent
from
bliss.config
import
static
from
bliss.common.cleanup
import
cleanup
,
error_cleanup
from
bliss.setup_globals
import
*
class
FindMaxAttenuation
:
""" Find maximum attenuation """
def
__init__
(
self
):
#cfg = static.get_config()
#config = cfg.get("find_max_attenuation")
#self.safshut = config.get("safshut")
#self.fastshut = config.get("fastshut")
#self.mca = config.get("mca")
#self.mca_in_out = config.get("mca_in_out")
#self.bstop = config.get("bstop")
#self.transmission = config.get("transmission")
#self.transmission_table = config.get("transmission_table", [])
self
.
transmission_table
=
[
0.1
,
0.2
,
0.3
,
0.9
,
1.3
,
1.9
,
2.6
,
4.3
,
6
,
8
,
12
,
24
,
36
,
50
,
71
]
self
.
t_table
=
[]
#self.integrated_counts = config.get("integrated_counts", [])
self
.
integrated_counts
=
[
2000
,
50000
]
self
.
roi
=
[]
self
.
datafile
=
None
self
.
initial_transmission
=
0
self
.
ctime
=
3
# default count time [s]
self
.
misc_roi_params
=
[
"Se"
,
34
]
# default roi element
def
init
(
self
,
**
kwargs
):
"""Initialise the procedure
Kwargs:
ctime(float): count time [s]
integrated_counts (list): minimum and maximum mca counts value
roi (list): mca ROI minimum and maximum energy [keV]
datafile (string): file to keep the data - full path
transmission_table (list): list of transmission values to check
misc_roi_params (list): some roi parameters
"""
if
not
safshut
:
logging
.
getLogger
().
debug
(
"Safety shutter will not be handled"
)
# count time in s
self
.
ctime
=
kwargs
.
get
(
"ctime"
,
3.0
)
# filename if saving the raw data wanted
self
.
datafile
=
kwargs
.
get
(
"datafile"
)
# get the mca roi and set the values in keV
self
.
roi
=
[]
roi
=
kwargs
.
get
(
"roi"
,
[
2.0
,
15.0
])
for
i
in
roi
:
if
i
>
1000
:
i
/=
1000.0
self
.
roi
.
append
(
i
)
self
.
roi
.
sort
()
self
.
misc_roi_params
=
kwargs
.
get
(
"misc_roi_params"
,
[
"Se"
,
34
])
integrated_counts
=
kwargs
.
get
(
"integrated_counts"
)
if
integrated_counts
:
self
.
integrated_counts
=
integrated_counts
if
not
self
.
integrated_counts
:
msg
=
"Minimum and/or maximum integrates counts value not set"
logging
.
getLogger
(
"user_level_log"
).
exception
(
msg
)
raise
RuntimeError
(
msg
)
self
.
integrated_counts
.
sort
()
self
.
t_table
=
kwargs
.
get
(
"transmission_table"
,
self
.
transmission_table
)
def
_prepare
(
self
,
ctime
=
None
):
if
ctime
:
self
.
ctime
=
ctime
# set the mca
mca
.
set_roi
(
self
.
roi
[
0
],
self
.
roi
[
1
],
channel
=
1
,
element
=
self
.
misc_roi_params
[
0
],
atomic_nb
=
self
.
misc_roi_params
[
1
],
)
mca
.
set_presets
(
erange
=
1
,
ctime
=
self
.
ctime
,
fname
=
self
.
datafile
)
# open safety shutter (just in case)
if
safshut
:
safshut
.
open
()
# put the fluorescent detector close to the sample
mca_in_out
.
insert
()
def
_cleanup
(
self
):
fastshut
.
open_manual
()
def
_error_cleanup
(
self
):
self
.
_cleanup
()
mca_in_out
.
remove
()
if
safshut
:
fastshut
.
close_manual
()
def
_procedure
(
self
,
ctime
=
None
):
""" The procedure itself """
if
ctime
:
self
.
ctime
=
ctime
def
restore_transmission
(
old_transmission
=
transmission
.
get
()):
transmission
.
set
(
old_transmission
)
with
cleanup
(
self
.
_cleanup
):
# set the maximum attenuation
transmission
.
set
(
0
)
logging
.
getLogger
(
"user_level_log"
).
info
(
"Choosing maximum attenuation, please wait"
)
# open the fast shutter
fastshut
.
open_manual
()
with
error_cleanup
(
self
.
_error_cleanup
,
restore_transmission
):
# start the acquisition
mca
.
start_acq
()
gevent
.
sleep
(
self
.
ctime
)
# save data if needed
save_data
=
False
if
self
.
datafile
:
save_data
=
True
logging
.
getLogger
(
"user_level_log"
).
info
(
"Raw data in "
+
self
.
datafile
)
# read the integrated counts
i_c
=
sum
(
mca
.
read_roi_data
(
save_data
))
//
self
.
ctime
logging
.
getLogger
(
"user_level_log"
).
debug
(
"Integrated counts/s: %g"
%
i_c
)
if
i_c
>
self
.
integrated_counts
[
1
]:
fastshut
.
close_manual
()
msg
=
"The detector is saturated, giving up."
logging
.
getLogger
(
"user_level_log"
).
exception
(
msg
)
raise
RuntimeError
(
msg
)
for
t_m
in
self
.
t_table
:
mca
.
clear_spectrum
()
logging
.
getLogger
(
"user_level_log"
).
debug
(
"Setting transmission to %f"
%
t_m
)
transmission
.
set
(
t_m
)
mca
.
start_acq
()
gevent
.
sleep
(
self
.
ctime
)
i_c
=
sum
(
mca
.
read_roi_data
())
//
self
.
ctime
print
(
"Integrated counts/s: %d"
%
i_c
)
logging
.
getLogger
(
"user_level_log"
).
debug
(
"Integrated counts/s: %g"
%
i_c
)
if
i_c
>
self
.
integrated_counts
[
0
]:
fastshut
.
close_manual
()
t_m
=
transmission
.
get
()
logging
.
getLogger
(
"user_level_log"
).
info
(
"Transmission chosen: %.3f (%d counts/s)"
%
(
t_m
,
i_c
)
)
return
t_m
if
i_c
<
self
.
integrated_counts
[
0
]:
msg
=
"Could not find satisfactory attenuation (is the mca properly set up?), giving up."
logging
.
getLogger
(
"user_level_log"
).
exception
(
msg
)
raise
RuntimeError
(
msg
)
return
False
def
find_max_attenuation
(
self
,
**
kwargs
):
""" The whole sequence of finding maximum attenuation """
self
.
init
(
**
kwargs
)
self
.
_prepare
(
**
kwargs
)
return
self
.
_procedure
(
**
kwargs
)
find_max_attenuation
=
FindMaxAttenuation
().
find_max_attenuation
\ No newline at end of file
bm07/grob_FIP.py
View file @
bf70105e
...
...
@@ -127,6 +127,7 @@ class grob_FIP(object):
port
=
config
.
get
(
"port"
)
self
.
client
=
ModbusTcp
(
ip
,
port
=
int
(
port
))
self
.
SIMU
=
config
.
get
(
"SIMULATION"
)
self
.
NumSampleMounted_Simulated
=
0
self
.
command
=
{
"stop"
:
1
,
"home"
:
2
,
"mount_sample"
:
3
,
"dismount_sample"
:
4
,
"mount_plate"
:
5
,
"dismount_plate"
:
6
,
"mount_gonio"
:
7
,
"dismount_gonio"
:
8
,
...
...
@@ -304,12 +305,15 @@ class grob_FIP(object):
self
.
send_command
(
"force_gripper"
,[
gripper_number
])
def
force_mounted_sample
(
self
,
sample_number
):
self
.
send_command
(
"force_mounted_sample"
,
[
sample_number
])
if
self
.
SIMU
:
self
.
NumSampleMounted_Simulated
=
sample_number
else
:
self
.
send_command
(
"force_mounted_sample"
,
[
sample_number
])
def
move_accessories_in_sample_mount_geo
(
self
,
checkonly
=
False
):
self
.
init_real_device
()
if
not
(
self
.
SIMU
):
if
not
(
self
.
SIMU
)
and
not
(
checkonly
)
:
self
.
dewar
.
open
()
self
.
detcover
.
close
()
self
.
cryolong
.
down
()
...
...
@@ -375,9 +379,8 @@ class grob_FIP(object):
def
move_accessories_in_sample_collect_geo
(
self
,
checkonly
=
False
):
self
.
init_real_device
()
if
not
(
self
.
SIMU
):
if
not
(
self
.
SIMU
)
and
not
(
checkonly
)
:
self
.
dewar
.
close
()
self
.
detcover
.
open
()
self
.
cryolong
.
down
()
self
.
mca
.
remove
()
self
.
bstop1
.
up
()
...
...
@@ -416,11 +419,6 @@ class grob_FIP(object):
else
:
txt
+=
"Cryo Long down
\t\t
"
+
CRED
+
"[NOK]
\n
"
+
CEND
ok
=
False
if
self
.
detcover
.
is_open
():
txt
+=
"Detector Cover opened
\t
"
+
CGRE
+
"[OK]
\n
"
+
CEND
else
:
txt
+=
"Detector Cover opened
\t
"
+
CRED
+
"[NOK]
\n
"
+
CEND
ok
=
False
if
self
.
keyence
.
is_off
():
txt
+=
"Keyence Light off
\t
"
+
CGRE
+
"[OK]
\n
"
+
CEND
else
:
...
...
@@ -440,6 +438,7 @@ class grob_FIP(object):
def
move_motors_in_sample_mount_geo
(
self
,
checkonly
=
False
):
self
.
init_real_device
()
self
.
omega
.
velocity
=
20
motor_list_first
=
[
self
.
omega
,
self
.
y
]
target_list_first
=
[
self
.
omega_mountsample
,
self
.
y_mountsample
]
motor_list_next
=
[
self
.
kappa
,
...
...
@@ -465,6 +464,7 @@ class grob_FIP(object):
def
move_motors_in_sample_collect_geo
(
self
,
checkonly
=
False
):
self
.
init_real_device
()
self
.
omega
.
velocity
=
20
motor_list_first
=
[
self
.
kappa
,
self
.
x
,
self
.
z
,
...
...
@@ -487,7 +487,10 @@ class grob_FIP(object):
checkonly
)
def
check_gonio_in_sample_mount_geo
(
self
):
return
self
.
move_gonio_in_sample_mount_geo
(
checkonly
=
True
)
return
self
.
move_motors_in_sample_mount_geo
(
checkonly
=
True
)
and
self
.
move_accessories_in_sample_mount_geo
(
checkonly
=
True
)
def
check_gonio_in_sample_collect_geo
(
self
):
return
self
.
move_motors_in_sample_collect_geo
(
checkonly
=
True
)
and
self
.
move_accessories_in_sample_collect_geo
(
checkonly
=
True
)
def
stop
(
self
):
self
.
send_command
(
"stop"
)
...
...
@@ -514,13 +517,15 @@ class grob_FIP(object):
if
sample_number
==
0
:
if
self
.
SIMU
:
self
.
state_grob
=
2
self
.
send_command
(
"force_mounted_sample"
,
[
0
])
#self.send_command("force_mounted_sample", [0])
self
.
NumSampleMounted_Simulated
=
0
else
:
self
.
send_command
(
"dismount_sample"
,
[])
else
:
if
self
.
SIMU
:
self
.
state_grob
=
2
self
.
send_command
(
"force_mounted_sample"
,
[
sample_number
])
#self.send_command("force_mounted_sample", [sample_number])
self
.
NumSampleMounted_Simulated
=
sample_number
else
:
self
.
send_command
(
"mount_sample"
,
[
sample_number
])
...
...
@@ -630,36 +635,39 @@ class grob_FIP(object):
def
read_all_memory
(
self
):
data
=
self
.
client
.
read_holding_registers
(
16
,
"30H"
)
self
.
Status
=
data
[
0
]
self
.
CurrentMode
=
data
[
1
]
self
.
GrobIsInit
=
data
[
2
]
self
.
NumSampleMounted
=
data
[
3
]
self
.
CurrentOmega
=
word_to_float
(
data
[
4
])
self
.
OmegaShift
=
word_to_float
(
data
[
5
])
self
.
CurrentKappa
=
word_to_float
(
data
[
6
])
self
.
CurrentPhi
=
word_to_float
(
data
[
7
])
self
.
PlateNbCol
=
data
[
8
]
self
.
PlateNbLine
=
data
[
9
]
self
.
PlateYFirstWell
=
word_to_float
(
data
[
10
])
self
.
PlateZFirstWell
=
word_to_float
(
data
[
11
])
self
.
PlateDepthWell
=
word_to_float
(
data
[
12
])
self
.
PlateWellNbCol
=
data
[
13
]
self
.
PlateWellNbLine
=
data
[
14
]
self
.
PlateDYCell
=
word_to_float
(
data
[
15
])
self
.
PlateDZCell
=
word_to_float
(
data
[
16
])
self
.
PlateDYWell
=
word_to_float
(
data
[
17
])
self
.
PlateDZWell
=
word_to_float
(
data
[
18
])
self
.
PlateRefrIndex
=
word_to_float
(
data
[
19
])
self
.
PlateThickness
=
word_to_float
(
data
[
20
])
self
.
PlateIdx
=
data
[
21
]
self
.
Versapin
=
data
[
22
]
self
.
SmartMagnet
=
data
[
23
]
self
.
CryoShort
=
data
[
24
]
self
.
CurPosX
=
data
[
25
]
self
.
CurPosY
=
data
[
26
]
self
.
CurPosZ
=
data
[
27
]
self
.
ErrorMsg
=
data
[
28
]
self
.
currentPuckType
=
data
[
29
]
self
.
Status
=
data
[
0
]
self
.
CurrentMode
=
data
[
1
]
self
.
GrobIsInit
=
data
[
2
]
if
self
.
SIMU
:
self
.
NumSampleMounted
=
self
.
NumSampleMounted_Simulated
else
:
self
.
NumSampleMounted
=
data
[
3
]
self
.
CurrentOmega
=
word_to_float
(
data
[
4
])
self
.
OmegaShift
=
word_to_float
(
data
[
5
])
self
.
CurrentKappa
=
word_to_float
(
data
[
6
])
self
.
CurrentPhi
=
word_to_float
(
data
[
7
])
self
.
PlateNbCol
=
data
[
8
]
self
.
PlateNbLine
=
data
[
9
]
self
.
PlateYFirstWell
=
word_to_float
(
data
[
10
])
self
.
PlateZFirstWell
=
word_to_float
(
data
[
11
])
self
.
PlateDepthWell
=
word_to_float
(
data
[
12
])
self
.
PlateWellNbCol
=
data
[
13
]
self
.
PlateWellNbLine
=
data
[
14
]
self
.
PlateDYCell
=
word_to_float
(
data
[
15
])
self
.
PlateDZCell
=
word_to_float
(
data
[
16
])
self
.
PlateDYWell
=
word_to_float
(
data
[
17
])
self
.
PlateDZWell
=
word_to_float
(
data
[
18
])
self
.
PlateRefrIndex
=
word_to_float
(
data
[
19
])
self
.
PlateThickness
=
word_to_float
(
data
[
20
])
self
.
PlateIdx
=
data
[
21
]
self
.
Versapin
=
data
[
22
]
self
.
SmartMagnet
=
data
[
23
]
self
.
CryoShort
=
data
[
24
]
self
.
CurPosX
=
data
[
25
]
self
.
CurPosY
=
data
[
26
]
self
.
CurPosZ
=
data
[
27
]
self
.
ErrorMsg
=
data
[
28
]
self
.
currentPuckType
=
data
[
29
]
try
:
if
self
.
mounted_sample_chan
.
value
!=
self
.
NumSampleMounted
:
self
.
mounted_sample_chan
.
value
=
self
.
NumSampleMounted
...
...
@@ -712,9 +720,9 @@ class grob_FIP(object):
return
value
def
gonio_state
(
self
):
if
self
.
move
_gonio_in_sample_collect_geo
(
checkonly
=
True
):
if
self
.
check
_gonio_in_sample_collect_geo
():
return
self
.
GONIO_SAMPLE_COLLECT_STATE
elif
self
.
move
_gonio_in_sample_mount_geo
(
checkonly
=
True
):
elif
self
.
check
_gonio_in_sample_mount_geo
():
return
self
.
GONIO_SAMPLE_MOUNT_STATE
else
:
0
...
...
@@ -779,6 +787,3 @@ class grob_FIP(object):
state_grob
=
self
.
get_state
()
state_gonio
=
self
.
gonio_state
()
return
state_grob
==
SampleChangerState
.
Loading
or
state_gonio
!=
self
.
GONIO_SAMPLE_COLLECT_STATE
\ No newline at end of file
bm07/optical_setup.py
View file @
bf70105e
...
...
@@ -20,6 +20,7 @@ import urllib.request
import
scipy.optimize
as
opt
import
matplotlib.pyplot
as
plt
from
scipy
import
ndimage
from
scipy.interpolate
import
interp1d
def
gauss
(
x
,
p
):
# p[0]==mean, p[1]==stdev
...
...
@@ -42,13 +43,23 @@ def analyseImg():
def
change_energy
(
value
):
mv
(
energy
,
value
)
def
find_nrj_in_conf
(
nrj_target
):
def
find_energy_in_conf
(
nrj_target
):
cc
=
get_config
()
cc
.
reload
()
optical_conf
=
cc
.
get
(
"optical_setup"
)
list_selected_str
=
list
(
optical_conf
[
"selected"
].
keys
())
date_selected
=
list
(
optical_conf
[
"selected"
].
values
())
list_selected_float
=
[
float
(
i
)
for
i
in
list_selected_str
]
list_selected_float
.
sort
()
#pas extrapolation on ne peux pas aller au dela de la plage d'energie deja alignée
if
nrj_target
<
list_selected_float
[
0
]:
nrj_target
=
list_selected_float
[
0
]
elif
nrj_target
>
list_selected_float
[
-
1
]:
nrj_target
=
list_selected_float
[
-
1
]
#est-ce une energie pile poile calibrée
if
nrj_target
in
list_selected_float
:
idx
=
list_selected_float
.
index
(
nrj_target
)
confdata
=
dict
(
optical_conf
.
get
(
list_selected_str
[
idx
]).
get
(
date_selected
[
idx
]))
...
...
@@ -58,13 +69,41 @@ def find_nrj_in_conf(nrj_target):
if
"beta"
not
in
confdata
:
confdata
.
update
({
"beta"
:
round
((
geofipdata
.
get
(
"mono_angle"
)
*
180.
/
math
.
pi
),
4
)})
return
confdata
#sinon interpolation
else
:
print
(
"nrj not found"
)
return
None
search1
=
min
(
enumerate
(
list_selected_float
),
key
=
lambda
x
:
abs
(
11
-
x
[
1
]))
if
nrj_target
>
list_selected_float
[
search1
[
0
]]:
search2
=
(
search1
[
0
]
+
1
,
list_selected_float
[
search1
[
0
]
+
1
])
else
:
search2
=
(
search1
[
0
]
-
1
,
list_selected_float
[
search1
[
0
]
-
1
])
confdata1
=
dict
(
optical_conf
.
get
(
list_selected_str
[
search1
[
0
]]).
get
(
date_selected
[
search1
[
0
]]))
confdata2
=
dict
(
optical_conf
.
get
(
list_selected_str
[
search2
[
0
]]).
get
(
date_selected
[
search2
[
0
]]))
geofipdata1
=
geofip
(
search1
[
1
],
VERBOSE
=
0
)
geofipdata2
=
geofip
(
search2
[
1
],
VERBOSE
=
0
)
if
"alpha1"
not
in
confdata1
:
confdata1
.
update
({
"alpha1"
:
round
((
geofipdata1
.
get
(
"angle_m1"
)
*
180.
/
math
.
pi
),
4
)})
if
"alpha1"
not
in
confdata2
:
confdata2
.
update
({
"alpha1"
:
round
((
geofipdata2
.
get
(
"angle_m1"
)
*
180.
/
math
.
pi
),
4
)})
if
"beta"
not
in
confdata1
:
confdata1
.
update
({
"beta"
:
round
((
geofipdata1
.
get
(
"mono_angle"
)
*
180.
/
math
.
pi
),
4
)})
if
"beta"
not
in
confdata2
:
confdata2
.
update
({
"beta"
:
round
((
geofipdata2
.
get
(
"mono_angle"
)
*
180.
/
math
.
pi
),
4
)})
if
confdata1
.
keys
()
!=
confdata2
.
keys
():
print
(
"ERROR : Some motors doesn't not exist in one the other configuration use for interpolation"
)
return
condataInter
=
{}
nrj_arr
=
np
.
array
([
search1
[
1
],
search2
[
1
]])
for
mot
in
confdata1
.
keys
():
mot_arr
=
np
.
array
([
confdata1
[
mot
],
confdata2
[
mot
]])
f
=
interp1d
(
nrj_arr
,
mot_arr
)
condataInter
.
update
({
mot
:
float
(
f
(
nrj_target
))})
return
condataInter
def
beam_set
(
nrj_target
,
mono_only
=
False
):
final_pos_dict
=
find_
nrj
_in_conf
(
nrj_target
)
final_pos_dict
=
find_
energy
_in_conf
(
nrj_target
)
if
final_pos_dict
is
None
:
return
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment