Commit 0c7d1e44 authored by bliss administrator's avatar bliss administrator
Browse files

test continuous scan

parent 05eb4263
from bliss.scanning.chain import AcquisitionChain, AcquisitionChannel, AcquisitionMaster
from bliss.scanning.scan import Scan, StepScanDataWatch
from bliss.scanning.acquisition.musst import MusstAcquisitionDevice
from bliss.scanning.acquisition.motor import MotorMaster, LinearStepTriggerMaster, SoftwarePositionTriggerMaster
from bliss.scanning.acquisition.lima import LimaAcquisitionMaster
from bliss.scanning.acquisition.ct2 import CT2AcquisitionMaster
from bliss.scanning.acquisition.counter import SamplingCounterAcquisitionDevice, IntegratingCounterAcquisitionDevice
from bliss.scanning.acquisition.timer import SoftwareTimerMaster
from bliss import setup_globals
from bliss.controllers.ct2.device import AcqMode as P201AcqMode
from bliss.data.node import is_zerod
from bliss.common import session, scans
import sys
import gevent
import numpy
import bliss
import time
from bliss.common import axis
from bliss.common import event
from bliss import setup_globals
# Test without hardware
#def ftomo_scan_cont(motor, start, end, images, ctime, readout_time, latency_time):
## Step scan
#motor_master = LinearStepTriggerMaster(images, motor, start, end)
## Continuous scan
#motor_master = SoftwarePositionTriggerMaster(motor, start, end, images, time=images*scan_point_time)
#timer_device = SoftwareTimerMaster(count_time=ctime, npoints=images)
#chain = AcquisitionChain()
#chain.add(motor_master,timer_device)
##counter = setup_globals.config.get("simct1")
##acq_device = SamplingCounterAcquisitionDevice(counter,
## npoints=images,
## count_time=ctime,
## mode=SamplingCounterAcquisitionDevice.INTEGRATE)
##chain.add(timer_device,acq_device)
def ftomo_scan_cont(mode, motor, start, end, images, ctime, readout_time, latency_time=None):
if latency_time is None:
calc_readout_time= (images*ctime)/60.0
if calc_readout_time >= 5.0:
latency_time = 0.005
scan_point_time = ctime + readout_time + latency_time
scan_step_size = float(end - start) / images
scan_speed = abs(scan_step_size/scan_point_time)
print("scan step size: %.2f" % scan_step_size)
print("scan speed: %.2f" % scan_speed)
print("scan step acquisition time: %.2f" % scan_point_time)
# Continuous scan
motor_master = MotorMaster(motor, start, end, time=images*scan_point_time)
musst_card = setup_globals.config.get("musst")
musst_card.ABORT
enc_channel = musst_card.get_channel_by_name(motor.name)
enc_name = 'enc_%s' % motor.name
# --- set encoder value
motor.wait_move()
motor_position = motor.position()
enc_channel.value = motor_position * motor.steps_per_unit
mot_chan = enc_channel.channel_id
mot_zero = int(round(start*motor.steps_per_unit))
print(mot_zero)
storemask = 1 | (1<<mot_chan)
# --- position mode
if mode == "position":
pos_delta = scan_step_size*motor.steps_per_unit
print(pos_delta)
enc_delta = int(pos_delta)
print(enc_delta)
mot_error = abs(int((enc_delta - pos_delta) * 2**31))
print(mot_error)
gatewidth = int(round(ctime*motor.steps_per_unit))
print(gatewidth)
musst_vars = {"MOTCHAN" : mot_chan,
"MOTZERO" : mot_zero,
"PTCHAN" : mot_chan,
"PTDELTA" : enc_delta,
"GATEMODE" : 2,
"NPULSES" : images,
"GATEWIDTH" : gatewidth,
"MOTERR" : mot_error,
"STOREMASK": storemask
}
# --- time mode
if mode == "time":
time_delta = int(scan_point_time*musst_card.get_timer_factor())
gatewidth = int(ctime*musst_card.get_timer_factor())
musst_vars = {"MOTCHAN" : mot_chan,
"MOTZERO" : mot_zero,
"PTCHAN" : 0,
"PTDELTA" : time_delta,
"GATEMODE" : 0,
"NPULSES" : images,
"GATEWIDTH" : gatewidth,
"STOREMASK": storemask
}
# MUSST device
store_list = ["TIMER","MOT"+str(mot_chan)]
musst_device = MusstAcquisitionDevice(musst_card,
program = 'ftomo.mprg',
program_start_name = 'TOMO',
program_abort_name = 'TOMO_CLEAN',
store_list = store_list,
vars=musst_vars)
chain = AcquisitionChain()
chain.add(motor_master,musst_device)
# P201 master
#p201_card = setup_globals.config.get("p201_0")
#p201_master = CT2AcquisitionMaster(p201_card,
#npoints=images,
#acq_expo_time=ctime,
#acq_mode=P201AcqMode.ExtTrigMulti)
#chain.add(motor_master,p201_master)
#p201_0_counters = IntegratingCounterAcquisitionDevice(p201_card.counters, npoints=images,
#count_time=ctime)
#chain.add(p201_master, p201_0_counters)
# Lima master
detector = setup_globals.config.get("pcoedge64")
lima_master = LimaAcquisitionMaster(detector,
acq_nb_frames=images,
acq_trigger_mode='EXTERNAL_TRIGGER_MULTI',
acq_expo_time=ctime,
save_flag=True,
prepare_once=True,
start_once=True)
lima_master.add_counter(detector.image)
chain.add(motor_master,lima_master)
scan = Scan(chain,save=False,data_watch_callback=ScanDisplayData(motor,detector,images,musst_card))
with MultiplexerManager("MR_FTOMO_PCO"):
scan.run()
return scan
class ScanDisplayData(object):
def __init__(self,motor,detector,nb_points,musst_card):
self.motor = motor
self.detector = detector
self.musst = musst_card
self.nb_points = nb_points
#def on_state(self,state):
#return True
def __call__(self,data_events,nodes,info):
display = False
for acq_device,events in data_events.iteritems():
data_node = nodes.get(acq_device)
if is_zerod(data_node):
display = True
self.display = ''
#if info.get('state') == Scan.PREPARE_STATE and data_node.name == self.motor.name:
#start_pos = data_node.get(-1)
#undershoot = self.motor.velocity()**2 / (2.0*self.motor.acceleration())
#real_end_pos = self.motor.settings.get('_set_position')
#end_pos = real_end_pos - undershoot
#step_size = float(end_pos - start_pos)/self.nb_points
#self.display = '\n'
#if info.get('state') == Scan.START_STATE:
print(data_node.name + ' ' + str(data_node.get(-1)))
current_image = self.detector.proxy.last_image_ready
print(current_image)
#if current_image < self.nb_points:
#mot_pos = start_pos+current_image*step_size
#self.display += '%s triggering position: %.2f ' % (self.motor.name,mot_pos)
#self.display += 'image acquisition (%d/%d)\n\n' % (current_image+1,self.nb_points)
if display:
print(self.display),
sys.stdout.flush()
class MultiplexerManager(object):
def __init__(self, mode):
self.mux = setup_globals.config.get('multiplexer_tomo')
self.mode = mode
self.prev_val = self.mux.getGlobalStat()
def __enter__(self):
if self.mode == "HR_FTOMO_FRELON":
self.mux.switch("CAMERA","CAM_HR")
self.mux.switch("SHMODE","CCD")
self.mux.switch("TRIGGER","MUSST_TRIG")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "HR_FTOMO_PCO":
self.mux.switch("CAMERA","CAM_HR")
self.mux.switch("SHMODE","SOFT")
self.mux.switch("TRIGGER","MUSST_TRIG")
self.mux.switch("SOFTSHUT","CLOSE")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "MR_FTOMO_FRELON":
self.mux.switch("CAMERA","CAM_MR")
self.mux.switch("SHMODE","CCD")
self.mux.switch("TRIGGER","MUSST_TRIG")
#_opmux_ask("SHUTTER") #TO MODIFY
elif self.mode == "MR_FTOMO_PCO":
self.mux.switch("CAMERA","CAM_MR")
self.mux.switch("SHMODE","SOFT")
self.mux.switch("TRIGGER","MUSST_TRIG")
self.mux.switch("SOFTSHUT","CLOSE")
#_opmux_ask("SHUTTER") #TO MODIFY
def __exit__(self, type, value, traceback):
self.mux.switch("CAMERA",self.prev_val['CAMERA'])
self.mux.switch("SHMODE",self.prev_val['SHMODE'])
self.mux.switch("TRIGGER",self.prev_val['TRIGGER'])
#self.mux.switch("SHUTTER",self.prev_val['SHUTTER'])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment