test_standard.py 10.5 KB
Newer Older
1
2
3
4
5
6
7
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2020 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

8
import time
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
9
import pytest
10
import numpy
11
import psutil
12
import gevent
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
13

14
from bliss.shell import standard
15
from bliss.shell.standard import wa, wm, sta, stm, _launch_silx, umv
16
from bliss.shell.standard import lsmot, lsconfig, lsobj
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
17

18
19
20
from bliss.shell.standard import sin, cos, tan, arcsin, arccos, arctan, arctan2
from bliss.shell.standard import log, log10, sqrt, exp, power, deg2rad, rad2deg
from bliss.shell.standard import rand, date, sleep
21
from bliss.shell.standard import flint
Valentin Valls's avatar
Valentin Valls committed
22
from bliss.controllers.lima import roi as lima_rois
23

Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
24
25
26
27
28
29
30
31

@pytest.fixture
def s1hg(default_session):
    """Yield the ``s1hg`` object from the default session configuration.

    After the test has used the object, its ``__close__()`` method is called.
    """
    configured_obj = default_session.config.get("s1hg")
    yield configured_obj
    # Teardown: executed once the consuming test is finished.
    configured_obj.__close__()


32
33
34
35
36
37
38
39
40
41
42
43
44
def test_std_func():
    """Smoke-test the math and utility helpers imported from the shell.

    The numeric expectations are not a mathematical proof; they only make
    sure every function is imported and callable.
    """
    almost_equal = numpy.testing.assert_almost_equal

    trig_value = sin(cos(tan(arcsin(0.1))))
    almost_equal(trig_value, 0.838733, 4)

    inverse_trig_value = arccos(arctan(arctan2(0.1, 1)))
    almost_equal(inverse_trig_value, 1.47129, 4)

    exp_log_value = log(sqrt(exp(power(2, 3))))
    almost_equal(exp_log_value, 4.0, 4)

    angle_value = log10(deg2rad(rad2deg(4)))
    almost_equal(angle_value, 0.602, 4)

    # Results are irrelevant here: the calls only have to succeed.
    rand()
    date()
    sleep(0.001)


Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def test_wa_normal(default_session, capsys):
    """Check the exact text printed by ``wa()`` when ``bad`` reads normally."""
    expected = (
        "Current Positions: user\n"
        "                   dial\n"
        "\n"
        "    bad\n"
        "-------\n"
        "0.00000\n"
        "0.00000\n"
    )

    bad_axis = default_session.config.get("bad")
    # Make sure the controller is not flagged to report a bad position.
    bad_axis.controller.bad_position = False

    wa()

    assert capsys.readouterr().out == expected


def test_wa_exception(default_session, capsys):
    """Check ``wa()`` output when the ``bad`` controller has ``bad_position`` set.

    stdout must start with a table showing ``!ERR`` for both values and
    stderr must start with a traceback header.
    """
    expected_out = (
        "Current Positions: user\n"
        "                   dial\n"
        "\n"
        "bad\n"
        "-----\n"
        "!ERR\n"
        "!ERR\n"
    )
    expected_err = "Traceback (most recent call last):\n"

    bad_axis = default_session.config.get("bad")
    bad_axis.controller.bad_position = True

    wa()
    streams = capsys.readouterr()

    # Only the beginning of each stream is compared.
    assert streams.out.startswith(expected_out)
    assert streams.err.startswith(expected_err)


def test_wa_slits(s1hg, capsys):
    """Check which slit names appear in the ``wa()`` listing.

    With the ``s1hg`` fixture loaded, ``s1hg`` must be listed while ``s1f``
    and ``s1b`` must not appear in the output.
    """
    wa()
    captured = capsys.readouterr()

    assert "s1hg" in captured.out
    # Use the ``not in`` operator, as test_sta_slits does.
    assert "s1f" not in captured.out
    assert "s1b" not in captured.out


def test_wm_normal(default_session, capsys):
    """Check the exact text printed by ``wm("bad")`` when ``bad`` reads normally."""
    expected = (
        "\n"
        "              bad\n"
        "--------  -------\n"
        "User\n"
        " High         inf\n"
        " Current  0.00000\n"
        " Low         -inf\n"
        "Offset    0.00000\n"
        "\n"
        "Dial\n"
        " High         inf\n"
        " Current  0.00000\n"
        " Low         -inf\n"
    )

    bad_axis = default_session.config.get("bad")
    # Make sure the controller is not flagged to report a bad position.
    bad_axis.controller.bad_position = False

    wm("bad")

    assert capsys.readouterr().out == expected


def test_wm_exception(default_session, capsys):
    """Check ``wm("bad")`` output when the controller has ``bad_position`` set.

    stdout must start with a table showing ``!ERR`` for the current user and
    dial values; stderr must contain a traceback header and the
    ``BAD POSITION`` runtime error message.
    """
    bad = default_session.config.get("bad")
    bad.controller.bad_position = True
    wm("bad")
    captured = capsys.readouterr()

    output = "\n"
    output += "          bad\n"
    output += "--------  -----\n"
    output += "User\n"
    output += " High     inf\n"
    output += " Current  !ERR\n"
    output += " Low      -inf\n"
    output += "Offset    0.0\n"
    output += "\n"
    output += "Dial\n"
    output += " High     inf\n"
    output += " Current  !ERR\n"
    output += " Low      -inf\n"

    # Only the beginning of stdout is compared.
    assert captured.out[: len(output)] == output

    errmsg = "Traceback (most recent call last):\n"
    assert errmsg in captured.err

    errmsg = "RuntimeError: Error on motor 'bad': BAD POSITION\n"
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
140
141
142
143


def test_sta_normal(default_session, capsys):
    """Check the exact text printed by ``sta()`` when ``bad`` reports its state normally."""
    bad = default_session.config.get("bad")
    # Make sure the controller is not flagged to report a bad state.
    bad.controller.bad_state = False
    sta()
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  ---------------------\n"
    output += "bad     READY (Axis is READY)\n"

    assert captured.out == output


def test_sta_slits(s1hg, capsys):
    """Check which slit names appear in the ``sta()`` listing.

    With the ``s1hg`` fixture loaded, ``s1hg`` must be listed while ``s1f``
    and ``s1b`` must not appear in the output.
    """
    sta()

    captured = capsys.readouterr()

    assert "s1hg" in captured.out
    assert "s1f" not in captured.out
    assert "s1b" not in captured.out
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
163
164
165
166


def test_sta_exception(default_session, capsys):
    """Check ``sta()`` output when the ``bad`` controller has ``bad_state`` set.

    stdout must start with a status table showing ``!ERR``; stderr must
    contain a traceback header and the ``BAD STATE`` runtime error message.
    """
    bad = default_session.config.get("bad")
    bad.controller.bad_state = True
    sta()
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  --------\n"
    output += "bad     !ERR\n"

    # Only the beginning of stdout is compared.
    assert captured.out[: len(output)] == output

    errmsg = "Traceback (most recent call last):\n"
    assert errmsg in captured.err

    errmsg = "RuntimeError: Error on motor 'bad': BAD STATE"
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
182
183
184
185


def test_stm_normal(default_session, capsys):
    """Check the exact text printed by ``stm("bad")`` when ``bad`` reports its state normally."""
    bad = default_session.config.get("bad")
    # Make sure the controller is not flagged to report a bad state.
    bad.controller.bad_state = False
    stm("bad")
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  ---------------------\n"
    output += "bad     READY (Axis is READY)\n"

    assert captured.out == output


def test_stm_exception(default_session, capsys):
    """Check ``stm("bad")`` output when the controller has ``bad_state`` set.

    stdout must start with a status table showing ``!ERR``; stderr must
    contain a traceback header and the ``BAD STATE`` runtime error message.
    """
    bad = default_session.config.get("bad")
    bad.controller.bad_state = True
    stm("bad")
    captured = capsys.readouterr()

    output = "Axis    Status\n"
    output += "------  --------\n"
    output += "bad     !ERR\n"

    # Only the beginning of stdout is compared.
    assert captured.out[: len(output)] == output

    errmsg = "Traceback (most recent call last):\n"
    assert errmsg in captured.err

    errmsg = "RuntimeError: Error on motor 'bad': BAD STATE"
    assert errmsg in captured.err
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
214
215


216
217
218
def test_umv_typecheck(default_session):
    """Check how ``umv`` reacts to well-formed and malformed argument lists.

    A single (motor, position) pair is accepted. An odd number of arguments,
    swapped motor/position order and an empty call must raise
    ``RuntimeError``; passing ``calc_mot5`` must raise ``TypeError``.
    """
    m0 = default_session.config.get("m0")
    calc_mot5 = default_session.config.get("calc_mot5")

    umv(m0, 1.2)
    with pytest.raises(RuntimeError):
        umv(m0, 1, 2)
    with pytest.raises(RuntimeError):
        umv(1, m0)
    with pytest.raises(RuntimeError):
        umv()
    with pytest.raises(TypeError):
        umv(calc_mot5, 1)
229
230


231
232
233
234
def test_umv_signature(session):
    """Check the string form of the signature object attached to ``umv``."""
    expected_signature = "(*args: 'motor1, pos1, motor2, pos2, ...')"
    actual_signature = str(umv.__signature__)
    assert actual_signature == expected_signature


Benoit Formet's avatar
Benoit Formet committed
235
236
237
238
239
240
241
242
243
244
245
# Exact stdout expected by test_umv_shell after ``umv(roby, 1)``.
# Whitespace (including trailing spaces on the header line) and the
# ``\x1b[F`` escape sequences are part of the comparison: do not reformat.
OUTPUT_UMV_ROBY = """
       roby  

\x1b[Fuser    0.000
dial    0.000\x1b[Fuser    1.000
dial    1.000
"""

# Exact stdout expected by test_umv_calc_shell after ``umv(calc_mot2, 4)``.
# Whitespace (including trailing spaces on the header line) and the
# ``\x1b[F`` escape sequences are part of the comparison: do not reformat.
OUTPUT_UMV_CALC_MOT2 = """
     calc_mot2[keV]  calc_mot1[keV]       roby     

\x1b[Fuser          0.000           0.000           0.000
dial          0.000           0.000           0.000\x1b[Fuser          4.000           2.000           1.000
dial          4.000           2.000           1.000
"""


def test_umv_shell(capfd, default_session):
    """Check the exact text printed by ``umv`` when moving ``roby`` to 1."""
    roby = default_session.config.get("roby")
    umv(roby, 1)
    output = capfd.readouterr().out
    assert output == OUTPUT_UMV_ROBY


Valentin Valls's avatar
Valentin Valls committed
259
260
261
262
263
264
265
266
267
def test_umv_calc_shell(capfd, default_session):
    """Check the exact text printed by ``umv`` when moving ``calc_mot2`` to 4.

    The controllers of ``calc_mot1`` and ``calc_mot2`` are closed afterwards,
    whether or not the check succeeds.
    """
    calc_mot2 = default_session.config.get("calc_mot2")
    try:
        umv(calc_mot2, 4)
        printed = capfd.readouterr().out
        assert printed == OUTPUT_UMV_CALC_MOT2
    finally:
        calc_mot1 = default_session.config.get("calc_mot1")
        calc_mot1.controller.close()
        calc_mot2.controller.close()
Matias Guijarro's avatar
Matias Guijarro committed
268
269


270
271
272
273
274
275
def test_open_silx(xvfb):
    """Check that the process started by ``_launch_silx()`` is still running after 1 s.

    The process is always terminated at the end, including when the
    liveness check fails, so no viewer process is left behind.
    """
    process = _launch_silx()
    try:
        time.sleep(1)
        # NOTE(review): assumes ``_launch_silx`` returns a Popen-like object.
        # On ``subprocess.Popen`` the ``returncode`` attribute is only
        # refreshed by ``poll()``/``wait()``, so poll explicitly; otherwise an
        # early exit of the process would go unnoticed.
        assert process.poll() is None
    finally:
        process.terminate()
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291


def test_open_close_flint(test_session_without_flint):
    """Check that ``flint()`` yields a live process and ``close()`` ends it."""
    flint_obj = flint()
    assert flint_obj is not None

    flint_pid = flint_obj.pid
    assert psutil.pid_exists(flint_pid)

    flint_obj.close()
    # The pid must no longer exist once close() has returned.
    assert not psutil.pid_exists(flint_pid)


def test_open_kill_flint(test_session_without_flint):
    """Check that ``flint()`` yields a live process and ``kill()`` ends it.

    After ``kill()``, the test waits up to one second for the process to
    disappear before asserting that its pid no longer exists.
    """
    f = flint()
    assert f is not None
    pid = f.pid
    assert psutil.pid_exists(pid)
    f.kill()
    try:
        process = psutil.Process(pid)
    except psutil.NoSuchProcess:
        # Already gone: nothing to wait for.
        pass
    else:
        try:
            with gevent.Timeout(1):
                # A gevent timeout has to be used here.
                # See https://github.com/gevent/gevent/issues/622
                process.wait(timeout=None)
        except gevent.Timeout:
            # Fall through: the final assertion reports a surviving process.
            pass
    assert not psutil.pid_exists(pid)
306
307
308
309
310
311


def test_edit_roi_counters(
    mocker, beacon, default_session, lima_simulator, test_session_with_flint
):
    """Check ``standard.edit_roi_counters`` against a mocked plot.

    ``bliss.common.plot.plot_image`` is patched to return a mock whose
    ``select_shapes`` gives back two ROIs (``roi1`` and ``roi2``). After the
    call, ``roi1`` must be in the camera's ``roi_counters``, ``roi2`` in its
    ``roi_profiles``, and the mock's ``select_shapes`` and ``focus`` must each
    have been called exactly once.
    """
    # Mock a few functions to cover the code without flint
    roi1 = lima_rois.Roi(10, 11, 100, 101, name="roi1")
    roi2 = lima_rois.RoiProfile(20, 21, 200, 201, name="roi2", mode="vertical")
    plot_mock = mocker.Mock()
    plot_mock.select_shapes = mocker.Mock(return_value=[roi1, roi2])

    mocker.patch("bliss.common.plot.plot_image", return_value=plot_mock)

    cam = beacon.get("lima_simulator")

    # Start from a known set of pre-existing ROIs.
    cam.roi_counters.clear()
    cam.roi_profiles.clear()
    cam.roi_counters["foo1"] = 20, 20, 18, 20
    cam.roi_profiles["foo2"] = 20, 20, 18, 20, "vertical"

    standard.edit_roi_counters(cam)

    assert "roi1" in cam.roi_counters
    assert "roi2" in cam.roi_profiles
    plot_mock.select_shapes.assert_called_once()
    plot_mock.focus.assert_called_once()
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380


def test_lsmot(session, capsys, log_shell_mode):
    """Check that ``lsmot()`` prints a selection of expected motor names."""
    lsmot()
    printed = capsys.readouterr().out

    # Only a subset of the motor names is checked, by substring, so the test
    # does not depend on how the listing is formatted.
    expected_names = (
        "att1z",
        "bad",
        "custom_axis",
        "hooked_error_m0",
        "omega",
        "roby",
        "s1d",
        "hooked_m1",
    )
    for motor_name in expected_names:
        assert motor_name in printed


def test_lsobj(session, capsys, log_shell_mode):
    """Check that ``lsobj()`` prints a selection of expected object names."""
    lsobj()
    printed = capsys.readouterr().out

    # Substring checks only, in this order.
    expected_names = (
        "att1",
        "calc_mot1",
        "diode0",
        "hooked_m0",
        "m1enc",
        "s1u",
        "sim_ct_gauss",
        "sim_ct_flat_12",
        "thermo_sample",
        "transfocator_simulator",
    )
    for object_name in expected_names:
        assert object_name in printed


def test_lsconfig(session, capsys, log_shell_mode):
    """Check that ``lsconfig()`` prints a selection of expected entries."""
    lsconfig()
    printed = capsys.readouterr().out

    # Substring checks only, in this order.
    expected_fragments = (
        "Motor:",
        "v6biturbo",
        "wrong_counter",
        "dummy1",
        "xrfxrdMG",
        "working_ctrl",
        "machinfo",
        "times2_2d",
        "xia1",
    )
    for fragment in expected_fragments:
        assert fragment in printed