TomoSampleChanger.py 34.23 KiB
# -*- coding: utf-8 -*-
#
# This file is part of the TomoSampleChanger project
#
# Copyright (C): 2018
# European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# France
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
""" Sample changer for micro tomography on BM05
Handles the sample changer robot for micro tomography on BM05.
The robot can be installed on different tomgraphy end stations and aligns itself with the set-up used.
The class used the StaubLink/StaubPy/bm05Controller2.py of the StaubLink project on the ESRF Gitlab to
handle all functionality of the robot.
"""
# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device, DeviceMeta
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(TomoSampleChanger.additionnal_import) ENABLED START #
import sys
import time
import math
import gevent
import traceback
# import robot cotroller
from StaubPy import bm05Controller2
# PROTECTED REGION END # // TomoSampleChanger.additionnal_import
__all__ = ["TomoSampleChanger", "main"]
class TomoSampleChanger(Device):
"""
Handles the sample changer robot for micro tomography on BM05.
The robot can be installed on different tomgraphy end stations and aligns itself with the set-up used.
The class used the StaubLink/StaubPy/bm05Controller2.py of the StaubLink project on the ESRF Gitlab to
handle all functionality of the robot.
**Properties:**
- Device Property
sampleLoaded
- Is the sample loaded
- Type:'DevBoolean'
samplePosition
- The position of the loaded sample
- Type:'DevString'
"""
__metaclass__ = DeviceMeta
# PROTECTED REGION ID(TomoSampleChanger.class_variable) ENABLED START #
green_mode = tango.GreenMode.Gevent
#
# Blocking robot operation to load a sample
#
def load_sample_task(self):
try:
self.info_stream("started sample loading")
self.last_task = "Sample loading"
self.is_error = False
self.is_moving = True
# check the robot interlocks
if self.robot.safeForMove:
# ask sample loading to the robot arm
self.robot.loadSample(self.load_sample_column, self.load_sample_row)
else:
self.load_sample_name = "None"
err_msg = "Robot cannot move due to active interlocks! Please check the status message"
tango.Except.throw_exception("ReadError", err_msg, "load_sample_task")
self.is_moving = False
self.is_loaded = True
self.write_properties()
self.info_stream("finished sample loading")
# catch all exceptions, print the traceback to the console and
# add the information to the device status until acknowledgement
except:
ex_info = sys.exc_info()
try:
tango.Except.throw_exception(str(ex_info[0]), str(ex_info[1]), traceback.format_exc())
except tango.DevFailed as ex:
self.task_error = ex
#tango.Except.print_exception(ex)
self.is_error = True
self.is_moving = False
#
# Blocking robot operation to unload a sample
#
def unload_sample_task(self):
try:
self.info_stream("started sample unloading")
self.last_task = "Sample unloading"
self.is_error = False
self.is_moving = True
# check the robot interlocks
if self.robot.safeForMove:
# ask sample unloading to the robot arm
if self.is_loaded:
self.robot.unloadSample(self.load_sample_column, self.load_sample_row)
else:
self.robot.unloadSample()
else:
err_msg = "Robot cannot move due to active interlocks! Please check the status message"
tango.Except.throw_exception("ReadError", err_msg, "unload_sample_task")
self.is_moving = False
self.is_loaded = False
self.load_sample_name = "None"
self.write_properties()
self.info_stream("finished sample unloading")
# catch all exceptions, print the traceback to the console and
# add the information to the device status until acknowledgement
except:
ex_info = sys.exc_info()
try:
tango.Except.throw_exception(str(ex_info[0]), str(ex_info[1]), traceback.format_exc())
except tango.DevFailed as ex:
self.task_error = ex
#tango.Except.print_exception(ex)
self.is_error = True
self.is_moving = False
self.write_properties()
#
# Blocking robot operation to align the robot with the tomography end station
#
def align_task(self):
try:
self.info_stream("started alignment")
self.last_task = "Alignment procedure"
self.is_error = False
self.is_moving = True
# check that the robot is in manual mode
if self.robot.deportedMode == False:
# ask for the alignment procedure
self.robot.refineTomographFrame()
else:
err_msg = "Robot is not in manual mode! Please check the mode key!"
tango.Except.throw_exception("ReadError", err_msg, "align_task")
self.is_moving = False
self.attr_Aligned_read = self.robot.isAligned
self.info_stream("finished alignment")
# catch all exceptions, print the traceback to the console and
# add the information to the device status until acknowledgement
except:
ex_info = sys.exc_info()
try:
tango.Except.throw_exception(str(ex_info[0]), str(ex_info[1]), traceback.format_exc())
except tango.DevFailed as ex:
self.task_error = ex
#tango.Except.print_exception(ex)
self.is_error = True
self.is_moving = False
def parking_task(self):
try:
self.info_stream("started move to parking position")
self.last_task = "Parking procedure"
self.is_error = False
self.is_moving = True
# check that the robot is in manual mode
if self.robot.deportedMode == False:
# ask to move to parking position
self.robot._forceBackToParkingFromAnywhere()
else:
err_msg = "Robot is not in manual mode! Please check the mode key!"
tango.Except.throw_exception("ReadError", err_msg, "align_task")
self.is_moving = False
self.attr_ParkingPosition_read = self.robot.isAtParking
# put back the error flag. A Reset still needs to be executed!
self.is_error = True
self.info_stream("finished move to parking position")
# catch all exceptions, print the traceback to the console and
# add the information to the device status until acknowledgement
except:
ex_info = sys.exc_info()
try:
tango.Except.throw_exception(str(ex_info[0]), str(ex_info[1]), traceback.format_exc())
except tango.DevFailed as ex:
self.task_error = ex
#tango.Except.print_exception(ex)
self.is_error = True
self.is_moving = False
#
# Read the status of the robot
#
def robot_status(self):
try:
if self.status_lock:
return
else:
self.status_lock=True
# call the robot status only if the robot is not moving
if self.is_moving == False:
# call the status onlt every three seconds, not to ovlerload the controller
now = time.time()
diff = now - self.robot_status_time
if diff > 3.0:
self.is_safe_for_move = self.robot.safeForMove
self.attr_WagoOK_read = self.robot.wagoIsAlive
self.attr_RobotLibraryName_read = self.robot.loadedLibraryName
if self.robot.airPressureTooLow or self.robot.airPressureTooHigh:
self.attr_AirPressureOK_read = False
else:
self.attr_AirPressureOK_read = True
self.attr_RemoteMode_read = self.robot.deportedMode
self.attr_TomographUsed_read = self.robot.tomographUsed
self.attr_Aligned_read = self.robot.isAligned
self.attr_ParkingPosition_read = self.robot.isAtParking
self.attr_SafetyChainOK_read = self.robot.safetyChain
self.robot_status_time = now
self.info_stream ("robot_status() at %d" % int(now))
self.status_lock = False
# catch all exceptions, convert to Tango exception and rethrow
except:
ex_info = sys.exc_info()
tango.Except.throw_exception(str(ex_info[0]), str(ex_info[1]), traceback.format_exc())
#
# Read the sample properties
#
def write_properties(self):
db = tango.Database()
keys = ("samplePosition", "sampleLoaded")
prop = dict.fromkeys(keys)
prop["samplePosition"] = self.load_sample_name
prop["sampleLoaded"] = self.is_loaded
db.put_device_property(self.get_name(), prop)
#
# Verify a given sample name and return the sample row and column position on the samples shelf
#
def get_sample_position (self, sample_name):
# check for valid sample name
# Column must be A, B or C
column = sample_name[0].upper()
if column.isalpha() == False or \
(column != "A" and column != "B" and column != "C"):
err_msg = "Wrong sample name. The sample column in the shelf must be A, B or C"
tango.Except.throw_exception("WriteError", err_msg, "get_sample_position")
#row number must be between 1 and 18
row = int(sample_name[1:3])
if row < 1 or row > 18:
err_msg = "Wrong sample name. The sample row in the shelf must be between 1 and 18"
tango.Except.throw_exception("WriteError", err_msg, "get_sample_position")
# the samples A7, B7, C7 and A8, B8, C8 don't exist!!!
if row == 7 or row == 8:
err_msg = "The samples A,B,C 7 and 8 do not exist on the sample changer shelf!"
tango.Except.throw_exception("WriteError", err_msg, "get_sample_position")
return row, column
# PROTECTED REGION END # // TomoSampleChanger.class_variable
# -----------------
# Device Properties
# -----------------
sampleLoaded = device_property(
dtype='DevBoolean',
default_value=False
)
samplePosition = device_property(
dtype='DevString',
default_value="None"
)
# ----------
# Attributes
# ----------
SamplePosition = attribute(
dtype='DevString',
label="Sample position",
)
SampleLoaded = attribute(
dtype='DevBoolean',
label="Sample loaded",
doc="The name of the sample currently loaded",
)
Aligned = attribute(
dtype='DevBoolean',
doc="Sample changer aligned with tomograph end station",
)
ParkingPosition = attribute(
dtype='DevBoolean',
label="Parking position",
)
RemoteMode = attribute(
dtype='DevBoolean',
label="Remote mode",
)
SafetyChainOK = attribute(
dtype='DevBoolean',
label="Safety chain OK",
doc="The safety interlock is active when an emergency stop button was pressed or the hutch is not seached and closed. \nWhen the safety interlock is active, robot movements are allowed only in manual operation mode.",
)
AirPressureOK = attribute(
dtype='DevBoolean',
label="Air pressure OK",
doc="True when the air pressure is not to high nor to low.",
)
WagoOK = attribute(
dtype='DevBoolean',
label="Wago OK",
doc="True when the connection to the interlock Wago controller is alive.",
)
TomographUsed = attribute(
dtype='DevString',
label="Tomograph used",
doc="The name of the tomography end station where the sample changer is installed.",
)
RobotLibraryName = attribute(
dtype='DevString',
label="Robot library name",
)
# ---------------
# General methods
# ---------------
def init_device(self):
"""Initialises the attributes and properties of the TomoSampleChanger."""
Device.init_device(self)
# PROTECTED REGION ID(TomoSampleChanger.init_device) ENABLED START #
self.attr_TomographUsed_read = "None"
self.attr_RobotLibraryName_read = "None"
self.attr_SamplePosition_read = "None"
self.attr_Aligned_read = False
self.attr_AirPressureOK_read = False
self.attr_WagoOK_read = False
self.attr_RemoteMode_read = False
self.attr_ParkingPosition_read = False
self.attr_SafetyChainOK_read = False
self.attr_RobotSpeed_read = "None"
# init flags for state and status
self.init_done = False
self.init_error = None
self.is_moving = False
self.is_loaded = False
self.is_error = False
self.is_safe_for_move = False
self.status_lock = False
self.state_error = None
self.task_error = None
self.robot_status_time = 0
self.last_task = "None"
self.load_sample_name = "None"
self.load_sample_column = ""
self.load_sample_row = 0
# get the device propertie values for the sample
self.load_sample_name = self.samplePosition
self.is_loaded = self.sampleLoaded
print ("Sample position = %s" % self.load_sample_name)
print ("Sample is loaded = %s" % self.is_loaded)
try:
# initialise the robot access
self.robot = bm05Controller2.BM05Controller2(controllerAssociated='bm05robot2.esrf.fr', logOnController=False) # Normal
self.robot.cs8LibraryName = 'bm05tomo'
self.robot.autoFirmwareUpdate()
self.robot._prog_display.threadOnController()
self.robot._prog_key_handler.threadOnController()
self.robot.allLightOn()
time.sleep(1)
self.robot.allLightOff()
self.robot.torqueMaxAllowed = {'j1': 9990, 'j2': 9990, 'j3': 9990, 'j4': 9990, 'j5': 9990, 'j6': 9990}
self.robot.torqueMonitor.threadOnController() # If emmergency function used, torque monitor must be reactivated.
print ("Controller initialized!!!!!!!")
self.init_done = True
except:
ex_info = sys.exc_info()
try:
tango.Except.throw_exception(str(ex_info[0]), str(ex_info[1]), traceback.format_exc())
except tango.DevFailed as ex:
#tango.Except.print_exception(ex)
self.init_error = ex
self.debug_stream('Exeception thrown in init_device')
# PROTECTED REGION END # // TomoSampleChanger.init_device
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
# PROTECTED REGION ID(TomoSampleChanger.always_executed_hook) ENABLED START #
pass
# PROTECTED REGION END # // TomoSampleChanger.always_executed_hook
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command.
"""
# PROTECTED REGION ID(TomoSampleChanger.delete_device) ENABLED START #
pass
# PROTECTED REGION END # // TomoSampleChanger.delete_device
# ------------------
# Attributes methods
# ------------------
def read_SamplePosition(self):
# PROTECTED REGION ID(TomoSampleChanger.SamplePosition_read) ENABLED START #
self.attr_SamplePosition_read = self.load_sample_name
return self.attr_SamplePosition_read
# PROTECTED REGION END # // TomoSampleChanger.SamplePosition_read
def read_SampleLoaded(self):
# PROTECTED REGION ID(TomoSampleChanger.SampleLoaded_read) ENABLED START #
return self.is_loaded
# PROTECTED REGION END # // TomoSampleChanger.SampleLoaded_read
def read_Aligned(self):
# PROTECTED REGION ID(TomoSampleChanger.Aligned_read) ENABLED START #
self.robot_status()
return self.attr_Aligned_read
# PROTECTED REGION END # // TomoSampleChanger.Aligned_read
def read_ParkingPosition(self):
# PROTECTED REGION ID(TomoSampleChanger.ParkingPosition_read) ENABLED START #
self.robot_status()
return self.attr_ParkingPosition_read
# PROTECTED REGION END # // TomoSampleChanger.ParkingPosition_read
def read_RemoteMode(self):
# PROTECTED REGION ID(TomoSampleChanger.RemoteMode_read) ENABLED START #
self.robot_status()
return self.attr_RemoteMode_read
# PROTECTED REGION END # // TomoSampleChanger.RemoteMode_read
def read_SafetyChainOK(self):
# PROTECTED REGION ID(TomoSampleChanger.SafetyChainOK_read) ENABLED START #
"""Return the SafetyChainOK attribute."""
self.robot_status()
return self.attr_SafetyChainOK_read
# PROTECTED REGION END # // TomoSampleChanger.SafetyChainOK_read
def read_AirPressureOK(self):
# PROTECTED REGION ID(TomoSampleChanger.AirPressureOK_read) ENABLED START #
self.robot_status()
return self.attr_AirPressureOK_read
# PROTECTED REGION END # // TomoSampleChanger.AirPressureOK_read
def read_WagoOK(self):
# PROTECTED REGION ID(TomoSampleChanger.WagoOK_read) ENABLED START #
self.robot_status()
return self.attr_WagoOK_read
# PROTECTED REGION END # // TomoSampleChanger.WagoOK_read
def read_TomographUsed(self):
# PROTECTED REGION ID(TomoSampleChanger.TomographUsed_read) ENABLED START #
self.robot_status()
return self.attr_TomographUsed_read
# PROTECTED REGION END # // TomoSampleChanger.TomographUsed_read
def read_RobotLibraryName(self):
# PROTECTED REGION ID(TomoSampleChanger.RobotLibraryName_read) ENABLED START #
self.robot_status()
return self.attr_RobotLibraryName_read
# PROTECTED REGION END # // TomoSampleChanger.RobotLibraryName_read
speed = attribute(
label="Speed",
dtype=float,
max_value=1,
fget="get_speed", fset="set_speed",
doc="sample changer acceleration speed")
def get_speed(self):
# PROTECTED REGION ID(TomoSampleChanger.get_speed) ENABLED START #
if self.is_moving == False:
self.attr_RobotSpeed_read = self.robot.accelForLargeMovesMultiplicator
return self.attr_RobotSpeed_read
# PROTECTED REGION END # // TomoSampleChanger.get_speed
def set_speed(self, speed):
# PROTECTED REGION ID(TomoSampleChanger.set_speed) ENABLED START #
self.robot.accelForLargeMovesMultiplicator = speed
# PROTECTED REGION END # // TomoSampleChanger.set_speed
# --------
# Commands
# --------
@DebugIt()
def dev_state(self):
# PROTECTED REGION ID(TomoSampleChanger.State) ENABLED START #
if self.init_done == False or self.is_error == True:
_state = tango.DevState.FAULT
else:
try:
self.robot_status()
# air pressure and internal wago interlocks
if self.is_safe_for_move == False:
_state = tango.DevState.DISABLE
else:
if self.is_moving == True:
# pause mode activated
# The hutch was opened, the movemeny is inerrupted and will resume on hutch closure
#if self.robot.power() == 0:
# _state = tango.DevState.RUNNING
#else:
# _state = tango.DevState.MOVING
_state = tango.DevState.MOVING
else:
if self.attr_ParkingPosition_read == False:
_state = tango.DevState.FAULT
else:
_state = tango.DevState.ON
except tango.DevFailed as ex:
self.state_error = ex
_state = tango.DevState.UNKNOWN
self.set_state(_state)
return _state
# PROTECTED REGION END # // TomoSampleChanger.State
@DebugIt()
def dev_status(self):
# PROTECTED REGION ID(TomoSampleChanger.Status) ENABLED START #
state = self.dev_state()
self._status = "The device state is "
if state == tango.DevState.FAULT:
self._status += "FAULT"
else:
if state == tango.DevState.DISABLE:
self._status += "DISABLED"
else:
if state == tango.DevState.RUNNING:
self._status += "PAUSE, movement is paused due to hutch opening"
else:
if state == tango.DevState.MOVING:
self._status += "MOVING"
self._status += "\n\n" + self.last_task
else:
if state == tango.DevState.ON:
self._status += "ON"
else:
self._status += "UNKNOWN"
self._status += "\n"
if state == tango.DevState.FAULT:
if self.init_done == False:
self._status += "\nDevice initialisation failed!\n"
self._status += "Please correct the problems and try to initialise the device again.\n"
self._status += "\n"
self._status += str(self.init_error)
else:
if self.is_error:
# Get the task exception stored
if self.task_error != None:
self._status += "\n" + self.last_task + " failed!\n"
self._status += "Please check the problem! Call the Reset command when the robot is back at its safe parking position\n"
self._status += "\n"
self._status += str(self.task_error)
else:
if self.attr_ParkingPosition_read == False:
self._status += "\nThe robot is not at its save parking position!\n"
self._status += "Please check the problem! Call the Reset command when the robot is back at its safe parking position\n"
self._status += "\n"
else:
if state == tango.DevState.DISABLE:
# check the interlock conditions
if self.attr_AirPressureOK_read == False:
self._status += "Air pressure problem! Air pressure is too high ot too low\n"
else:
if self.attr_WagoOK_read == False:
self._status += "No connection to Wago for interlocks\n"
else:
if state == tango.DevState.UNKNOWN:
self._status += "\nReading of the robot status failed\n"
self._status += "\n"
self._status += str(self.state_error)
self.set_status(self._status)
return self._status
# PROTECTED REGION END # // TomoSampleChanger.Status
@command(
dtype_in='DevString',
doc_in="Sample to load",
)
@DebugIt()
def LoadSample(self, argin):
# PROTECTED REGION ID(TomoSampleChanger.LoadSample) ENABLED START #
# check for valid sample name and get the sample position
self.load_sample_row, self.load_sample_column = self.get_sample_position(argin)
## check for valid sample name
## Column must be A, B or C
#column = argin[0].upper()
#if column.isalpha() == False or \
#(column != "A" and column != "B" and column != "C"):
#err_msg = "Wrong sample name. The sample column in the shelf must be A, B or C"
#tango.Except.throw_exception("WriteError", err_msg, "LoadSample")
#self.load_sample_column = column
##row number must be between 1 and 18
#row = int(argin[1:3])
#if row < 1 or row > 18:
#err_msg = "Wrong sample name. The sample row in the shelf must be between 1 and 18"
#tango.Except.throw_exception("WriteError", err_msg, "LoadSample")
#self.load_sample_row = row
## the samples A7, B7, C7 and A8, B8, C8 don't exist!!!
#if row == 7 or row == 8:
#err_msg = "The samples A,B,C 7 and 8 do not exist on the sample changer shelf!"
#tango.Except.throw_exception("WriteError", err_msg, "LoadSample")
# Sample loading is only possible when the robot is aligned
if self.attr_Aligned_read == False:
err_msg = "The sample changer is not aligned! Cannot load sample"
tango.Except.throw_exception("WriteError", err_msg, "LoadSample")
# Not possible when another sample is already loaded
if self.is_loaded == True:
err_msg = "A sample is already loaded! Please unload sample first"
tango.Except.throw_exception("WriteError", err_msg, "LoadSample")
self.running_task = gevent.spawn(self.load_sample_task)
print ("loading sample %s:%i" % (self.load_sample_column, self.load_sample_row))
self.load_sample_name = ("%s%d" % (self.load_sample_column, self.load_sample_row))
self.write_properties()
# PROTECTED REGION END # // TomoSampleChanger.LoadSample
def is_LoadSample_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_LoadSample_allowed) ENABLED START #
return self.get_state() not in [DevState.MOVING,DevState.FAULT,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_LoadSample_allowed
@command(
)
@DebugIt()
def UnloadSample(self):
# PROTECTED REGION ID(TomoSampleChanger.UnloadSample) ENABLED START #
# verify the sample position
self.load_sample_row, self.load_sample_column = self.get_sample_position(self.load_sample_name)
# Sample loading is only possible when the robot is aligned
if self.attr_Aligned_read == False:
err_msg = "The sample changer is not aligned! Cannot unload sample"
tango.Except.throw_exception("WriteError", err_msg, "UnloadSample")
self.running_task = gevent.spawn(self.unload_sample_task)
print ("unloading sample")
# PROTECTED REGION END # // TomoSampleChanger.UnloadSample
def is_UnloadSample_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_UnloadSample_allowed) ENABLED START #
return self.get_state() not in [DevState.MOVING,DevState.FAULT,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_UnloadSample_allowed
@command(
)
@DebugIt()
def Align(self):
# PROTECTED REGION ID(TomoSampleChanger.Align) ENABLED START #
# Alignment cannot be done when a sample is loaded
if self.is_loaded == True:
err_msg = "A sample is loaded! Cannot start alignment procedure with a loaded sample"
tango.Except.throw_exception("WriteError", err_msg, "Align")
self.running_task = gevent.spawn(self.align_task)
print ("aligning robot")
# PROTECTED REGION END # // TomoSampleChanger.Align
def is_Align_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_Align_allowed) ENABLED START #
return self.get_state() not in [DevState.MOVING,DevState.FAULT]
# PROTECTED REGION END # // TomoSampleChanger.is_Align_allowed
@command(
)
@DebugIt()
def Abort(self):
# PROTECTED REGION ID(TomoSampleChanger.Abort) ENABLED START #
# stop all movements
self.running_task.kill()
self.running_task.join()
self.robot.emmergencyResetOfMoves()
self.robot.emmergencyStopBlockingPrograms()
self.robot.powerOFF()
self.is_moving = False
self.is_error = True
# PROTECTED REGION END # // TomoSampleChanger.Abort
@command(
)
@DebugIt()
def Reset(self):
# PROTECTED REGION ID(TomoSampleChanger.Reset) ENABLED START #
# stop all eventually on-going movements
self.robot.emmergencyResetOfMoves()
self.robot.emmergencyStopBlockingPrograms()
# free the FAULT state ony when the robot is back at its parking position
if self.robot.isAtParking:
self.is_error = False
else:
err_msg = "Cannot reset! The robot is not at its parking position."
tango.Except.throw_exception("ReadError", err_msg, "Reset")
# PROTECTED REGION END # // TomoSampleChanger.Reset
def is_Reset_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_Reset_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.MOVING,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_Reset_allowed
@command(
)
@DebugIt()
def ForceParkingPosition(self):
# PROTECTED REGION ID(TomoSampleChanger.ForceParkingPosition) ENABLED START #
if self.robot.isAtParking == False:
self.running_task = gevent.spawn(self.parking_task)
print ("Bring robot back to parking position")
# PROTECTED REGION END # // TomoSampleChanger.ForceParkingPosition
def is_ForceParkingPosition_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_ForceParkingPosition_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.MOVING,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_ForceParkingPosition_allowed
@command(
)
@DebugIt()
def ManualUnload(self):
# PROTECTED REGION ID(TomoSampleChanger.ManualUnload) ENABLED START #
self.robot.manualUnloadSample()
self.is_loaded = False
self.load_sample_name = "None"
self.write_properties()
self.info_stream("finished sample unloading")
# PROTECTED REGION END # // TomoSampleChanger.ManualUnload
def is_ManualUnload_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_ManualUnload_allowed) ENABLED START #
return self.get_state() not in [DevState.MOVING]
# PROTECTED REGION END # // TomoSampleChanger.is_ManualUnload_allowed
@command(
)
@DebugIt()
def OpenGripper(self):
# PROTECTED REGION ID(TomoSampleChanger.OpenGripper) ENABLED START #
self.robot.openGripper()
# PROTECTED REGION END # // TomoSampleChanger.OpenGripper
def is_OpenGripper_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_OpenGripper_allowed) ENABLED START #
return self.get_state() not in [DevState.MOVING,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_OpenGripper_allowed
@command(
)
@DebugIt()
def CloseGripper(self):
# PROTECTED REGION ID(TomoSampleChanger.CloseGripper) ENABLED START #
self.robot.closeGripper()
# PROTECTED REGION END # // TomoSampleChanger.CloseGripper
def is_CloseGripper_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_CloseGripper_allowed) ENABLED START #
return self.get_state() not in [DevState.MOVING,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_CloseGripper_allowed
@command(
)
@DebugIt()
def IsAligned(self):
# PROTECTED REGION ID(TomoSampleChanger.IsAligned) ENABLED START #
"""
Forces the alignment flag to be set to True! Can be used after a server re-start
to avoid executing a new alignment procedure.
:return:None
"""
self.robot.isAligned = True
# PROTECTED REGION END # // TomoSampleChanger.IsAligned
def is_IsAligned_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_IsAligned_allowed) ENABLED START #
return self.get_state() not in [DevState.MOVING,DevState.RUNNING,DevState.FAULT,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_IsAligned_allowed
@command(
)
@DebugIt()
def ResumeAfterSafetyInterlock(self):
# PROTECTED REGION ID(TomoSampleChanger.ResumeAfterSafetyInterlock) ENABLED START #
"""
Resume a paused movement after a
safety intelock during a movement.
:return:None
"""
self.robot.emmergencyRestart()
# PROTECTED REGION END # // TomoSampleChanger.ResumeAfterSafetyInterlock
def is_ResumeAfterSafetyInterlock_allowed(self):
# PROTECTED REGION ID(TomoSampleChanger.is_ResumeAfterSafetyInterlock_allowed) ENABLED START #
#return self.get_state() not in [DevState.ON,DevState.MOVING,DevState.FAULT,DevState.DISABLE]
return self.get_state() not in [DevState.FAULT,DevState.DISABLE]
# PROTECTED REGION END # // TomoSampleChanger.is_ResumeAfterSafetyInterlock_allowed
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the TomoSampleChanger module."""
# PROTECTED REGION ID(TomoSampleChanger.main) ENABLED START #
# Enable gevents for the server
kwargs.setdefault("green_mode", tango.GreenMode.Gevent)
return run((TomoSampleChanger,), args=args, **kwargs)
# PROTECTED REGION END # // TomoSampleChanger.main
if __name__ == '__main__':
main()