Commit fece9f79 authored by Emmanuel Papillon's avatar Emmanuel Papillon
Browse files

first implementation of BeamLine Check on Scans

It checks for USM mode, FE opened, shutter/valves opened
and move ID motors to last set position.
parent c6ebfc3e
import gevent
import tabulate
import math
from bliss.config.settings import HashObjSetting
from bliss.scanning.chain import ChainPreset
from bliss.scanning.scan import ScanPreset
from bliss.common.scans import DEFAULT_CHAIN
from bliss.common.logtools import user_warning
from bliss.controllers.tango_shutter import TangoShutterState
class NotInUSMMode(Exception):
def __str__(self):
return "Machine not in USM Mode"
class BLCheckError(Exception):
pass
class BLCheckTask:
def __init__(self, name, config):
self.name = name
self.title = config.get("title")
self.init(config)
def __info__(self):
return f"BLCHECK [{self.name}] : {self.title}"
def init(self, config):
pass
def check(self):
pass
class BLCheckFE(BLCheckTask):
def init(self, config):
self.machinfo = config.get("machinfo")
self.frontend = config.get("frontend")
def check(self):
all_machinfo = self.machinfo.all_information
# check USM mode
srmode = all_machinfo["SR_Mode"]
if srmode != "USM":
raise NotInUSMMode()
# check FE auto mode
feauto = all_machinfo["Automatic_Mode"]
if feauto is False:
self.machinfo.automatic_mode = True
gevent.sleep(5.)
else:
fetime = all_machinfo["Auto_Mode_Time"]
if fetime < 24 * 60 * 60:
self.machinfo.automatic_mode = True
# check FE really open
if self.frontend.is_closed:
raise BLCheckError("Cannot open FrontEnd !!")
class BLCheckBSH(BLCheckTask):
def init(self, config):
self.shutter = config.get("shutter")
self.valves = config.get("valves")
def check(self):
# check shutter is opened
state = self.shutter.state
if state == TangoShutterState.DISABLE:
raise BLCheckError(f"shutter [{self.shutter.name}] is {state}")
if self.shutter.is_closed:
self.shutter.open()
if self.shutter.is_closed:
raise BLCheckError(f"Cannot open shutter [{self.shutter.name}]")
# check valves are opened
for valve in self.valves:
if valve.is_closed:
valve.open()
if valve.is_closed:
raise BLCheckError(f"Cannot open valve [{valve.name}]")
class BLCheckIDPos(BLCheckTask):
def init(self, config):
self.motors = config.get("undulators")
def check(self):
for motor in self.motors:
if not "READY" in motor.state:
user_warning(f"{motor.name} not READY, cannot check position.")
elif not math.isnan(motor._set_position) and \
abs(motor.position - motor._set_position) < 0.01:
motor.move(motor._set_position)
class BLCheckChainPreset(ChainPreset):
def __init__(self, blcheck_master):
self.__blcheck = blcheck_master
def prepare(self, acq_chain):
self.__blcheck.auto_check()
class BLCheckScanPreset(ScanPreset):
def __init__(self, blcheck_master):
self.__blcheck = blcheck_master
def prepare(self, scan):
self.__blcheck.auto_check()
class BLCheckMaster:
def __init__(self, name, config):
self.__name = name
self.__checklist = config.get("checklist")
self.__checknames = [ check.name for check in self.__checklist ]
self.__settings = HashObjSetting(f"blcheck:{name}")
self.__state = self.__settings.get("state", False)
self.__chain_preset = BLCheckChainPreset(self)
self.__scan_preset = BLCheckScanPreset(self)
if self.__state:
self.on()
@property
def name(self):
return self.__name
@property
def chain_preset(self):
return self.__chain_preset
@property
def scan_preset(self):
return self.__scan_preset
@property
def status(self):
status = self.__state is True and "ON" or "OFF"
return status
def get_states(self):
states = dict()
for name in self.__checknames:
states[name] = self.__settings.get(name, True)
return states
def auto_check(self):
if self.__state:
self.check()
def on(self):
DEFAULT_CHAIN.add_preset(self.chain_preset, name=self.name)
self.__state = True
self.__settings["state"] = True
def off(self):
self.__state = False
self.__settings["state"] = False
def enable(self, name):
if name not in self.__checknames:
raise ValueError("Unknown check name")
self.__settings[name] = True
def disable(self, name):
if name not in self.__checknames:
raise ValueError("Unknown check name")
self.__settings[name] = False
def get(self, name):
for check in self.__checklist:
if check.name == name:
return check
raise ValueError("Unknown check name")
def __info__(self):
states = self.get_states()
heads = ["Name", "Description", "State"]
infos = list()
for check in self.__checklist:
state = states[check.name] is True and "ENABLED" or "DISABLED"
infos.append([check.name, check.title, state])
txt = tabulate.tabulate(infos, headers=heads)
txt += f"\n\nBeamLine check on scans is {self.status}\n"
return txt
def check(self):
states = self.get_states()
for blcheck in self.__checklist:
if not states[blcheck.name]:
continue
print(f"Checking {blcheck.title} ...")
try:
blcheck.check()
except NotInUSMMode:
print("Not in USM mode ==> no further checks performed.")
return
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment