diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..f062855d90c18eb6b32ba074e56aa71df8ad139a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,86 @@ +import os +import sys +import pytest +import contextlib +from nosqltangodb.utils import tango_utils +from nosqltangodb import helper +from tango import DevState + + +def pytest_addoption(parser): + + def int_or_auto(string): + if string == "auto": + return string + return int(string) + + parser.addoption( + "--tango-port", + type=int_or_auto, + default="auto", + help="Enfore the port of the Tango DB, default is dynamic", + ) + + +@contextlib.contextmanager +def context_yaml_db_server(pytestconfig): + root = os.path.dirname(__file__) + yaml_root = os.path.join(root, "tangodb") + + with helper.running_db( + name="2", + db_access="yaml", + port=pytestconfig.getoption("--tango-port"), + debug_protocol=True, + yaml_root=yaml_root, + update_tango_host=True, + timeout=10, + ) as db: + yield db.port + + +@pytest.fixture(scope="session") +def db_server(pytestconfig): + with context_yaml_db_server(pytestconfig) as s: + yield s + + +@pytest.fixture +def tango_address(db_server): + """Returns the tango address in the format specified by the `TANGO_HOSTT + environment variable.""" + return os.environ["TANGO_HOST"] + + +@pytest.fixture +def att1_motor_server(db_server, tango_address): + device_name = "id00/motor/att1" + device_fqdn = f"tango://{tango_address}/{device_name}" + + with tango_utils.start_tango_server( + sys.executable, + "-u", + "-m", + "tests.sim_motor", + "att1", + device_fqdn=device_fqdn, + state=DevState.ON, + ) as dev_proxy: + yield device_fqdn, dev_proxy + + +@pytest.fixture +def bsh_server(db_server, tango_address): + device_name = "id00/bsh/1" + device_fqdn = f"tango://{tango_address}/{device_name}" + + with tango_utils.start_tango_server( + sys.executable, + "-u", + "-m", + "tests.sim_shutter", + "bsh", + device_fqdn=device_fqdn, + state=DevState.CLOSE, + ) as dev_proxy: + yield device_fqdn, dev_proxy diff --git a/tests/description/att1.xml b/tests/description/att1.xml new file mode 100644 index 0000000000000000000000000000000000000000..0f6de2b21b160a5bd119158f46da96306d94f3ad --- /dev/null +++ b/tests/description/att1.xml @@ -0,0 +1,10 @@ +<object type="DiscretePositionsMotor"> + + <motor type="tango" device="id00/motor/att1"/> + + <position name="out" offset="0mm" delta="1mm" description="Out" /> + <position name="bpm1" offset="-24mm" delta="1mm" description="Diam 300um (BPM1)" /> + <position name="au2" offset="-5.9cm" delta="1mm" description="Au 1.4um + Diam 300um" /> + <position name="au5" offset="-94mm" delta="1mm" description="Au 4.6um + Diam 300um" /> + +</object> diff --git a/tests/description/att1_interlocked.xml b/tests/description/att1_interlocked.xml new file mode 100644 index 0000000000000000000000000000000000000000..5e80b57f985ca0b464e5304e56061d000b2b74ea --- /dev/null +++ b/tests/description/att1_interlocked.xml @@ -0,0 +1,11 @@ +<object type="DiscretePositionsMotor"> + + <motor type="tango" device="id00/motor/att1"/> + <shutter_protection device="id00/bsh/1" /> + + <position name="out" offset="0mm" delta="1mm" description="Out" /> + <position name="bpm1" offset="-24mm" delta="1mm" description="Diam 300um (BPM1)" /> + <position name="au2" offset="-5.9cm" delta="1mm" description="Au 1.4um + Diam 300um" /> + <position name="au5" offset="-94mm" delta="1mm" description="Au 4.6um + Diam 300um" /> + +</object> diff --git a/tests/sim_motor.py b/tests/sim_motor.py new file mode 100644 index 0000000000000000000000000000000000000000..7012fb4ebb7516a27593ed258f4e23373dd2ebf0 --- /dev/null +++ b/tests/sim_motor.py @@ -0,0 +1,81 @@ +import time +from tango.server import run +from tango.server import Device +from tango.server import attribute +from tango.server import command +from tango import DevState + + +class SimMotor(Device): + """ + Simulate tango server with a basic motor. + """ + + def __init__(self, *args, **kwargs): + Device.__init__(self, *args, **kwargs) + self._position = -59.0 + self._speed = 20.0 + self._dir = 1 + self._target = None + self._start_time = None + self.poll_attribute("state", 100) + self.poll_attribute("position", 100) + self.set_state(DevState.ON) + + def _move_to(self, target: float): + if self._position == target: + return + self.set_state(DevState.MOVING) + self._target = target + self._start_time = time.time() + if self._position < target: + self._dir = 1 + else: + self._dir = -1 + + def _stop(self): + self._target = None + self._start_time = None + self.set_state(DevState.ON) + + def _update_pos(self): + if self._start_time is None: + return + prev_time = self._start_time + self._start_time = time.time() + duration = self._start_time - prev_time + new_pos = self._position + duration * self._speed * self._dir + if (new_pos - self._target) * self._dir > 0: + self._position = self._target + self._stop() + return + self._position = new_pos + + @command + def On(self): + self.set_state(DevState.ON) + + @command + def Off(self): + self.set_state(DevState.OFF) + + @command + def Abort(self): + if self._start_time is None: + return + self._update_pos() + self._stop() + + @attribute(dtype=float, label="Position", unit="mm", min_value=-100, max_value=100, rel_change="0.001") + def position(self): + self._update_pos() + return self._position + + @position.setter + def setPosition(self, value: float): + self._update_pos() + self._move_to(value) + + +if __name__ == "__main__": + run((SimMotor,)) diff --git a/tests/sim_shutter.py b/tests/sim_shutter.py new file mode 100644 index 0000000000000000000000000000000000000000..1475fe34562b35e769384250a41f2e8bdfb4280f --- /dev/null +++ b/tests/sim_shutter.py @@ -0,0 +1,29 @@ +import time +from tango.server import run +from tango.server import Device +from tango.server import attribute +from tango.server import command +from tango import DevState + + +class SimShutter(Device): + """ + Simulate tango server with a basic motor. + """ + + def __init__(self, *args, **kwargs): + Device.__init__(self, *args, **kwargs) + self.poll_attribute("state", 100) + self.set_state(DevState.CLOSE) + + @command + def Close(self): + self.set_state(DevState.CLOSE) + + @command + def Open(self): + self.set_state(DevState.OPEN) + + +if __name__ == "__main__": + run((SimShutter,)) diff --git a/tests/tangodb/discrete_positions_motor.yml b/tests/tangodb/discrete_positions_motor.yml new file mode 100644 index 0000000000000000000000000000000000000000..01b6cf5aaf7a0fb0d6fa016c03eabab07e2bb57e --- /dev/null +++ b/tests/tangodb/discrete_positions_motor.yml @@ -0,0 +1,8 @@ +server: DiscretePositionsMotor +personal_name: discrete_positions_motor +device: +- class: DiscretePositionsMotor + tango_name: id00/tango/discrete_positions_motor + properties: + config_root: PROJECT_ROOT + config_file: tests/description/att1.xml diff --git a/tests/tangodb/discrete_positions_motor2.yml b/tests/tangodb/discrete_positions_motor2.yml new file mode 100644 index 0000000000000000000000000000000000000000..96a8c02545db014b1c0255bb9521d75ceb65a236 --- /dev/null +++ b/tests/tangodb/discrete_positions_motor2.yml @@ -0,0 +1,8 @@ +server: DiscretePositionsMotor +personal_name: discrete_positions_motor2 +device: +- class: DiscretePositionsMotor + tango_name: id00/tango/discrete_positions_motor + properties: + config_root: PROJECT_ROOT + config_file: tests/description/att1_interlocked.xml diff --git a/tests/tangodb/motors.yml b/tests/tangodb/motors.yml new file mode 100644 index 0000000000000000000000000000000000000000..708e0a9f35efa6b0752894b6ca81139df4be21f7 --- /dev/null +++ b/tests/tangodb/motors.yml @@ -0,0 +1,5 @@ +server: sim_motor +personal_name: att1 +device: +- class: SimMotor + tango_name: id00/motor/att1 diff --git a/tests/tangodb/shutters.yml b/tests/tangodb/shutters.yml new file mode 100644 index 0000000000000000000000000000000000000000..d0fd756d401ce73be026b476a2fdbe7c0fb7c0b6 --- /dev/null +++ b/tests/tangodb/shutters.yml @@ -0,0 +1,5 @@ +server: sim_shutter +personal_name: bsh +device: +- class: SimShutter + tango_name: id00/bsh/1 diff --git a/tests/test_discretepositionsmotor.py b/tests/test_discretepositionsmotor.py new file mode 100644 index 0000000000000000000000000000000000000000..29aca66d2581d8ffd72f641f4d4928264be45e8c --- /dev/null +++ b/tests/test_discretepositionsmotor.py @@ -0,0 +1,85 @@ +import time +import sys +import pytest +import tango +from nosqltangodb.utils import tango_utils +from tango import DevState + + +@pytest.fixture +def dpm_server(db_server, tango_address): + device_name = "id00/tango/discrete_positions_motor" + device_fqdn = f"tango://{tango_address}/{device_name}" + + with tango_utils.start_tango_server( + sys.executable, + "-u", + "-m", + "pascooldevices.DiscretePositionsMotor", + "discrete_positions_motor", + device_fqdn=device_fqdn, + state=DevState.ON, + ) as dev_proxy: + yield device_fqdn, dev_proxy + + +@pytest.fixture +def interlocked_dpm_server(db_server, tango_address): + device_name = "id00/tango/discrete_positions_motor" + device_fqdn = f"tango://{tango_address}/{device_name}" + + with tango_utils.start_tango_server( + sys.executable, + "-u", + "-m", + "pascooldevices.DiscretePositionsMotor", + "discrete_positions_motor2", + device_fqdn=device_fqdn, + state=DevState.ON, + ) as dev_proxy: + yield device_fqdn, dev_proxy + + +def test_start(att1_motor_server, dpm_server): + """The server can start, no obvious problem""" + pass + + +def test_move(att1_motor_server, dpm_server): + """Move to a named position""" + device = dpm_server[1] + assert device.positionId == "au2" + assert device.position != 0.0 + device.positionId = "out" + time.sleep(5) + assert device.positionId == "out" + assert device.position == 0.0 + + +def test_move_shutter_closed(att1_motor_server, bsh_server, interlocked_dpm_server): + """When the shutter is closed the motor is free""" + bsh = bsh_server[1] + device = interlocked_dpm_server[1] + assert bsh.state() == DevState.CLOSE + assert device.positionId == "au2" + assert device.position != 0.0 + + device.positionId = "out" + time.sleep(5) + assert device.positionId == "out" + assert device.position == 0.0 + + +def test_move_shutter_open(att1_motor_server, bsh_server, interlocked_dpm_server): + """When the shutter is open the motor is locked""" + bsh = bsh_server[1] + device = interlocked_dpm_server[1] + + bsh.Open() + time.sleep(1.0) + assert bsh.state() == DevState.OPEN + + with pytest.raises(tango.DevFailed) as exc_info: + device.positionId = "out" + print(dir(exc_info)) + assert "motor is locked" in exc_info.value.args[0].desc