Commit 33217d2e authored by Emmanuel Papillon's avatar Emmanuel Papillon

added interlaced scan : not yet finished

parent fbec647f
Pipeline #23396 canceled with stages
import time
import numpy
from bliss.scanning.scan import Scan
from bliss.scanning.acquisition.motor import MotorMaster
from bliss.controllers.ct2.device import AcqMode as CT2AcqMode
from bliss.controllers.mca import AcquisitionMode as MCAAcqMode
from bliss.controllers.mca.base import TriggerMode, BaseMCA
from bliss.scanning.chain import AcquisitionSlave
from fscan.mussttools import (
MusstProgBase,
MusstProgTimeTrigger,
MusstDevList,
)
from fscan.fscantools import (
FScanModeBase,
FScanParamBase,
FScanParamStruct,
FScanDisplay,
)
from fscan.motortools import RotationMotorChainPreset
from fscan.chaintools import ChainTool
from fscan.musstprog.fscan_mprg import fscan_mprg_data
from fscan.fscanrunner import FScanDiagRunner
FInterlacedMode = FScanModeBase("FInterlacedMode",
"REWIND",
"ZIGZAG",
"FORWARD"
)
class FInterlacedPars(FScanParamBase):
DEFAULT = {
"motor": None,
"start_pos": 0,
"acq_size": 0.1,
"acq_time": 0.8,
"npoints": 10,
"mode": FInterlacedMode.REWIND,
"shift_pos": 0.,
"save_flag": True,
"display_flag": True,
"sync_encoder": True,
"acc_margin": 0.,
"sampling_time": 0.5,
}
OBJKEYS = ["motor"]
LISTVAL = {
"mode": FInterlacedMode.values,
}
def __init__(self, name):
FScanParamBase.__init__(self, name,
FInterlacedPars.DEFAULT,
FInterlacedPars.OBJKEYS,
FInterlacedPars.LISTVAL)
def _validate_npoints(self, value):
if not int(value) > 0:
raise ValueError("npoints should be > 0")
return int(value)
def _validate_save_flag(self, value):
return value and True or False
def _validate_display_flag(self, value):
return value and True or False
def _validate_sync_encoder(self, value):
return value and True or False
def _validate_scan_mode(self, value):
return FInterlacedMode.get(value, "mode")
def _validate_acc_margin(self, value):
return abs(value)
class FInterlacedMaster(object):
def __init__(self, name, config):
self.name = name
self.config = config
self.musst_list = MusstDevList(config["devices"]["musst"])
self.meas_group = config["devices"]["measgroup"]
self.rot_motors = config["devices"]["rotation_motors"]
self.chain_config = config["chain_config"]
self.pars = FInterlacedPars(self.name)
self.inpars = None
self.mgchain = None
def validate(self):
self.inpars = FScanParamStruct(self.pars.to_dict())
pars = self.inpars
# --- check motor is on musst
self.musst = self.musst_list.find_musst_for_motor(pars.motor)
# --- forward mode on rotation motor only
if pars.mode == FInterlacedMode.FORWARD:
if pars.motor not in self.rot_motors:
raise ValueError(f"In interlaced FORWARD mode, can only use motors {self.rot_motors}")
# --- calc params
pars.motor_name = pars.motor.name
pars.motor_unit = pars.motor.unit is None and "unit" or pars.motor.unit
pars.speed = abs(pars.acq_size) / pars.acq_time
pars.acc_time = pars.acc_margin / pars.speed + pars.speed / pars.motor.acceleration
pars.acc_disp = pars.acc_margin + 0.5 * pars.speed ** 2 / pars.motor.acceleration
pars.step_size = 2 * pars.acq_size
pars.step_time = 2 * pars.acq_time
if pars.mode == FInterlacedMode.FORWARD:
pars.npoints_arr = numpy.array([int(pars.npoints / 2), ] * 2, numpy.int)
if pars.acq_size > 0:
pars.scan_dir = 1
else:
pars.scan_dir = -1
pars.start_arr = numpy.array([pars.start_pos, pars.start_pos + pars.scan_dir * 360.0 + pars.acq_size], numpy.float)
pars.stop_arr = pars.start_arr + pars.npoints_arr * pars.step_size
pars.move_time = abs(pars.stop_arr[1] - pars.start_arr[0]) / pars.speed
pars.scan_time = 2 * pars.acc_time + pars.move_time
if pars.mode in (FInterlacedMode.REWIND, FInterlacedMode.ZIGZAG):
pars.npoints_arr = numpy.array([int(pars.npoints / 2),] * 2, numpy.int)
pars.npoints_arr[0] += pars.npoints%2
if pars.acq_size > 0:
pars.scan_dir = [ 1, 1 ]
else:
pars.scan_dir = [ -1, -1 ]
pars.start_arr = numpy.array([pars.start_pos, pars.start_pos + pars.acq_size], numpy.float)
pars.stop_arr = pars.start_arr + pars.npoints_arr * pars.step_size
if pars.mode == FInterlacedMode.ZIGZAG:
pars.scan_dir[1] *= -1
pars.start_arr[1] = pars.stop_arr[0] + pars.shift_pos
pars.stop_arr[1] = pars.start_arr[0] + pars.shift_pos
if pars.npoints%2:
pars.start_arr[1] -= pars.step_size
pars.move_time_arr = abs(pars.stop_arr - pars.start_arr)/pars.speed
pars.scan_time = numpy.sum(2 * pars.acc_time + pars.move_time_arr)
def show(self):
if self.inpars is None:
raise RuntimeError(f"FInterlacedMaster[{self.name}]: Parameters not validated yet !!")
mot_txt = """
Motor : {motor_name}
velocity = {speed:.4f} {motor_unit}/sec
acceleration size = {acc_disp:.4f} {motor_unit} (margin = {acc_margin:.4f})
acceleration time = {acc_time:.6f} sec
"""
acq_txt = """
Acquisition :
acq time = {acq_time:.6f} sec
acq size = {acq_size:.4f} {motor_unit}
total nb points = {npoints}
"""
print(mot_txt.format(**self.inpars.to_dict()))
pars = self.inpars
print(f"Scan in {pars.mode} mode:")
for ii in range(2):
print(" Pass #{idx:d} : {npts:d} points from {start:.4f} to {stop:.4f}".format(
idx=ii+1, npts=pars.npoints_arr[ii], start=pars.start_arr[ii], stop=pars.stop_arr[ii]))
print(acq_txt.format(**self.inpars.to_dict()))
def init_scan(self):
self.mgchain = ChainTool(self.meas_group, self.chain_config)
self.lima_used = list()
self.home_preset = None
def get_controllers_found(self, ctrl_type):
mgchain = ChainTool(self.meas_group)
return mgchain.get_controllers_not_done(ctrl_type)
def get_controllers_used(self, ctrl_type):
if self.mgchain is None:
return []
else:
return mgchain.get_controllers_done(ctrl_type)
def setup_acq_chain(self, chain):
mot_master = self.setup_motor_master(chain)
acq_master = self.setup_acq_master(chain, mot_master)
self.setup_acq_slaves(chain, acq_master)
def setup_motor_master(self, chain):
pars = self.inpars
self.musst_motors = [ pars.motor, ]
if pars.home_rotation is True and pars.motor in self.rot_motors:
self.home_preset = RotationMotorChainPreset(pars.motor, homing=True, rounding=False)
chain.add_preset(self.home_preset)
# --- create motor master
return MotorMaster(pars.motor, pars.start_pos, pars.stop_pos, pars.move_time,
undershoot_start_margin=pars.acc_margin, undershoot_end_margin=pars.acc_margin)
def setup_acq_master(self, chain, mot_master):
pars = self.inpars
# --- musst prog
musstprog = MusstProgFScan(self.musst, self.musst_motors)
musstprog.check_max_timer(pars.scan_time)
musstprog.set_modes(pars.scan_mode, pars.gate_mode)
if pars.scan_mode == FScanTrigMode.TIME:
delta = pars.step_time
else:
delta = pars.step_size
if pars.gate_mode == FScanTrigMode.TIME:
gatewidth = pars.acq_time
else:
gatewidth = pars.acq_size
musstprog.set_params(pars.start_pos, pars.scan_dir, delta, gatewidth, pars.npoints)
# --- sync motors on musst
if self.home_preset:
self.home_preset.set_musst_sync_cb(musstprog.sync_one_motor)
elif pars.sync_encoder:
musstprog.sync_motors()
# --- add to chain
musstprog.setup(chain, mot_master)
musstprog.add_default_calc(chain)
# --- motor external channel
musstprog.set_motor_external_channel(pars.motor_name, mot_master)
return musstprog.musst_master
def setup_acq_slaves(self, chain, acq_master):
pars = self.inpars
# --- ct2 counters
ct2_params={
"npoints": pars.npoints,
"acq_mode": CT2AcqMode.ExtGate,
"prepare_once": True,
"start_once": True
}
self.mgchain.setup("ct2", chain, acq_master, ct2_params)
# --- lima devices
lima_params = {
"acq_trigger_mode": "EXTERNAL_TRIGGER_MULTI",
"acq_expo_time": pars.acq_time,
"acq_nb_frames": pars.npoints,
"wait_frame_id": [pars.npoints-1,],
"prepare_once": True,
"start_once": True,
"scan_frame_per_file": pars.npoints,
}
self.mgchain.setup("lima", chain, acq_master, lima_params)
self.lima_used = self.mgchain.get_controllers_done("lima")
# --- mca devices
for mca_ctrl in self.mgchain.get_controllers_not_done("mca"):
try:
is_hwsca = mca_ctrl.acquisition_mode == MCAAcqMode.HWSCA
except:
is_hwsca = False
if is_hwsca:
mca_params = {
"npoints": self.inpars.npoints,
"prepare_once": True,
"start_once": True,
}
self.mgchain.setup("mca", chain, acq_master, mca_params, mca_ctrl.name)
#
# "trigger_mode": TriggerMode.SYNC,
# "preset_time": 0.9, # not used in SYNC trigger mode
# "block_size": None,
# "polling_time": 0.1,
# "spectrum_size": None
# "prepare_once": False, # False => re-do 'prepare' at each line.
# "start_once": False # False => re-do 'start' at each line.
# --- mythen
mythen_params = {
"trigger_type": AcquisitionSlave.HARDWARE,
"npoints": pars.npoints,
"count_time": pars.acq_time,
}
self.mgchain.setup("mythen", chain, acq_master, mythen_params)
# --- sampling
self.mgchain.setup_sampling(chain, pars.sampling_time)
def setup_acq_scope(self, chain, mot_master):
acqtime = max(0.001, self.inpars.step_time / 10.)
npts = int((self.inpars.scan_time + 0.1) / acqtime + 0.5)
# --- musst time prog
musstprog = MusstProgTimeTrigger(self.musst, self.musst_motors)
musstprog.set_params(npts, acqtime)
# --- sync motors on musst
if self.home_preset:
self.home_preset.set_musst_sync_cb(musstprog.sync_one_motor)
elif self.inpars.sync_encoder:
musstprog.sync_motors()
# --- acq chain
musstprog.setup(chain, mot_master)
musstprog.add_default_calc(chain)
def setup_scan(self, chain, scan_name):
if self.inpars.display_flag:
self.show()
pars = self.pars
scan_info = pars.to_dict(string_format=True)
scan_info["title"] = "{0} {1} {2:g} {3:g} {4:g} {5:g} {6:d}".format(
scan_name, pars.motor.name, pars.start_pos,
pars.step_size, pars.step_time, pars.acq_time, pars.npoints)
scan_info["npoints"] = self.inpars.npoints
scan_info["type"] = "fscan"
scan = Scan(
chain,
name=scan_name,
save=self.inpars.save_flag,
scan_info=scan_info,
data_watch_callback=FScanDisplay(
self.musst,
trigger_name="timer",
motors= (self.inpars.motor,),
limas= self.lima_used,
),
)
return scan
def get_runner(self, name="fscan"):
return FScanDiagRunner(name, self)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment