test_standard.py 8.52 KB
Newer Older
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
1
import subprocess
2
import time
Matias Guijarro's avatar
Matias Guijarro committed
3
import builtins
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
4
import pytest
5
import numpy
6
import psutil
7
import gevent
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
8

9
from bliss.shell import standard
10
from bliss.shell.standard import wa, wm, sta, stm, _launch_silx, umv
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
11

12
13
14
from bliss.shell.standard import sin, cos, tan, arcsin, arccos, arctan, arctan2
from bliss.shell.standard import log, log10, sqrt, exp, power, deg2rad, rad2deg
from bliss.shell.standard import rand, date, sleep
15
from bliss.shell.standard import flint
Valentin Valls's avatar
Valentin Valls committed
16
from bliss.controllers.lima import roi as lima_rois
17

Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
18
19
20
21
22
23
24
25

@pytest.fixture
def s1hg(default_session):
    s1hg = default_session.config.get("s1hg")
    yield s1hg
    s1hg.__close__()


26
27
28
29
30
31
32
33
34
35
36
37
38
def test_std_func():

    # No mathematical proof, just to ensure all functions are imported.
    numpy.testing.assert_almost_equal(sin(cos(tan(arcsin(0.1)))), 0.838733, 4)
    numpy.testing.assert_almost_equal(arccos(arctan(arctan2(0.1, 1))), 1.47129, 4)
    numpy.testing.assert_almost_equal(log(sqrt(exp(power(2, 3)))), 4.0, 4)
    numpy.testing.assert_almost_equal(log10(deg2rad(rad2deg(4))), 0.602, 4)

    _ = rand()
    _ = date()
    sleep(0.001)


Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def test_wa_normal(default_session, capsys):
    bad = default_session.config.get("bad")
    bad.controller.bad_position = False
    wa()
    captured = capsys.readouterr()
    output = "Current Positions: user\n"
    output += "                   dial\n"
    output += "\n"
    output += "    bad\n"
    output += "-------\n"
    output += "0.00000\n"
    output += "0.00000\n"

    assert captured.out == output


def test_wa_exception(default_session, capsys):
    bad = default_session.config.get("bad")
    bad.controller.bad_position = True
    wa()
    captured = capsys.readouterr()

    output = "Current Positions: user\n"
    output += "                   dial\n"
    output += "\n"
    output += "bad\n"
    output += "-----\n"
    output += "!ERR\n"
    output += "!ERR\n"

    assert captured.out[: len(output)] == output

    errmsg = "Traceback (most recent call last):\n"
    assert captured.err[: len(errmsg)] == errmsg


def test_wa_slits(s1hg, capsys):
    wa()
    captured = capsys.readouterr()

    assert "s1hg" in captured.out
    assert not "s1f" in captured.out
    assert not "s1b" in captured.out


def test_wm_normal(default_session, capsys):
    bad = default_session.config.get("bad")
    bad.controller.bad_position = False
    wm("bad")
    captured = capsys.readouterr()

    output = "\n"
    output += "              bad\n"
    output += "--------  -------\n"
    output += "User\n"
    output += " High         inf\n"
    output += " Current  0.00000\n"
    output += " Low         -inf\n"
    output += "Offset    0.00000\n"
    output += "\n"
    output += "Dial\n"
    output += " High         inf\n"
    output += " Current  0.00000\n"
    output += " Low         -inf\n"

    assert captured.out == output


def test_wm_exception(default_session, capsys):
    bad = default_session.config.get("bad")
    bad.controller.bad_position = True
    wm("bad")
    captured = capsys.readouterr()

    output = "\n"
    output += "          bad\n"
    output += "--------  -----\n"
    output += "User\n"
    output += " High     inf\n"
    output += " Current  !ERR\n"
    output += " Low      -inf\n"
120
    output += "Offset    0.0\n"
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
121
122
123
124
125
126
127
128
129
    output += "\n"
    output += "Dial\n"
    output += " High     inf\n"
    output += " Current  !ERR\n"
    output += " Low      -inf\n"

    assert captured.out[: len(output)] == output

    errmsg = "Traceback (most recent call last):\n"
130
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
131
132

    errmsg = "RuntimeError: Error on motor 'bad': BAD POSITION\n"
133
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
134
135
136
137


def test_sta_normal(default_session, capsys):
    bad = default_session.config.get("bad")
138
    bad.controller.bad_state = False
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
    sta()
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  ---------------------\n"
    output += "bad     READY (Axis is READY)\n"

    assert captured.out == output


def test_sta_slits(s1hg, capsys):
    sta()

    captured = capsys.readouterr()

    assert "s1hg" in captured.out
155
156
    assert "s1f" not in captured.out
    assert "s1b" not in captured.out
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
157
158
159
160


def test_sta_exception(default_session, capsys):
    bad = default_session.config.get("bad")
161
    bad.controller.bad_state = True
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
162
163
164
165
166
167
168
169
170
171
    sta()
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  --------\n"
    output += "bad     !ERR\n"

    assert captured.out[: len(output)] == output

    errmsg = "Traceback (most recent call last):\n"
172
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
173

174
    errmsg = "RuntimeError: Error on motor 'bad': BAD STATE"
175
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
176
177
178
179


def test_stm_normal(default_session, capsys):
    bad = default_session.config.get("bad")
180
    bad.controller.bad_state = False
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
181
182
183
184
185
186
187
188
189
190
191
192
    stm("bad")
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  ---------------------\n"
    output += "bad     READY (Axis is READY)\n"

    assert captured.out == output


def test_stm_exception(default_session, capsys):
    bad = default_session.config.get("bad")
193
    bad.controller.bad_state = True
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
194
195
196
197
198
199
200
201
202
203
    stm("bad")
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  --------\n"
    output += "bad     !ERR\n"

    assert captured.out[: len(output)] == output

    errmsg = "Traceback (most recent call last):\n"
204
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
205

206
    errmsg = "RuntimeError: Error on motor 'bad': BAD STATE"
207
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
208
209


210
211
212
def test_umv_typecheck(default_session):
    m0 = default_session.config.get("m0")
    calc_mot5 = default_session.config.get("calc_mot5")
213
214
215
216

    umv(m0, 1.2)
    with pytest.raises(RuntimeError):
        umv(m0, 1, 2)
217
    with pytest.raises(RuntimeError):
218
219
220
        umv(1, m0)
    with pytest.raises(RuntimeError):
        umv()
221
222
    with pytest.raises(TypeError):
        umv(calc_mot5, 1)
223
224


225
226
227
228
def test_umv_signature(session):
    assert str(umv.__signature__) == "(*args: 'motor1, pos1, motor2, pos2, ...')"


Benoit Formet's avatar
Benoit Formet committed
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
OUTPUT_UMV_ROBY = """
       roby  

\x1b[Fuser    0.000
dial    0.000\x1b[Fuser    1.000
dial    1.000
"""

OUTPUT_UMV_CALC_MOT2 = """
     calc_mot2[keV]  calc_mot1[keV]       roby     

\x1b[Fuser          4.000           2.000           1.000
dial          4.000           2.000           1.000\x1b[Fuser          4.000           2.000           1.000
dial          4.000           2.000           1.000
"""


def test_umv_shell(capfd, default_session):
Matias Guijarro's avatar
Matias Guijarro committed
247
248
    roby = default_session.config.get("roby")
    umv(roby, 1)
Benoit Formet's avatar
Benoit Formet committed
249
250
251
252
253
254
255
256
257
258
    output = capfd.readouterr().out
    assert output == OUTPUT_UMV_ROBY

    calc_mot2 = default_session.config.get("calc_mot2")
    umv(calc_mot2, 4)
    output = capfd.readouterr().out
    assert output == OUTPUT_UMV_CALC_MOT2

    default_session.config.get("calc_mot1").controller.close()
    calc_mot2.controller.close()
Matias Guijarro's avatar
Matias Guijarro committed
259
260


261
262
263
264
265
266
def test_open_silx(xvfb):
    # checking if the process opens without stdout errors
    process = _launch_silx()
    time.sleep(1)
    assert process.returncode is None
    process.terminate()
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282


def test_open_close_flint(test_session_without_flint):
    f = flint()
    assert f is not None
    pid = f.pid
    assert psutil.pid_exists(pid)
    f.close()
    assert not psutil.pid_exists(pid)


def test_open_kill_flint(test_session_without_flint):
    f = flint()
    assert f is not None
    pid = f.pid
    assert psutil.pid_exists(pid)
283
    f.kill()
284
285
286
287
288
    try:
        process = psutil.Process(pid)
    except psutil.NoSuchProcess:
        pass
    else:
289
290
291
292
293
294
295
        try:
            with gevent.Timeout(1):
                # gevent timeout have to be used here
                # See https://github.com/gevent/gevent/issues/622
                process.wait(timeout=None)
        except gevent.Timeout:
            pass
296
    assert not psutil.pid_exists(pid)
297
298
299
300
301
302


def test_edit_roi_counters(
    mocker, beacon, default_session, lima_simulator, test_session_with_flint
):
    # Mock few functions to coverage the code without flint
Valentin Valls's avatar
Valentin Valls committed
303
304
305
306
307
308
    roi1 = lima_rois.Roi(10, 11, 100, 101, name="roi1")
    roi2 = lima_rois.RoiProfile(20, 21, 200, 201, name="roi2", mode="vertical")
    plot_mock = mocker.Mock()
    plot_mock.select_shapes = mocker.Mock(return_value=[roi1, roi2])

    mocker.patch("bliss.common.plot.plot_image", return_value=plot_mock)
309
310
311
312

    cam = beacon.get("lima_simulator")

    cam.roi_counters.clear()
313
    cam.roi_profiles.clear()
Valentin Valls's avatar
Valentin Valls committed
314
315
    cam.roi_counters["foo1"] = 20, 20, 18, 20
    cam.roi_profiles["foo2"] = 20, 20, 18, 20, "vertical"
316
317
    standard.edit_roi_counters(cam)
    assert "roi1" in cam.roi_counters
Valentin Valls's avatar
Valentin Valls committed
318
    assert "roi2" in cam.roi_profiles
Valentin Valls's avatar
Valentin Valls committed
319
    plot_mock.select_shapes.assert_called_once()
320
    plot_mock.focus.assert_called_once()