Commit d05a51b2 authored by Antonia Beteva's avatar Antonia Beteva Committed by Matias Guijarro
Browse files

Revert "Added state command for the tests of a dummy tango shutter."

This reverts commit ff59761b
parent 44c2ec74
......@@ -5,50 +5,58 @@
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import functools
""" BaseShutter, BaseShutterState, ShutterSwitch, Shutter"""
import functools
import time
from enum import Enum, unique
from gevent import lock
from bliss.config.conductor.client import Lock
from bliss.config.channels import Cache, Channel
from bliss.config.channels import Cache
from bliss.config.settings import HashObjSetting
from bliss.common.switch import Switch as BaseSwitch
class ShutterSwitch(BaseSwitch):
def __init__(self, set_open, set_closed, is_opened):
"""ShutterSwitch"""
def __init__(self, set_open, set_closed, is_open):
BaseSwitch.__init__(self, "ShutterSwitch" + str(id(self)), {})
self._set_open = set_open
self._set_closed = set_closed
self._is_opened = is_opened
self._is_open = is_open
def _states_list(self):
return ["OPEN", "CLOSED"]
"""Return list of states"""
return [BaseShutterState.OPEN, BaseShutterState.CLOSED]
def _set(self, state):
"""Set state"""
if state == "OPEN":
return self._set_open()
else:
return self._set_closed()
return self._set_closed()
def _get(self):
if self._is_opened():
return "OPEN"
else:
return "CLOSED"
"""Get the state"""
if self._is_open():
return BaseShutterState.OPEN
return BaseShutterState.CLOSED
class BaseShutter(object):
"""Define a simple shutter"""
@unique
class BaseShutterState(Enum):
""" Base states"""
OPEN, CLOSED, UNKNOWN, OTHER = list(range(4)) # state
STATE2STR = {
OPEN: ("OPEN", "Shutter is open"),
CLOSED: ("CLOSED", "Shutter is closed"),
UNKNOWN: ("UNKNOWN", "Unknown shutter state"),
OTHER: ("OTHER", "shutter state specific to controller"),
}
UNKNOWN = "Unknown state"
OPEN = "Open"
CLOSED = "Closed"
FAULT = "Fault state"
class BaseShutter:
"""Define a simple shutter"""
# Properties
@property
......@@ -68,24 +76,28 @@ class BaseShutter(object):
@property
def state_string(self):
return self.STATE2STR.get(self.state, self.STATE2STR[self.UNKNOWN])
"""Transfer state to a string"""
try:
return BaseShutterState.__members__[self.state].value
except KeyError:
return BaseShutterState.UNKNOWN.value
@property
def is_open(self):
"""States if the shutter is open"""
return self.state == self.OPEN
"""Check if the device is open"""
return self.state == BaseShutterState.OPEN
@property
def is_closed(self):
"""States if the shutter is closed"""
return self.state == self.CLOSED
"""Check if the device is closed"""
return self.state == BaseShutterState.CLOSED
# Methods
def open(self):
def open(self, timeout=None):
"""Method that opens the shutter"""
raise NotImplementedError
def close(self):
def close(self, timeout=None):
"""Method that closes the shutter"""
raise NotImplementedError
......@@ -100,6 +112,8 @@ class BaseShutter(object):
class Shutter(BaseShutter):
"""Shutter class"""
MANUAL, EXTERNAL, CONFIGURATION = list(range(3)) # modes
MODE2STR = {
MANUAL: ("MANUAL", "Manual mode"),
......@@ -134,7 +148,7 @@ class Shutter(BaseShutter):
self._external_ctrl = config.get("external-control")
self.__settings = HashObjSetting("shutter:%s" % name)
self.__initialized_hw = Cache(self, "initialized", default_value=False)
self.__state = Cache(self, "state", default_value=Shutter.UNKNOWN)
self.__state = Cache(self, "state", default_value=BaseShutterState.UNKNOWN)
self._init_flag = False
self.__lock = lock.Semaphore()
......@@ -158,7 +172,7 @@ class Shutter(BaseShutter):
"with a switch object".format(name)
)
else:
if not "OPEN" in states or not "CLOSED" in states:
if "OPEN" and "CLOSED" not in states:
raise ValueError(
"external-ctrl : {0} doesn't"
" have 'OPEN' and 'CLOSED' states".format(name)
......@@ -190,7 +204,6 @@ class Shutter(BaseShutter):
initialize the hardware.
It's will be call only once (by the first client).
"""
pass
@property
def name(self):
......@@ -202,13 +215,14 @@ class Shutter(BaseShutter):
@property
def settings(self):
"""Return the settings"""
return self.__settings
@property
def mode(self):
"""
shutter mode can be MANUAL,EXTERNAL,CONFIGURATION
In CONFIGURATION mode, shutter can't be opened/closed.
**CONFIGURATION** could mean that the shutter is in tuning mode
i.e: changing open/close position in case of a motor.
......@@ -231,7 +245,7 @@ class Shutter(BaseShutter):
self._set_mode(value)
if value in (self.CONFIGURATION, self.EXTERNAL):
# Can't cache the state if external or configuration
self.__state.value = self.UNKNOWN
self.__state.value = BaseShutterState.UNKNOWN
self.__settings["mode"] = value
def _set_mode(self, value):
......@@ -241,19 +255,22 @@ class Shutter(BaseShutter):
def state(self):
self.init()
mode = self.mode
if mode == self.MANUAL and self.__state.value == self.UNKNOWN:
if mode == self.MANUAL and self.__state.value == BaseShutterState.UNKNOWN:
return_state = self._state()
self.__state.value = return_state
return return_state
else:
if mode == self.EXTERNAL:
if self.external_control is not None:
switch_state = self.external_control.get()
return self.OPEN if switch_state == "OPEN" else self.CLOSED
else:
return self.UNKNOWN
elif mode == self.CONFIGURATION:
return self.UNKNOWN
if mode == self.EXTERNAL:
if self.external_control is not None:
switch_state = self.external_control.get()
return (
BaseShutterState.OPEN
if switch_state == "OPEN"
else BaseShutterState.CLOSED
)
if mode == self.CONFIGURATION:
return BaseShutterState.UNKNOWN
return BaseShutterState.UNKNOWN
return self.__state.value
def _state(self):
......@@ -261,6 +278,7 @@ class Shutter(BaseShutter):
@property
def external_control(self):
"""Return the external_control"""
return self._external_ctrl
@lazy_init
......@@ -290,22 +308,22 @@ class Shutter(BaseShutter):
Those timing will be register into the settings.
returns (opening,closing) time
"""
previous_mode = self.mode()
previous_mode = self.mode
try:
if previous_mode != self.MANUAL:
self.mode(self.MANUAL)
self.mode = self.MANUAL
opening_time, closing_time = self._measure_open_close_time()
self.__settings["opening_time"] = opening_time
self.__settings["closing_time"] = closing_time
return open_time, close_time
return opening_time, closing_time
finally:
if previous_mode != self.MANUAL:
self.mode(previous_mode)
self.mode = previous_mode
def _measure_open_close_time(self):
"""
This method can be overloaded if needed.
Basic timing on
Basic timing on. No timeout to wait opening/closing.
"""
self.close() # ensure it's closed
start_time = time.time()
......@@ -319,23 +337,28 @@ class Shutter(BaseShutter):
@lazy_init
def open(self):
"""Open the shutter
Returns:
(enum): The state of the shutter
Raises:
RuntimeError: Cannot open the shutter,
no external-control configured.
"""
mode = self.mode
if mode == self.EXTERNAL:
if self._external_ctrl is None:
raise RuntimeError(
"Can't open the shutter because no "
"external-control is configured"
"Cannot open the shutter, no external-control configured."
)
else:
ret = self._external_ctrl.set("OPEN")
ret = self._external_ctrl.set("OPEN")
elif mode != self.MANUAL:
raise RuntimeError(
"Can't open the shutter in %s"
"Cannot open, shutter in %s"
% self.MODE2STR.get(mode, ("Unknown", "Unknown mode"))[1]
)
else:
ret = self._open()
self.__state.value = self.OPEN
self.__state.value = BaseShutterState.OPEN
return ret
def _open(self):
......@@ -343,15 +366,20 @@ class Shutter(BaseShutter):
@lazy_init
def close(self):
"""Close the shutter
Returns:
(enum): The state of the shutter
Raises:
RuntimeError: Cannot open the shutter,
no external-control configured.
"""
mode = self.mode
if mode == self.EXTERNAL:
if self._external_ctrl is None:
raise RuntimeError(
"Can't close the shutter because no "
"external-control is configured"
"Cannot close the shutter, no external-control configured."
)
else:
ret = self._external_ctrl.set("CLOSED")
ret = self._external_ctrl.set("CLOSED")
elif mode != self.MANUAL:
raise RuntimeError(
"Can't close the shutter in %s"
......@@ -359,22 +387,21 @@ class Shutter(BaseShutter):
)
else:
ret = self._close()
self.__state.value = self.CLOSED
self.__state.value = BaseShutterState.CLOSED
return ret
def _close(self):
raise NotImplementedError
def set_external_control(self, set_open, set_closed, is_opened):
def set_external_control(self, set_open, set_closed, is_open):
"""
Programmatically set shutter in external control mode,
and create _external_ctrl switch using callback functions
"""
if not all(map(callable, (set_open, set_closed, is_opened))):
if not all(map(callable, (set_open, set_closed, is_open))):
raise TypeError(
"%s.set_external_control: set_open, set_closed, is_opened functions must be callable"
% self.name
f"{self.name}.set_external_control: set_open, set_closed, is_open functions must be callable"
)
switch = ShutterSwitch(set_open, set_closed, is_opened)
switch = ShutterSwitch(set_open, set_closed, is_open)
self._external_ctrl = switch
self.init()
......@@ -5,54 +5,69 @@
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from gevent import Timeout, sleep
from bliss.common.tango import DeviceProxy, DevFailed
from bliss.common.shutter import BaseShutter
"""
Tango shutter is used to control both front end and safetry shutter.
Some commands/attributes (like atomatic/manual) are only implemented in the
Tango shutter is used to control frontend or safety shutter or a valve.
Some commands/attributes (like automatic/manual) are only implemented in the
front end device server, set by the _frontend variable.
example yml file:
-
#front end shutter
class: tango_shutter
# front end shutter
class: TangoShutter
name: frontend
uri: //orion:10000/fe/id/30
uri: //orion:10000/fe/master/id30
-
#safety shutter
class:tango_shutter
# safety shutter
class: TangoShutter
name: safshut
uri: id30/bsh/1
"""
from enum import Enum
from gevent import Timeout, sleep
from bliss.common.shutter import BaseShutter, BaseShutterState
from bliss.common.tango import DeviceProxy, DevFailed
from bliss.common.logtools import log_warning
TangoShutterState = Enum(
"TangoShutterState",
dict(
{
"MOVING": "Moving",
"DISABLE": "Hutch not searched",
"STANDBY": "Wait for permission",
"RUNNING": "Automatic opening",
},
**{item.name: item.value for item in BaseShutterState},
),
)
class TangoShutter(BaseShutter):
""" Handle Tango frontend or safety shutter or a valve"""
class tango_shutter(BaseShutter):
def __init__(self, name, config):
tango_uri = config.get("uri")
self.__name = name
self.__config = config
self.__control = DeviceProxy(tango_uri)
self._frontend = None
self._mode = None
self._init_type()
self._mode = False
def _init_type(self):
try:
self._frontend = "FrontEnd" in self.__control.info().dev_class
except:
self._frontend = None
self._frontend = "FrontEnd" in self.__control.info().dev_class
@property
def frontend(self):
""" Check if the device is a front end type
Returns:
(bool): True if it is a front end, False otherwise
"""
if self._frontend is None:
self._init_type()
if self._frontend is None:
raise RuntimeError("Tango server is not running %s" % self.name)
return self._frontend
@property
......@@ -66,122 +81,138 @@ class tango_shutter(BaseShutter):
return self.__config
@property
def state(self):
try:
s = self._tango_state
if s == "OPEN":
return self.OPEN
elif s == "CLOSE":
return self.CLOSED
else:
return self.OTHER
except DevFailed:
raise RuntimeError(
"Shutter {}: Communication error with {}".format(
self.__name, self.__control.dev_name()
)
)
return self.UNKNOWN
def _tango_state(self):
""" Read the tango state. Available PyTango states: 'ALARM', 'CLOSE',
'DISABLE', 'EXTRACT', 'FAULT', 'INIT', 'INSERT', 'MOVING', 'OFF',
'ON', 'OPEN', 'RUNNING', 'STANDBY', 'UNKNOWN'.
Returns:
(str): The state from the device server.
"""
return self.__control.state().name
@property
def _tango_state(self):
return str(self.__control.state())
def _tango_status(self):
""" Read the status.
Returns:
(str): Complete state from the device server.
"""
return self.__control.status()
@property
def state_string(self):
s = self.state
if s in [self.OPEN, self.CLOSED, self.UNKNOWN]:
return self.STATE2STR.get(s, self.STATE2STR[self.UNKNOWN])
else:
return (self._tango_state, self._tango_status)
def state(self):
""" Read the state.
Returns:
(enum): state as enum
Raises:
RuntimeError: If DevFailed from the device server
"""
try:
state = self._tango_state
if "CLOSE" in state:
return TangoShutterState.CLOSED
return TangoShutterState.__members__[state]
except DevFailed:
raise RuntimeError(f"Communication error with {self.__control.dev_name()}")
@property
def _tango_status(self):
return str(self.__control.status())
def open(self):
state = self._tango_state
if state == "STANDBY":
raise RuntimeError("Cannot open shutter in STANDBY state")
if state == "OPEN":
# user log message: shutter already open
return
if state == "CLOSE":
def state_string(self):
"""Return state as combined string
Returns
(tuple): state as string, tango status
"""
return self.state.value, self._tango_status
def open(self, timeout=60):
"""Open
Args:
(float): Timeout [s] to wait until execution finished
Raises:
RuntimeError: Cannot execute if device in wrong state
"""
state = self.state
if state.name in ("OPEN", "RUNNING"):
log_warning(self, "Already open, command ignored")
if state == TangoShutterState.CLOSED:
try:
self.__control.open()
self._wait("OPEN", 5)
except:
raise RuntimeError("Cannot open shutter")
self._wait(TangoShutterState.OPEN, timeout)
except RuntimeError as err:
print(err)
raise
else:
raise RuntimeError("Trouble opening shutter: " + self.state_string)
return
def close(self):
state = self._tango_state
if state == "OPEN" or state == "RUNNING":
raise RuntimeError(f"Cannot open: {state.value}")
def close(self, timeout=60):
"""Close
Args:
(float): Timeout [s] to wait until execution finished
Raises:
RuntimeError: Cannot execute if device in wrong state
"""
state = self.state
if state == TangoShutterState.CLOSED:
log_warning(self, "Already closed, command ignored")
elif state.name in ("OPEN", "RUNNING"):
try:
self.__control.close()
self._wait("CLOSE", 5)
except:
raise RuntimeError("Cannot close shutter")
self._wait(TangoShutterState.CLOSED, timeout)
except RuntimeError as err:
print(err)
raise
else:
raise RuntimeError("Trouble closing shutter: " + str(self.state_string))
raise RuntimeError(f"Cannot close: {state.value}")
def set_automatic(self):
if not self.frontend:
raise NotImplementedError("Not a Front End shutter")
# try to set to automatic if manual mode only.
if self._mode == "MANUAL":
s = self._tango_state
if s == "CLOSE" or s == "OPEN":
try:
self.__control.automatic()
self._wait_mode(mode="AUTOMATIC")
except:
raise RuntimeError("Cannot set automatic mode closing")
else:
## TODO some user log message
pass
def set_manual(self):
if not self.frontend:
raise NotImplementedError("Not a Front End shutter")
# try to set to manual if automatic mode only.
if self._mode == "AUTOMATIC":
s = self._tango_state
if s == "CLOSE" or s == "RUNNING":
try:
self.__control.manual()
self._wait_mode(mode="MANUAL")
except:
raise RuntimeError("Cannot set manual mode closing")
else:
pass
# TODO some user log message
def get_closing_mode(self):
@property
def mode(self):
""" Get the opening mode.
Raises:
NotImplementedError: Not a Frontend shutter
"""
if not self.frontend:
raise NotImplementedError("Not a Front End shutter")
raise NotImplementedError("Not a Frontend shutter")
try:
_mode = self.__control.automatic_mode
except Exception:
except AttributeError:
_mode = None
self._mode = "AUTOMATIC" if _mode else "MANUAL" if _mode == False else "UNKNOWN"
self._mode = "AUTOMATIC" if _mode else "MANUAL" if _mode is False else "UNKNOWN"
return self._mode
@mode.setter
def mode(self, mode):
"""Set the frontend opening mode
Args:
mode (str): MANUAL or AUTOMATIC
Raises: NotImplementedError: Not a Fronend shutter.
"""
if not self.frontend:
raise NotImplementedError("Not a Frontend shutter")
try:
if mode == "MANUAL":
self.__control.manual()
elif mode == "AUTOMATIC":
self.__control.automatic()