Commit 018e1922 authored by bliss administrator's avatar bliss administrator
Browse files

Add step scan, opiom setup, scan settings and

software position trigger continuous scan
parent 4174e63b
from bliss.scanning.acquisition.musst import MusstAcquisitionMaster
from bliss.scanning.acquisition.lima import LimaAcquisitionMaster
from bliss.scanning.chain import AcquisitionMaster
# This object allows to generate triggers by musst without any program
class SoftTriggerMusstAcquisitionMaster(AcquisitionMaster):
def __init__(self, musst_device, ctime):
AcquisitionMaster.__init__(self, musst_device, "musst_step")
self.time = ctime
self.musst = musst_device
def prepare(self):
pass
def start(self):
pass
def trigger(self):
print("start trigger musst")
#run RUNCT command ---> generate 2 pulses of 100 ms on outA at
# the beginning and the end of counting +
# a gate on outB of ctime width
self.musst.ct(self.time)
def stop(self):
pass
#object needed by the default chain config to know how to create musst master in case of step-by-step scans
class MusstStepScan(object):
def __init__(self, name, config):
self.musst = config.get("musst")
def create_master_device(self, scan_pars, **settings):
ctime = scan_pars['count_time']
master = SoftTriggerMusstAcquisitionMaster(self.musst, ctime)
return master
from bliss.scanning.chain import ChainPreset
from bliss import setup_globals
class OpiomSettingsPreset(ChainPreset):
def __init__(self,mode):
ChainPreset.__init__(self)
self.mux = setup_globals.config.get('multiplexer_tomo')
self.mode = mode
self.prev_val = self.mux.getGlobalStat()
def prepare(self,chain):
if self.mode == "HR_FTOMO_FRELON":
self.mux.switch("CAMERA","CAM_HR")
self.mux.switch("SHMODE","CCD")
self.mux.switch("TRIGGER","MUSST_TRIG")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "HR_FTOMO_PCO":
self.mux.switch("CAMERA","CAM_HR")
self.mux.switch("SHMODE","SOFT")
self.mux.switch("TRIGGER","MUSST_TRIG")
self.mux.switch("SOFTSHUT","CLOSE")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "MR_FTOMO_FRELON":
self.mux.switch("CAMERA","CAM_MR")
self.mux.switch("SHMODE","CCD")
self.mux.switch("TRIGGER","MUSST_TRIG")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "MR_FTOMO_PCO":
self.mux.switch("CAMERA","CAM_MR")
self.mux.switch("SHMODE","SOFT")
self.mux.switch("TRIGGER","MUSST_TRIG")
self.mux.switch("SOFTSHUT","CLOSE")
#_opmux_ask("SHUTTER") #TO MODIFY
def stop(self,chain):
self.mux.switch("CAMERA",self.prev_val['CAMERA'])
self.mux.switch("SHMODE",self.prev_val['SHMODE'])
self.mux.switch("TRIGGER",self.prev_val['TRIGGER'])
#self.mux.switch("SHUTTER",self.prev_val['SHUTTER'])
from bliss.config.settings import Parameters
from bliss.common.session import get_current
class ScanSettings:
def __init__(self, opiom_mode=None, detector_name=None):
#create a parameter in redis with name = 'session_name:scan_settings:default'
self._params = Parameters(name=get_current().name+':scan_settings')
settings = {}
settings['opiom_mode'] = opiom_mode
settings['detector'] = detector_name
for k,v in settings.items():
if v is not None:
#replace previous setting value if already present in redis
self._params.add(k,v)
def get(self, setting_name):
return self._params.to_dict()[setting_name]
def get_all(self):
return self._params.to_dict()
from bliss.scanning.chain import AcquisitionChain, AcquisitionChannel, AcquisitionMaster
from bliss.scanning.scan import Scan, StepScanDataWatch
from bliss.scanning.acquisition.musst import MusstAcquisitionDevice
from bliss.scanning.acquisition.motor import MotorMaster, LinearStepTriggerMaster, SoftwarePositionTriggerMaster
from bliss.scanning.acquisition.lima import LimaAcquisitionMaster
from bliss.scanning.acquisition.ct2 import CT2AcquisitionMaster
from bliss.scanning.acquisition.counter import SamplingCounterAcquisitionDevice, IntegratingCounterAcquisitionDevice
from bliss.scanning.acquisition.timer import SoftwareTimerMaster
from bliss import setup_globals
from bliss.controllers.ct2.device import AcqMode as P201AcqMode
from bliss.data.node import is_zerod
from bliss.common import session, scans
from bliss.scanning.acquisition.calc import CalcAcquisitionDevice
import sys
import gevent
import numpy as np
import bliss
import time
from bliss.common import axis
from bliss.common import event
from bliss import setup_globals
from id19.musst_step_scan import SoftTriggerMusstAcquisitionMaster
from id19.opiom_preset import *
def soft_sync_cont_scan(trigger, motor, start, end, images, ctime=0.05, readout=0.05, latency=0, run=True):
"""
Continuous scan with musst triggered by soft motor position (trigger = 'position')
or time (trigger = 'time')
Args:
trigger (string): type of trigger to generate
motor (Axis): motor to scan
start (float): motor start position
stop (float): motor end position
images (int): the number of images to acquire
ctime (float): count time (seconds)
run (bool): if True (default), run the scan. False means just create
scan object and acquisition chain
"""
scan_point_time = ctime + readout + latency
scan_step_size = float(end - start) / images
scan_speed = abs(scan_step_size/scan_point_time)
print("scan step size: %.2f" % scan_step_size)
print("scan speed: %.2f" % scan_speed)
print("scan step acquisition time: %.2f" % scan_point_time)
chain = AcquisitionChain()
if trigger == 'position':
#motor triggers are generated in time using motor speed
trigger_master = SoftwarePositionTriggerMaster(motor, start, end, images, time=scan_point_time*images)
if trigger == 'time':
sleep_time = scan_point_time - ctime
trigger_master = SoftwareTimerMaster(ctime, sleep_time=sleep_time, npoints=images)
#motor_master = MotorMaster(motor, start, end, time=scan_point_time*images)
musst_device = setup_globals.config.get("musst")
#musst triggers are generated with RUNCT command
musst_master = SoftTriggerMusstAcquisitionMaster(musst_device,ctime)
#chain.add(trigger_master,motor_master)
chain.add(trigger_master,musst_master)
detector_name = ScanSettings().get('detector')
detector = setup_globals.config.get(detector)
lima_master = LimaAcquisitionMaster(detector,
acq_nb_frames=images,
acq_expo_time=ctime,
acq_trigger_mode='EXTERNAL_TRIGGER_MULTI',
save_flag=True,
prepare_once=True,
start_once=True)
lima_master.add_counter(detector.image)
chain.add(musst_master,lima_master)
opiom_mode = ScanSettings().get('opiom_mode')
chain.add_preset(OpiomSettingsPreset(opiom_mode))
scan = Scan(chain)
if run is True:
scan.run()
return scan
......@@ -19,16 +19,29 @@ import time
from bliss.common import axis
from bliss.common import event
from bliss import setup_globals
from id19.opiom_preset import *
from id19.setup import *
def motor_scan_cont(mode, motor, start, end, images, ctime=0.05, readout_time=0.05, latency_time=None):
if latency_time is None:
calc_readout_time= (images*ctime)/60.0
if calc_readout_time >= 5.0:
latency_time = 0.005
def musst_sync_cont_scan(trigger, motor, start, end, images, ctime=0.05, readout_time=0.05, latency_time=None, run=True):
"""
Continuous scan with musst triggered in position (trigger = 'position')
or in time (trigger = 'time')
Motor with encoder needed for this scan
Args:
trigger (string): type of trigger to generate
motor (Axis): motor to scan
start (float): motor start position
stop (float): motor end position
images (int): the number of images to acquire
ctime (float): count time (seconds)
run (bool): if True (default), run the scan. False means just create
scan object and acquisition chain
"""
scan_point_time = ctime + readout_time + latency_time
scan_step_size = float(end - start) / images
......@@ -59,7 +72,7 @@ def motor_scan_cont(mode, motor, start, end, images, ctime=0.05, readout_time=0.
gatewidth = int(ctime*musst_card.get_timer_factor())
# --- position mode
if mode == "position":
if trigger == "position":
pos_delta = scan_step_size*motor.steps_per_unit
enc_delta = int(pos_delta)
mot_error = abs(int((enc_delta - pos_delta) * 2**31))
......@@ -75,7 +88,7 @@ def motor_scan_cont(mode, motor, start, end, images, ctime=0.05, readout_time=0.
}
# --- time mode
if mode == "time":
if trigger == "time":
time_delta = int(scan_point_time*musst_card.get_timer_factor())
musst_vars = {"MOTCHAN" : mot_chan,
"MOTZERO" : mot_zero,
......@@ -100,9 +113,12 @@ def motor_scan_cont(mode, motor, start, end, images, ctime=0.05, readout_time=0.
chain = AcquisitionChain()
# Continuous scan
# Continuous motor mouvment
# time param is used to modify motor speed
motor_master = MotorMaster(motor, start, end, time=images*scan_point_time)
# function used to convert encoder values coming from the musst into user motor position values
def enc_to_user(sender, channel_data):
calc_data = dict()
if sender.name == 'CH3':
......@@ -118,23 +134,44 @@ def motor_scan_cont(mode, motor, start, end, images, ctime=0.05, readout_time=0.
chain.add(motor_master,musst_device)
detector = setup_globals.config.get("pcoedge64")
lima_master = LimaAcquisitionMaster(detector,
acq_nb_frames=images,
acq_trigger_mode='EXTERNAL_TRIGGER_MULTI',
acq_expo_time=ctime,
save_flag=True,
prepare_once=True,
start_once=True)
#if detector in enabled_counters:
#enabled_counters.remove(detector)
#detector_name = ScanSettings().get('detector')
#detector_device = setup_globals.config.get(detector_name)
#lima_master = LimaAcquisitionMaster(detector_device,
#acq_nb_frames=images,
#acq_trigger_mode='EXTERNAL_TRIGGER_MULTI',
#acq_expo_time=ctime,
#save_flag=True,
#prepare_once=True,
#start_once=True)
lima_master.add_counter(detector.image)
#lima_master.add_counter(detector_device.image)
#chain.add(motor_master,lima_master)
chain.add(motor_master,lima_master)
# P201 master
p201_card = setup_globals.config.get("p201_0")
p201_master = CT2AcquisitionMaster(p201_card,
npoints=images,
acq_expo_time=ctime,
acq_mode=P201AcqMode.ExtTrigMulti
)
chain.add(motor_master,p201_master)
p201_0_counters = IntegratingCounterAcquisitionDevice(p201_card.counters[0], npoints=images, count_time=ctime)
chain.add(p201_master, p201_0_counters)
if enabled_counters:
#npoints = 0 means infinite number of points (until scan ending)
scan_params = {'npoints': 0, 'count_time': 1 }
#recover counter objects from measurement group
counters_left = [getattr(setup_globals,name) for name in enabled_counters]
#create a second acquisition chain with a software timer as top master
chain.append(scans.DEFAULT_CHAIN.get(scan_params, counters_left))
#scan_info = {
......@@ -146,12 +183,16 @@ def motor_scan_cont(mode, motor, start, end, images, ctime=0.05, readout_time=0.
#"root_path": "/tmp_14_days/id19test/visualtomo"
#}
setup_globals.SCAN_SAVING.base_path = '/tmp_14_days/id19test'
#scan = Scan(chain, scan_info=scan_info, data_watch_callback=ScanDisplayData(motor))
scan = Scan(chain), data_watch_callback=ScanDisplayData(motor))
with MultiplexerManager("HR_FTOMO_PCO"):
opiom_mode = ScanSettings().get('opiom_mode')
#preset is used to configure opiom before start scan
chain.add_preset(OpiomSettingsPreset(opiom_mode))
scan = Scan(chain, data_watch_callback=ScanDisplayData(motor))
if run is True:
scan.run()
return scan
......@@ -177,7 +218,7 @@ class ScanDisplayData(object):
if data_node.name == 'CH3':
print('motor position: {}'.format(self.motor.position))
#print('motor position: {}'.format(self.motor.position))
data = data_node.get(-1)/self.motor.steps_per_unit
else:
data = data_node.get(-1)
......@@ -188,43 +229,4 @@ class ScanDisplayData(object):
if display:
print(self.display),
sys.stdout.flush()
class MultiplexerManager(object):
def __init__(self, mode):
self.mux = setup_globals.config.get('multiplexer_tomo')
self.mode = mode
self.prev_val = self.mux.getGlobalStat()
def __enter__(self):
if self.mode == "HR_FTOMO_FRELON":
self.mux.switch("CAMERA","CAM_HR")
self.mux.switch("SHMODE","CCD")
self.mux.switch("TRIGGER","MUSST_TRIG")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "HR_FTOMO_PCO":
self.mux.switch("CAMERA","CAM_HR")
self.mux.switch("SHMODE","SOFT")
self.mux.switch("TRIGGER","MUSST_TRIG")
self.mux.switch("SOFTSHUT","CLOSE")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "MR_FTOMO_FRELON":
self.mux.switch("CAMERA","CAM_MR")
self.mux.switch("SHMODE","CCD")
self.mux.switch("TRIGGER","MUSST_TRIG")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "MR_FTOMO_PCO":
self.mux.switch("CAMERA","CAM_MR")
self.mux.switch("SHMODE","SOFT")
self.mux.switch("TRIGGER","MUSST_TRIG")
self.mux.switch("SOFTSHUT","CLOSE")
#_opmux_ask("SHUTTER") #TO MODIFY
def __exit__(self, type, value, traceback):
self.mux.switch("CAMERA",self.prev_val['CAMERA'])
self.mux.switch("SHMODE",self.prev_val['SHMODE'])
self.mux.switch("TRIGGER",self.prev_val['TRIGGER'])
#self.mux.switch("SHUTTER",self.prev_val['SHUTTER'])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment