Skip to content
Snippets Groups Projects
Commit 24134008 authored by Valentin Valls's avatar Valentin Valls
Browse files

Setup discrete motor tests

parent 2f0f4cb3
No related branches found
No related tags found
No related merge requests found
import os
import sys
import pytest
import contextlib
from nosqltangodb.utils import tango_utils
from nosqltangodb import helper
from tango import DevState
def pytest_addoption(parser):
def int_or_auto(string):
if string == "auto":
return string
return int(string)
parser.addoption(
"--tango-port",
type=int_or_auto,
default="auto",
help="Enfore the port of the Tango DB, default is dynamic",
)
@contextlib.contextmanager
def context_yaml_db_server(pytestconfig):
root = os.path.dirname(__file__)
yaml_root = os.path.join(root, "tangodb")
with helper.running_db(
name="2",
db_access="yaml",
port=pytestconfig.getoption("--tango-port"),
debug_protocol=True,
yaml_root=yaml_root,
update_tango_host=True,
timeout=10,
) as db:
yield db.port
@pytest.fixture(scope="session")
def db_server(pytestconfig):
with context_yaml_db_server(pytestconfig) as s:
yield s
@pytest.fixture
def tango_address(db_server):
"""Returns the tango address in the format specified by the `TANGO_HOSTT
environment variable."""
return os.environ["TANGO_HOST"]
@pytest.fixture
def att1_motor_server(db_server, tango_address):
device_name = "id00/motor/att1"
device_fqdn = f"tango://{tango_address}/{device_name}"
with tango_utils.start_tango_server(
sys.executable,
"-u",
"-m",
"tests.sim_motor",
"att1",
device_fqdn=device_fqdn,
state=DevState.ON,
) as dev_proxy:
yield device_fqdn, dev_proxy
@pytest.fixture
def bsh_server(db_server, tango_address):
device_name = "id00/bsh/1"
device_fqdn = f"tango://{tango_address}/{device_name}"
with tango_utils.start_tango_server(
sys.executable,
"-u",
"-m",
"tests.sim_shutter",
"bsh",
device_fqdn=device_fqdn,
state=DevState.CLOSE,
) as dev_proxy:
yield device_fqdn, dev_proxy
<object type="DiscretePositionsMotor">
<motor type="tango" device="id00/motor/att1"/>
<position name="out" offset="0mm" delta="1mm" description="Out" />
<position name="bpm1" offset="-24mm" delta="1mm" description="Diam 300um (BPM1)" />
<position name="au2" offset="-5.9cm" delta="1mm" description="Au 1.4um + Diam 300um" />
<position name="au5" offset="-94mm" delta="1mm" description="Au 4.6um + Diam 300um" />
</object>
<object type="DiscretePositionsMotor">
<motor type="tango" device="id00/motor/att1"/>
<shutter_protection device="id00/bsh/1" />
<position name="out" offset="0mm" delta="1mm" description="Out" />
<position name="bpm1" offset="-24mm" delta="1mm" description="Diam 300um (BPM1)" />
<position name="au2" offset="-5.9cm" delta="1mm" description="Au 1.4um + Diam 300um" />
<position name="au5" offset="-94mm" delta="1mm" description="Au 4.6um + Diam 300um" />
</object>
import time
from tango.server import run
from tango.server import Device
from tango.server import attribute
from tango.server import command
from tango import DevState
class SimMotor(Device):
"""
Simulate tango server with a basic motor.
"""
def __init__(self, *args, **kwargs):
Device.__init__(self, *args, **kwargs)
self._position = -59.0
self._speed = 20.0
self._dir = 1
self._target = None
self._start_time = None
self.poll_attribute("state", 100)
self.poll_attribute("position", 100)
self.set_state(DevState.ON)
def _move_to(self, target: float):
if self._position == target:
return
self.set_state(DevState.MOVING)
self._target = target
self._start_time = time.time()
if self._position < target:
self._dir = 1
else:
self._dir = -1
def _stop(self):
self._target = None
self._start_time = None
self.set_state(DevState.ON)
def _update_pos(self):
if self._start_time is None:
return
prev_time = self._start_time
self._start_time = time.time()
duration = self._start_time - prev_time
new_pos = self._position + duration * self._speed * self._dir
if (new_pos - self._target) * self._dir > 0:
self._position = self._target
self._stop()
return
self._position = new_pos
@command
def On(self):
self.set_state(DevState.ON)
@command
def Off(self):
self.set_state(DevState.OFF)
@command
def Abort(self):
if self._start_time is None:
return
self._update_pos()
self._stop()
@attribute(dtype=float, label="Position", unit="mm", min_value=-100, max_value=100, rel_change="0.001")
def position(self):
self._update_pos()
return self._position
@position.setter
def setPosition(self, value: float):
self._update_pos()
self._move_to(value)
if __name__ == "__main__":
run((SimMotor,))
import time
from tango.server import run
from tango.server import Device
from tango.server import attribute
from tango.server import command
from tango import DevState
class SimShutter(Device):
"""
Simulate tango server with a basic motor.
"""
def __init__(self, *args, **kwargs):
Device.__init__(self, *args, **kwargs)
self.poll_attribute("state", 100)
self.set_state(DevState.CLOSE)
@command
def Close(self):
self.set_state(DevState.CLOSE)
@command
def Open(self):
self.set_state(DevState.OPEN)
if __name__ == "__main__":
run((SimShutter,))
server: DiscretePositionsMotor
personal_name: discrete_positions_motor
device:
- class: DiscretePositionsMotor
tango_name: id00/tango/discrete_positions_motor
properties:
config_root: PROJECT_ROOT
config_file: tests/description/att1.xml
server: DiscretePositionsMotor
personal_name: discrete_positions_motor2
device:
- class: DiscretePositionsMotor
tango_name: id00/tango/discrete_positions_motor
properties:
config_root: PROJECT_ROOT
config_file: tests/description/att1_interlocked.xml
server: sim_motor
personal_name: att1
device:
- class: SimMotor
tango_name: id00/motor/att1
server: sim_shutter
personal_name: bsh
device:
- class: SimShutter
tango_name: id00/bsh/1
import time
import sys
import pytest
import tango
from nosqltangodb.utils import tango_utils
from tango import DevState
@pytest.fixture
def dpm_server(db_server, tango_address):
device_name = "id00/tango/discrete_positions_motor"
device_fqdn = f"tango://{tango_address}/{device_name}"
with tango_utils.start_tango_server(
sys.executable,
"-u",
"-m",
"pascooldevices.DiscretePositionsMotor",
"discrete_positions_motor",
device_fqdn=device_fqdn,
state=DevState.ON,
) as dev_proxy:
yield device_fqdn, dev_proxy
@pytest.fixture
def interlocked_dpm_server(db_server, tango_address):
device_name = "id00/tango/discrete_positions_motor"
device_fqdn = f"tango://{tango_address}/{device_name}"
with tango_utils.start_tango_server(
sys.executable,
"-u",
"-m",
"pascooldevices.DiscretePositionsMotor",
"discrete_positions_motor2",
device_fqdn=device_fqdn,
state=DevState.ON,
) as dev_proxy:
yield device_fqdn, dev_proxy
def test_start(att1_motor_server, dpm_server):
"""The server can start, no obvious problem"""
pass
def test_move(att1_motor_server, dpm_server):
"""Move to a named position"""
device = dpm_server[1]
assert device.positionId == "au2"
assert device.position != 0.0
device.positionId = "out"
time.sleep(5)
assert device.positionId == "out"
assert device.position == 0.0
def test_move_shutter_closed(att1_motor_server, bsh_server, interlocked_dpm_server):
"""When the shutter is closed the motor is free"""
bsh = bsh_server[1]
device = interlocked_dpm_server[1]
assert bsh.state() == DevState.CLOSE
assert device.positionId == "au2"
assert device.position != 0.0
device.positionId = "out"
time.sleep(5)
assert device.positionId == "out"
assert device.position == 0.0
def test_move_shutter_open(att1_motor_server, bsh_server, interlocked_dpm_server):
"""When the shutter is open the motor is locked"""
bsh = bsh_server[1]
device = interlocked_dpm_server[1]
bsh.Open()
time.sleep(1.0)
assert bsh.state() == DevState.OPEN
with pytest.raises(tango.DevFailed) as exc_info:
device.positionId = "out"
print(dir(exc_info))
assert "motor is locked" in exc_info.value.args[0].desc
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment