conftest.py 3.83 KB
Newer Older
bliss administrator's avatar
bliss administrator committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import os
import time
import subprocess
import multiprocessing

import redis
import pytest
import gevent
import sys

from contextlib import contextmanager
from bliss.config import static
from bliss.config.conductor import client
from bliss.config.conductor import connection
from bliss.config.conductor.client import get_default_connection

ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
BEACON = [sys.executable, '-m', 'bliss.config.conductor.server']
BEACON_DB_PATH = os.path.join(ROOT, 'tests', 'test_configuration')
REDIS_PORT = 7754
BEACON_PORT = 7755
TANGO_PORT = 7756

@pytest.fixture(scope="session")
def beacon():
    args = [
        '--port=%d' % BEACON_PORT,
        '--redis_port=%d' % REDIS_PORT,
        '--redis_socket=/tmp/redis_test_id09.sock',
        '--db_path=' + BEACON_DB_PATH,
        '--posix_queue=0',
        '--tango_port=%d' % TANGO_PORT]
    proc = subprocess.Popen(BEACON + args)
    time.sleep(0.5)  # wait for beacon to be really started
    redis_db = redis.Redis(port=REDIS_PORT)
    redis_db.flushall()
    beacon_connection = connection.Connection("localhost", BEACON_PORT)
    client._default_connection = beacon_connection
    cfg = static.get_config()
    os.environ["TANGO_HOST"] = "localhost:%d" % TANGO_PORT
    yield cfg
    proc.terminate()


@pytest.fixture
def redis_data_conn():
    cnx = get_default_connection()
    redis_conn = cnx.get_redis_connection(db=1)
    yield redis_conn


@pytest.fixture
def scan_tmpdir(tmpdir):
    yield tmpdir
    tmpdir.remove()


@pytest.fixture(scope="session")
def lima_simulator(beacon):
    from Lima.Server.LimaCCDs import main
    from tango import DeviceProxy, DevFailed

    device_name = "id00/limaccds/simulator1"
    device_fqdn = "tango://localhost:%d/%s" % (TANGO_PORT, device_name)
    
    p = subprocess.Popen(['LimaCCDs', 'simulator']) 

    with gevent.Timeout(3, RuntimeError("Lima simulator is not running")):
        while True:
            try:
                dev_proxy = DeviceProxy(device_fqdn)
                dev_proxy.ping()
                dev_proxy.state()
            except DevFailed as e:
                gevent.sleep(0.5)
            else:
                break

    yield device_fqdn, dev_proxy
    p.terminate()


@pytest.fixture(scope="session")
def bliss_tango_server(beacon):
    from tango import DeviceProxy, DevFailed

    device_name = "id00/bliss/test"
    device_fqdn = "tango://localhost:%d/%s" % (TANGO_HOST, device_name)

    bliss_ds = [sys.executable, '-m', 'bliss.tango.servers.bliss_ds']
    p = subprocess.Popen(bliss_ds+["test"])

    with gevent.Timeout(3, RuntimeError("Bliss tango server is not running")):
        while True:
            try:
                dev_proxy = DeviceProxy(device_fqdn)
                dev_proxy.ping()
                dev_proxy.state()
            except DevFailed as e:
                gevent.sleep(0.5)
            else:
                break

    yield device_fqdn, dev_proxy
    p.terminate()

def motor_fixture(name):
    def get_motor(beacon):
        m = beacon.get(name)
        yield m
        m.stop()
        m.wait_move()
        m.apply_config()
        m.controller.set_hw_limits(m, None, None)
        m.dial(0)
        m.position(0)
        for hook in m.motion_hooks:
            hook.nb_pre_move = 0
            hook.nb_post_move = 0
    get_motor.__name__ = name
    return pytest.fixture(get_motor)


def calc_motor_fixture(name):
    def get_motor(beacon):
        m = beacon.get(name)
        m.no_offset = False
        yield m
        m.stop()
        m.wait_move()
    get_motor.__name__ = name
    return pytest.fixture(get_motor)

m0 = motor_fixture("m0")
m1 = motor_fixture("m1")

# add more fixtures here as needed