Commit 48da8465 authored by Emmanuel Papillon's avatar Emmanuel Papillon

* FScanRunner / FScanDiagRunner implemented

* adapt FScanMaster to runners
* Params compatible with bliss Parameters
* MusstDevList tool to locate motor on musst
* ScanPreset simplified and managed by runners
parent 49f95917
......@@ -5,7 +5,11 @@ from bliss.scanning.scan import Scan
from bliss.scanning.acquisition.motor import MotorMaster
from bliss.controllers.ct2.device import AcqMode as CT2AcqMode
from id15.mussttools import MusstProgBase, MusstProgTimeTrigger
from id15.mussttools import (
MusstProgBase,
MusstProgTimeTrigger,
MusstDevList,
)
from id15.fscantools import (
FScanTrigMode,
FScanParamBase,
......@@ -14,7 +18,6 @@ from id15.fscantools import (
)
from id15.chaintools import MGMapping
from id15.musstprog.fscan_mprg import fscan_mprg_data
from id15.fscanpreset import FScanPresetBase
class MusstProgFScan(MusstProgBase):
......@@ -108,45 +111,25 @@ class FScanPars(FScanParamBase):
class FScanMaster(object):
def __init__(self, measgroup, *musstdev):
self.name = "fscan"
self.meas_group = measgroup
self.musst_devs = musstdev
self.scan_name = "fscan"
self.pars = FScanPars("fscanpars")
self.inpars = FScanParamStruct()
self.musst_counters = list()
self.presets = dict()
self.meas_group = measgroup
self.musst_list = MusstDevList(*musstdev)
def add_preset(self, preset):
self.presets[id(preset)]= preset
self.musst_counters = list()
def store_musst_counters(self, *counters):
self.musst_counters = counters
def find_musst_for_motor(self, motor):
for musst in self.musst_devs:
if self.musst_has_motor(musst, motor):
return musst
raise ValueError(
"Cannot find motor[{0}] on musst[{1}] !!".format(
motor.name, self.musst_devs
)
)
def musst_has_motor(self, musst, motor):
try:
chan = musst.get_channel_by_name(motor.name)
return True
except:
return False
def validate(self, pars):
self.inpars = pars
def validate(self):
# --- check motor is on musst
self.find_musst_for_motor(self.inpars.motor)
self.musst = self.musst_list.find_musst_for_motor(pars.motor)
# --- calc params
pars = self.inpars
pars.motor_name = pars.motor.name
pars.motor_unit = pars.motor.unit
pars.speed = pars.step_size / pars.step_time
......@@ -177,29 +160,26 @@ Acquisition :
dead time = {dead_time:.6f} sec
nb points = {npoints}
"""
print(txt.format(**self.inpars.get()))
print(txt.format(**self.inpars.to_dict()))
def init_chain(self):
self.musst = None
self.musst_motors = tuple()
self.musst_master = None
self.motor_master = None
def init_scan(self):
self.lima_used = list()
self.trigger_name = None
self.mgmap = None
self.scan = None
self.chain = AcquisitionChain()
self.mgmap = MGMapping(self.meas_group)
def setup_acq_chain(self, chain):
mot_master = self.setup_motor_master(chain)
acq_master = self.setup_acq_master(chain, mot_master)
self.setup_acq_slaves(chain, acq_master)
def setup_motor_master(self):
def setup_motor_master(self, chain):
pars = self.inpars
# --- find musst device for motor
self.musst = self.find_musst_for_motor(pars.motor)
self.musst_motors = [ pars.motor, ]
# --- create motor master
self.motor_master = MotorMaster(pars.motor, pars.start_pos, pars.stop_pos, pars.move_time)
return MotorMaster(pars.motor, pars.start_pos, pars.stop_pos, pars.move_time)
def setup_chain_master(self):
def setup_acq_master(self, chain, mot_master):
pars = self.inpars
# --- musst prog
......@@ -222,17 +202,21 @@ Acquisition :
musstprog.sync_motors()
# --- add to chain
musstprog.setup(self.chain, self.motor_master)
musstprog.add_default_calc(self.chain)
musstprog.add_external_channel("timer", self.motor_master)
musstprog.add_external_channel(pars.motor_name, self.motor_master)
musstprog.setup(chain, mot_master)
musstprog.add_default_calc(chain)
musstprog.add_external_channel("timer", mot_master)
musstprog.add_external_channel(pars.motor_name, mot_master)
self.trigger_name = "timer_trig"
self.musst_master = musstprog.musst_master
return musstprog.musst_master
def setup_acq_slaves(self, chain, acq_master):
self.mgmap = MGMapping(self.meas_group)
def setup_chain_slaves(self):
# P201 devices
ct2pars = {"npoints": self.inpars.npoints, "acq_mode": CT2AcqMode.ExtGate}
self.mgmap.setup_ct2(self.chain, self.musst_master, ct2pars)
self.mgmap.setup_ct2(chain, acq_master, ct2pars)
# LIMA devices
limapars = {
......@@ -243,13 +227,14 @@ Acquisition :
"save_flag": self.inpars.save_flag,
}
self.mgmap.setup_lima(
self.chain, self.musst_master, limapars
chain, acq_master, limapars
)
self.lima_used = self.mgmap.get_done_list("lima")
# sampling counters
# mgmap.setup_sampling(self.chain, self._sampling_time)
# mgmap.setup_sampling(chain, self._sampling_time)
def setup_chain_scope(self):
def setup_acq_scope(self, chain, mot_master):
acqtime = max(0.001, self.inpars.step_time / 10.)
npts = int((self.inpars.scan_time + 0.1) / acqtime + 0.5)
......@@ -262,42 +247,32 @@ Acquisition :
musstprog.sync_motors()
# --- acq chain
musstprog.setup(self.chain, self.motor_master)
musstprog.add_default_calc(self.chain)
musstprog.add_external_channel("timer", self.motor_master)
musstprog.add_external_channel(self.inpars.motor_name, self.motor_master)
musstprog.setup(chain, mot_master)
musstprog.add_default_calc(chain)
musstprog.add_external_channel("timer", mot_master)
musstprog.add_external_channel(self.inpars.motor_name, mot_master)
self.musst_master = musstprog.musst_master
def init_scan(self, scan_name):
def setup_scan(self, chain, scan_name):
if self.inpars.display_flag:
self.show()
pars = self.pars
scan_info = pars.get_dict(string_format=True)
scan_info = pars.to_dict(string_format=True)
scan_info["title"] = "{0} {1} {2:g} {3:g} {4:g} {5:g} {6:d}".format(
scan_name, pars.motor.name, pars.start_pos,
pars.step_size, pars.step_time, pars.acq_time, pars.npoints)
lima_used = self.mgmap.get_done_list("lima")
self.scan = Scan(
self.chain,
scan = Scan(
chain,
name=scan_name,
save=self.inpars.save_flag,
scan_info=scan_info,
data_watch_callback=FScanDisplay(
trigger_name="timer_trig",
trigger_name=self.trigger_name,
motors= (self.inpars.motor,),
limas= lima_used,
limas= self.lima_used,
),
)
devs_used = self.mgmap.get_done_list()
for preset in self.presets.values():
if isinstance(preset, FScanPresetBase):
preset.init(self.musst, devs_used, self.pars.get_dict())
self.scan.add_preset(preset)
def start(self):
self.scan.run()
return scan
from bliss.scanning.scan import ScanPreset
class FScanPresetBase(ScanPreset):
def __init__(self):
ScanPreset.__init__(self)
self.musst = None
self.pars = dict()
self.devices = list()
def init(self, musstdev, acqdevs, pardict):
self.musst = musstdev
self.devices = acqdevs
self.pars = pardict
def set_devices(self, dev_list):
self.devices = dev_list
class FScanMuxPreset(FScanPresetBase):
class FScanMuxPreset(ScanPreset):
def __init__(self, opmux):
FScanPresetBase.__init__(self)
ScanPreset.__init__(self)
self.opmux = opmux
self.limanames = list()
def set_fscan_master(self, master):
self.limanames = [ dev.name for dev in master.lima_used ]
def start(self, scan):
for dev in self.devices:
if dev.name == "pcoedge":
for name in self.limanames:
if name == "pcoedge":
self.opmux.switch("EDGE", "MUSST")
elif dev.name == "pilatus":
elif name == "pilatus":
self.opmux.switch("PILATUS", "MUSST")
elif dev.name == "maxipix":
elif name == "maxipix":
self.opmux.switch("MAXIPIX", "MUSST")
elif dev.name == "pcoedgehs":
elif name == "pcoedgehs":
self.opmux.switch("EDGEHS", "MUSST")
elif dev.name == "pcodimax":
elif name == "pcodimax":
self.opmux.switch("DIMAX", "MUSST")
from bliss.scanning.scan import ScanPreset
from bliss.scanning.chain import AcquisitionChain
from id15.fscantools import FScanParamStruct
class FScanRunner(object):
def __init__(self, scanmaster, scanname=None):
self._master = scanmaster
self._scan_name = scanname or self._master.scan_name
self._chain = None
self._scan = None
self._scan_presets = dict()
self._chain_presets = dict()
@property
def pars(self):
return self._master.pars
@property
def inpars(self):
return self._master.inpars
def validate(self):
inpars = FScanParamStruct()
inpars.from_dict(self._master.pars.to_dict())
self._master.validate(inpars)
def show(self):
self.validate()
self._master.show()
def prepare(self):
self.validate()
self._chain = AcquisitionChain()
self._master.init_scan()
self._master.setup_acq_chain(self._chain)
self._scan = self._master.setup_scan(self._chain, self._scan_name)
self._setup_presets()
def start(self):
self._scan.run()
def run(self):
self.prepare()
self.start()
def _setup_presets(self):
# --- chain presets
for preset in self._chain_presets.values():
try:
set_master_func = getattr(preset, "set_fscan_master")
set_master_func(self._master)
except:
pass
self._chain.add_preset(preset)
# --- scan presets
for preset in self._scan_presets.values():
try:
set_master_func = getattr(preset, "set_fscan_master")
set_master_func(self._master)
except:
pass
self._scan.add_preset(preset)
def add_scan_preset(self, preset):
if not isinstance(preset, ScanPreset):
raise ValueError("preset should be an instance of ScanPreset !!")
self._scan_presets[id(preset)] = preset
def remove_scan_preset(self, preset):
if id(preset) in self._scan_presets:
self._scan_presets.remove(id(preset))
def add_chain_preset(self, preset):
if not isinstance(preset, ChainPreset):
raise ValueError("preset should be an instance of ChainPreset !!")
self._chain_presets[id(preset)] = preset
def remove_chain_preset(self, preset):
if id(preset) in self._chain_presets:
self._chain_presets.remove(id(preset))
@property
def scan(self):
if self._scan is None:
raise RuntimeError("No scan performed yet !!")
return self._scan
@property
def chain(self):
if self._chain is None:
raise RuntimeError("No chain setup yet !!")
return self._chain
def chain_show(self):
self.chain._tree.show()
class FScanDiagRunner(FScanRunner):
def prepare(self):
scan_name = self._scan_name
self.validate()
self._master.init_scan()
self._chain = AcquisitionChain()
mot_master = self._master.setup_motor_master(self._chain)
acq_master = self._master.setup_acq_master(self._chain, mot_master)
self._master.setup_acq_slaves(self._chain, acq_master)
self._scan = self._master.setup_scan(self._chain, scan_name)
self._setup_presets()
def run(self):
self.prepare()
self.start()
def prepare_dry(self):
scan_name = "{0}.dry_run".format(self._scan_name)
self.validate()
self._chain = AcquisitionChain()
self._master.init_scan()
mot_master = self._master.setup_motor_master(self._chain)
acq_master = self._master.setup_acq_master(self._chain, mot_master)
self._scan = self._master.setup_scan(self._chain, scan_name)
self._setup_presets()
def dry_run(self):
self.prepare_dry()
self.start()
def prepare_scope(self):
scan_name = "{0}.scope_run".format(self._scan_name)
self.validate()
self._chain = AcquisitionChain()
self._master.init_scan()
mot_master = self._master.setup_motor_master(self._chain)
self._master.setup_acq_scope(self._chain, mot_master)
self._scan = self._master.setup_scan(self._chain, scan_name)
self._setup_presets()
def scope_run(self):
self.prepare_scope()
self.start()
......@@ -15,7 +15,6 @@ from bliss import setup_globals
def BOLD(msg):
return "\033[1m{0}\033[0m".format(msg)
class FScanModeBase(object):
def __init__(self, name, *args):
self.__name = name
......@@ -85,11 +84,11 @@ class FScanParamBase(object):
"show",
"set",
"get",
"get_dict",
"set_dict",
"to_dict",
"from_dict",
"reset",
"save_to_file",
"load_from_file",
"to_file",
"from_file",
]
def __getattribute__(self, name):
......@@ -163,7 +162,7 @@ class FScanParamBase(object):
else:
return [self._cache[key] for key in args]
def get_dict(self, string_format=False):
def to_dict(self, string_format=False):
if string_format is False:
return dict(self._cache)
else:
......@@ -178,7 +177,7 @@ class FScanParamBase(object):
ret_dict[name] = value.tolist()
return ret_dict
def set_dict(self, pars_dict):
def from_dict(self, pars_dict):
for (name, value) in pars_dict.items():
self.__set_value(name, value)
......@@ -187,13 +186,13 @@ class FScanParamBase(object):
for (name, value) in self._cache.items():
self._settings[name] = self.__obj_2_str(name, value)
def save_to_file(self, filename):
yaml_dict = self.get_dict(string_format=True)
def to_file(self, filename):
yaml_dict = self.to_dict(string_format=True)
yaml_data = yaml.dump(yaml_dict, default_flow_style=False, sort_keys=False)
with open(filename, "w") as yaml_file:
yaml_file.write(yaml_data)
def load_from_file(self, filename):
def from_file(self, filename):
with open(filename) as yaml_file:
yaml_dict = yaml.load(yaml_file, Loader=yaml.FullLoader)
for (name, value) in yaml_dict.items():
......@@ -215,11 +214,11 @@ class FScanParamStruct(object):
def __repr__(self):
return self._get_display_str()
def set(self, pars_dict):
def from_dict(self, pars_dict):
for (name, value) in pars_dict.items():
setattr(self, name, value)
def get(self):
def to_dict(self):
ret = dict()
for key in self.__keys:
ret[key] = getattr(self, key)
......
......@@ -11,6 +11,27 @@ from bliss.common.utils import grouped
from id15.musstprog.timetrig_mprg import timetrig_mprg_data
class MusstDevList(object):
def __init__(self, *musstdev):
self.musst_devs = musstdev
def find_musst_for_motor(self, motor):
for musst in self.musst_devs:
if self.musst_has_motor(musst, motor):
return musst
raise ValueError(
"Cannot find motor[{0}] on musst[{1}] !!".format(
motor.name, self.musst_devs
)
)
def musst_has_motor(self, musst, motor):
try:
chan = musst.get_channel_by_name(motor.name)
return True
except:
return False
def _get_iter_var(val):
try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment