Commit 6fa446d1 authored by bliss administrator's avatar bliss administrator
Browse files

Cleaning-up

parent 7cb2ce66
......@@ -6,7 +6,6 @@ Home-page: https://gitlab.esrf.fr/ID19/id19
Author: BCU Team
Author-email: bliss@esrf.fr
License: GNU General Public License v3
Description-Content-Type: UNKNOWN
Description: ============
ID19 project
============
......
......@@ -2,13 +2,14 @@ LICENSE
MANIFEST.in
README.rst
setup.cfg
setup.py
docs/Makefile
docs/conf.py
docs/index.rst
docs/readme.rst
docs/usage.rst
id19/__init__.py
id19/calibration_tests.py
id19/test_iter_motor_master.py
id19.egg-info/PKG-INFO
id19.egg-info/SOURCES.txt
id19.egg-info/dependency_links.txt
......
from bliss.scanning.chain import ChainPreset,ChainIterationPreset
import numpy as np
import bliss
from id19.TomoCcd import *
from bliss import setup_globals
class Preset(ChainPreset):
def __init__(self, expo_time, ref_n, projection):
self.expo_time = expo_time
self.ref_n = ref_n
self.projection = projection
detector_device = setup_globals.config.get('pcoedge64')
self.ccd = TomoCcd(detector_device)
class Iterator(ChainIterationPreset):
def __init__(self,iteration_nb, preset_obj, acq_chain):
self.iteration = iteration_nb
self.preset_obj = preset_obj
self.projection = preset_obj.projection[iteration_nb]*(iteration_nb+1)-1
self.motor_master = acq_chain.nodes_list[0]
self.lima_master = acq_chain.nodes_list[3]
def prepare(self):
print(f"Preparing iteration {self.iteration}")
musst_card = setup_globals.config.get("musst")
musst_card.ABORT
enc_channel = musst_card.get_channel_by_name(self.motor_master.device.name)
self.motor_master.device.wait_move()
motor_position = self.motor_master.device.position
enc_channel.value = int(round((motor_position*self.motor_master.device.steps_per_unit+0.5)//1))
if self.iteration == self.motor_master._npoints-1 and self.preset_obj.projection[0] != self.preset_obj.projection[self.iteration]:
print("Modification of lima acquisition parameters for last ref group")
self.projection = self.preset_obj.projection[self.iteration-1]*self.iteration+self.preset_obj.projection[self.iteration]-1
self.lima_master.parameters['acq_nb_frames'] = self.preset_obj.projection[self.iteration]
self.lima_master.device.acq_nb_frames = self.preset_obj.projection[self.iteration]
def start(self):
print(f"Starting iter {self.iteration}")
def stop(self):
print(f"Ending iter {self.iteration}")
print("Starting acquisition of ref images")
ref = setup_globals.config.get("reference")
ref.move_out()
self.preset_obj.ccd.ref_images(self.preset_obj.expo_time,self.preset_obj.ref_n,self.projection)
ref.move_in()
def get_iterator(self,acq_chain):
iteration_nb = 0
while True:
yield Preset.Iterator(iteration_nb,self,acq_chain)
iteration_nb += 1
from bliss import setup_globals
from bliss.config.static import get_config
from bliss.common.standard import *
from bliss.config.settings import *
from id19.setup import *
from id19.TomoCcd import *
from bliss import setup_globals
class Reference(object):
def __init__(self,name,config):
self.name = name
self.mot1 = config['mot1']
self.pos_in_mot1 = None
self.pos_out_mot1 = None
self.mot2 = None
self.Y_STEP = None
if 'mot2' in config:
self.mot2 = config['mot2']
self.pos_in_mot2 = None
self.pos_out_mot2 = None
self.Z_STEP = None
def move_in(self):
print("\n move %s to %f" % (self.mot1.name,self.pos_in_mot1))
umv(self.mot1,self.pos_in_mot1)
if self.mot2:
print("\n move %s to %f" % (self.mot2.name,self.pos_in_mot2))
umv(self.mot2,self.pos_in_mot2)
def move_out(self):
self.pos_out_mot1 = self.pos_in_mot1 + self.Y_STEP
print("\n move %s to %f" % (self.mot1.name,self.pos_out_mot1))
umv(self.mot1,self.pos_out_mot1)
if self.mot2:
self.pos_out_mot2 = self.pos_in_mot2 + self.Z_STEP
print("\n move %s to %f" % (self.mot2.name,self.pos_out_mot2))
umv(self.mot2,self.pos_out_mot2)
def set_in(self,pos_mot1=None,pos_mot2=None):
self.pos_in_mot1 = self.mot1.position
if pos_mot1 is not None:
self.pos_in_mot1 = pos_mot1
print("\nIn-beam sample position for ref %s (motor1): %s\n" % (self.mot1.name,self.pos_in_mot1))
if self.mot2:
self.pos_in_mot2 = self.mot2.position
if pos_mot2 is not None:
self.pos_in_mot2 = pos_mot2
print("\nIn-beam sample position for ref %s (motor2): %s\n" % (self.mot2.name,self.pos_in_mot2))
def set_out(self,Y_STEP=None,Z_STEP=None):
print("")
if self.mot2:
if Z_STEP is None:
Z_STEP = float(input("Z displacement value (ref motor2) needed to calculate out-of-the-beam \nsample position. Please enter it: "))
self.Z_STEP = Z_STEP
print("\nZ displacement value for ref %s (motor2): %s\n" % (self.mot2.name,self.Z_STEP))
if Y_STEP is not None:
self.Y_STEP = Y_STEP
else:
Y_STEP = self.default_y_step()
confirm = input("Recommended Y displacement value: %s. Do you want to set this value? (sign is considered) " % (str(Y_STEP)))
if confirm.lower() == 'no':
Y_STEP = float(input("\nPlease enter Y displacement value: "))
self.Y_STEP = Y_STEP
print("\nY displacement value for ref %s (motor1): %s\n" % (self.mot1.name,self.Y_STEP))
def default_y_step(self):
detector = ScanSettings().get('detector')
detector_device = setup_globals.config.get(detector)
ref_disp = TomoCcd(detector_device).ccd_field_of_view()
# done in macro spec : add 10% of FOV + round to nearest 0.1mm
return -int(11*ref_disp+0.5)/10
import sys
import gevent
import numpy as np
import time
import PyTango
import bliss
from id19.setup import *
#from bliss.common import mapping
#from bliss.common.logtools import LogMixin
#class TomoCcd(LogMixin):
class TomoCcd():
def __init__(self, det, *args, **kwargs):
#self._logger.info("__init__ entering")
self.detector = det
self.det_proxy = det.proxy
self.saved_configurations = {}
#
# Save a set of detector parameters
#
def save_ccd_config(self, config_name, parameters=None):
print ("Saving configuration for %s" % self.detector.name)
default_parameters = [
"saving_format",
"saving_overwrite_policy",
"saving_frame_per_file",
"saving_index_format",
"saving_directory",
"saving_prefix",
"saving_next_number",
"saving_suffix",
"saving_mode",
"acq_trigger_mode",
"acq_expo_time",
"acq_nb_frames",
"latency_time",
"acc_max_expo_time"]
# Clean-up formerly saved parameters
if config_name in self.saved_configurations:
del self.saved_configurations[config_name]
# Apply the default parameters if no list is given
if parameters == None:
parameters = default_parameters
# Read all parameters from the Lima server
attributes = self.det_proxy.read_attributes(parameters)
# store all parameters in a dictionary
saved_state = {}
param_len = len(parameters)
for i in range (0, param_len):
saved_state[parameters[i]] = attributes[i].value
# store in the configurations dictionary
self.saved_configurations[config_name] = saved_state
print (self.saved_configurations[config_name])
#
# Restore the saved detector parameters
#
def restore_ccd_config(self, config_name):
print ("Restore configuration for %s" % self.detector.name)
saved_state = self.saved_configurations[config_name]
#Write all saved parameters to the Lima server
self.det_proxy.write_attributes(list(saved_state.items()))
#
# Test for shutter capability
#
def has_shutter(self):
sh_modes = self.det_proxy.getAttrStringValueList("shutter_mode")
if "AUTO_FRAME" in sh_modes:
return True
else:
return False
#try:
#sh_modes = self.det_proxy.shutter_mode
#except PyTango.DevFailed as e:
#import pdb; pdb.set_trace()
#ex_info = sys.exc_info()[:2]
#error_stack = ex_info[1]
#desc = error_stack.args[0].desc
#if "No shutter capability" in desc:
#return False
#
# Set the camera shutter to manual or automatic operation
#
def manual_shutter_mode(self, manual):
if self.has_shutter():
if manual == True:
self.det_proxy.shutter_mode = "MANUAL"
else:
self.det_proxy.shutter_mode = "AUTO_FRAME"
def set_shutter_time (self, shuttime):
if self.has_shutter():
if self.det_proxy.camera_type != "Frelon":
self.det_proxy.shutter_open_time = shuttime
self.det_proxy.shutter_close_time = shuttime
def get_shutter_time (self):
if self.has_shutter():
return det_proxy.shutter_close_time
else:
return 0.0
#
# Calculate the readout time of a camera
#
def calibrate_ccd_readout_time(self, expo_time):
print ("Calibrate readout time for %s" % self.detector.name)
config_name = "calibration"
readout_time = None
# save the detector state
self.save_ccd_config(config_name)
# disable shutter and saving
self.manual_shutter_mode(True)
self.det_proxy.saving_mode = "MANUAL"
ccd_type = self.det_proxy.camera_type
# PCO detectors
if ccd_type == "Pco":
nimg = 2
self.take_image(expo_time, nimg)
runtime = self.detector.camera.coc_run_time
readout_time = runtime - expo_time
else:
raise ValueError ("No readout time calibration method defined for the detector %s" % self.detector.name)
# restore the detector state
self.restore_ccd_config(config_name)
# enable shutter back
self.manual_shutter_mode(False)
return readout_time
#
# Take one or several images images
#
def take_image (self, expo_time, nimg=1, shutter=None):
print ("Take image with %s" % self.detector.name)
# check the ACCUMULATION mode
if self.det_proxy.acq_mode == "ACCUMULATION":
if self.det_proxy.acc_nb_frames > 1:
expo_time = self.det_proxy.acc_expo_time
# prepare
self.det_proxy.acq_trigger_mode = "INTERNAL_TRIGGER"
self.det_proxy.acq_expo_time = expo_time
self.det_proxy.acq_nb_frames = nimg
self.det_proxy.prepareAcq()
# start image acquisition
runtime= time.time()
self.det_proxy.startAcq()
while self.det_proxy.acq_status == "Running":
if nimg > 0:
print ("Image %d" % (self.det_proxy.last_image_ready + 1))
time.sleep (0.02)
runtime = time.time() - runtime
print ("%d images aquired" % (self.det_proxy.last_image_ready + 1))
return runtime
#
# take a set of accumulated images
#
def take_acc_image (self, expo_time, nimg, shutter=None):
print ("Take accumulation image with %s" % self.detector.name)
acq_mode = self.det_proxy.acq_mode
if acq_mode != "ACCUMULATION":
self.set_acq_mode ("ACCUMULATION")
self.det_proxy.acc_max_expo_time = expo_time
# prepare
self.det_proxy.acq_trigger_mode = "INTERNAL_TRIGGER"
self.det_proxy.acq_expo_time = expo_time * nimg
self.det_proxy.acq_nb_frames = 1
self.det_proxy.prepareAcq()
# start image acquisition
runtime= time.time()
self.det_proxy.startAcq()
while self.det_proxy.acq_status == "Running":
time.sleep (0.02)
if acq_mode != "ACCUMULATION":
self.set_acq_mode (acq_mode)
runtime = time.time() - runtime
print ("%d accumulated images aquired" % (nimg))
return runtime
#
# switch the camera acquisition mode
#
def set_acq_mode (self, mode):
config_name = "image"
image_parameters = [
"image_bin",
"image_flip",
"image_roi"]
# save the detector image state
self.save_ccd_config(config_name, image_parameters)
self.det_proxy.acq_mode = mode
# restore the detector image state
self.restore_ccd_config(config_name)
def set_saving_file (self, prefix=None, inum=None, suffix=None):
ccd_inum = inum
if prefix != None:
if ccd_inum == None:
ccd_inum = self.det_proxy.saving_next_number
self.det_proxy.saving_prefix = prefix
if ccd_inum != None:
self.det_proxy.saving_next_number = ccd_inum
if suffix != None:
file_ext = self.det_proxy.saving_suffix
file_ext = file_ext[file_ext.rfind('.'):]
self.det_proxy.saving_suffix = ("%s%s" % (suffix, file_ext))
def get_saving_file (self):
path = self.det_proxy.saving_directory
prefix = self.det_proxy.saving_prefix
inum = self.det_proxy.saving_next_number
suffix = self.det_proxy.saving_suffix
return ("%s/%s%04d%s" % (path, prefix, inum, suffix))
def ref_images(self, expo_time, ref_n, projection, turn=0):
orig_prefix = self.det_proxy.saving_prefix
orig_inum = self.det_proxy.saving_next_number
suffix = ""
if turn > 0:
suffix = ("_%d" % turn)
suffix = ("_%04d%s" % (projection , suffix))
print (suffix)
self.set_saving_file("ref", 0, suffix)
self.take_image(expo_time, ref_n)
self.set_saving_file(orig_prefix, orig_inum, "")
def dark_images(self, expo_time, dark_n, accumulation=True):
self.set_saving_file("darkend", 0)
if accumulation == False:
self.take_image(expo_time, dark_n)
else:
self.take_acc_image(expo_time, dark_n)
def ccd_field_of_view(self):
ccd_xsize = self.det_proxy.image_sizes[2]
pixel_size = self.get_image_pixel_size()
return pixel_size / 1000 * ccd_xsize # in mm
def get_image_pixel_size(self):
optics_objective = ScanSettings().get('optics_objective')
optics_eyepiece = ScanSettings().get('optics_eyepiece')
# no manual mode implemented now i.e no possibility for user to define unbinned pixel size
magnif = optics_objective * optics_eyepiece if optics_eyepiece > 1 else optics_objective
ccd_ps = self.det_proxy.camera_pixelsize[0]
unbinned_ps = ccd_ps / magnif
ccd_ybin = self.det_proxy.image_bin[1]
pixel_size = unbinned_ps * ccd_ybin
return pixel_size
from bliss.common.standard import *
from bliss.common.axis import *
from bliss.common.cleanup import *
from bliss.setup_globals import *
import logging
import sys
import time
import random
import datetime
import gevent
### ---------- Log solution 2: Allows to activate debug for all functions in module
module_logger = logging.getLogger('cal')
handler = logging.StreamHandler()
handler.setLevel("DEBUG")
formatter = logging.Formatter("%(levelname)s -- %(funcName)s -- %(message)s")
handler.setFormatter(formatter)
module_logger.addHandler(handler)
module_logger.propagate = False
debug_activated =[]
###
def init_sx():
try:
logger = logging.getLogger('cal.init_sx')
sx.limits(low_limit=-100,high_limit=100)
######### LIMIT -
logger.info("Moving to hard limit -")
start = time.time()
sx.hw_limit(-1,wait=False)
gevent.sleep(1)
logger.info("Searching limit- ...")
_status = sx.state()
while _status == "MOVING":
_status = sx.state()
if logger.getEffectiveLevel() > 0 and logger.getEffectiveLevel() < 30:
sys.stdout.write('.')
sys.stdout.flush()
gevent.sleep(1)
if logger.getEffectiveLevel() > 0 and logger.getEffectiveLevel() < 30:
sys.stdout.write('.\n')
sys.stdout.flush()
logger.info("lim- found !\n")
logger.info("Time to found lim-: "+str(time.time()-start) + "\n")
# hardware synchronization
sx.sync_hard()
_myposn = sx.position()
logger.info("Hard limit - value: %f" % (_myposn))
######### LIMIT +
logger.info("Moving to hard limit +")
sx.hw_limit(1,wait=False)
gevent.sleep(1)
logger.info("Searching limit+ ...")
_status = sx.state()
while _status == "MOVING":
_status = sx.state()
if logger.getEffectiveLevel() > 0 and logger.getEffectiveLevel() < 30:
sys.stdout.write('.')
sys.stdout.flush()
gevent.sleep(1)
if logger.getEffectiveLevel() > 0 and logger.getEffectiveLevel() < 30:
sys.stdout.write('.\n')
sys.stdout.flush()
logger.info("lim+ found !\n")
logger.info("Time to found lim+: "+str(time.time()-start) + "\n")
# hardware synchronization
sx.sync_hard()
_myposp = sx.position()
logger.info("Hard limit + value: %f" % (_myposp))
mid = (_myposp - _myposn) / 2.0 + _myposn
logger.info("moving to middle position %f\n" % (mid))
umv(sx,mid)
logger.info("reseting sx to 0\n")
sx.dial(0)
sx.position(0)
slp = (_myposp - _myposn) / 2.0 - 0.05
logger.info("reseting sx limits to hard limits +/- 0.05\n")
sx.limits(low_limit=-slp,high_limit=slp)
logger.info("Time end: "+str(time.time()-start))
except:
sx.stop()
#logger.setLevel("NOTSET")
# search for SY pos and neg limits and reset dial to 0 in the middle of them.
def init_sy():
try:
logger = logging.getLogger('cal.init_sy')
sy.limits(low_limit=100,high_limit=-100)
######### LIMIT -
logger.info("Moving to hard limit -")
start = time.time()
sy.hw_limit(-1,wait=False)
gevent.sleep(1)
logger.info("Searching limit- ...")
_status = sy.state()
while _status == "MOVING":
_status = sy.state()
if logger.getEffectiveLevel() > 0 and logger.getEffectiveLevel() < 30: