standard.py 43.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2020 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Standard functions provided to the BLISS shell.
"""

import contextlib
import inspect
import itertools
import linecache
15
import logging
16
import numpy
17
18
import os
import shutil
19
import socket
20
21
22
23
24
25
26
import subprocess
import sys
import time
import typeguard
import typing

from pprint import pprint  # noqa (pprint is exported in shell from this module)
27
from gevent import sleep, Timeout
28
29
30
31
32
33
34

from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import TerminalFormatter

from prompt_toolkit import print_formatted_text, HTML

35
import bliss
36
37
from bliss import global_map, global_log, setup_globals, current_session
from bliss.common import timedisplay
38
39
from bliss.common.plot import plot  # noqa
from bliss.common.standard import (  # noqa
40
41
42
43
44
45
46
47
48
    iter_counters,
    iter_axes_state,
    iter_axes_state_all,
    iter_axes_position,
    iter_axes_position_all,
    sync,
    info,
    __move,
    reset_equipment,
49
)
50
from bliss.common.standard import wid as std_wid
51
from bliss.common.event import connect
52
53
54
55
56
from bliss.controllers.lima.limatools import *
from bliss.controllers.lima import limatools
from bliss.controllers.lima import roi as lima_roi
from bliss.common.protocols import CounterContainer
from bliss.common import measurementgroup
57
58
from bliss.common.soft_axis import SoftAxis  # noqa
from bliss.common.counter import SoftCounter, Counter  # noqa
59
60
61
62
63
64
65
66
from bliss.common.utils import (
    ShellStr,
    typecheck_var_args_pattern,
    modify_annotations,
    custom_error_msg,
    shorten_signature,
    TypeguardTypeError,
)
67
68
from bliss.config.conductor.client import get_redis_proxy

69
70
71
72
73
from bliss.common.measurementgroup import MeasurementGroup
from bliss.shell.dialog.helpers import find_dialog, dialog as dialog_dec_cls


# objects given to Bliss shell user
74
75
from bliss.common.standard import mv, mvr, mvd, mvdr, move, rockit  # noqa
from bliss.common.cleanup import cleanup, error_cleanup  # noqa
76
77
78
79

from bliss.common import scans
from bliss.common.scans import *
from bliss.scanning.scan import Scan
80
from bliss.comm.rpc import Client
81
from bliss.common import logtools
82
from bliss.common.logtools import elog_print, user_print
83
84
85
86
87
88
89
90
91
92
93
from bliss.common.interlocks import interlock_state
from bliss.common.session import get_current_session
from bliss.data import lima_image

from bliss.scanning.scan_tools import (
    cen,
    goto_cen,
    com,
    goto_com,
    peak,
    goto_peak,
Jibril Mammeri's avatar
Jibril Mammeri committed
94
95
    trough,
    goto_min,
96
97
98
99
100
101
102
103
104
    where,
    find_position,
    goto_custom,
    fwhm,  # noqa: F401
)
from bliss.common.plot import meshselect  # noqa: F401
from bliss.common import plot as plot_module
from bliss.shell.cli import user_dialog, pt_widgets

Benoit Formet's avatar
Benoit Formet committed
105
import tabulate
106

107
from bliss.common.utils import typeguardTypeError_to_hint, chunk_list
108
109
from typing import Optional, Union
from bliss.controllers.lima.lima_base import Lima
110
from bliss.common.protocols import Scannable
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from bliss.common.types import (
    _countable,
    _scannable_or_name,
    _float,
    _providing_channel,
)

from bliss.common.profiling import time_profile

############## imports that are only here to give the
############## shell user access to these functions

# hint: don't forget to add to __all__ as well
from numpy import (
    sin,
    cos,
    tan,
    arcsin,
    arccos,
    arctan,
    arctan2,
    log,
    log10,
    sqrt,
    exp,
    power,
    deg2rad,
    rad2deg,
)
from numpy.random import rand
from time import asctime as date

# Names exported to the BLISS shell user: functions of this module, plus the
# public names of the scans and limatools modules and a few numpy helpers.
__all__ = (
    [
        "wa",
        "wm",
        "sta",
        "stm",
        "mv",
        "umv",
        "mvr",
        "umvr",
        "mvd",
        "umvd",
        "mvdr",
        "umvdr",
        "rockit",
        "move",
        "lsmot",
        "lsconfig",
        "plotinit",
        "plotselect",
        "flint",
        "prdef",
        "sync",
        "lslog",
        "lsdebug",
        "debugon",
        "debugoff",
        "interlock_show",
        "interlock_state",
        "info",
        "bench",
        "clear",
        "newproposal",
        "endproposal",
        "newsample",
        "newcollection",
        "newdataset",
        "enddataset",
        "silx_view",
        "pymca",
        "cen",
        "goto_cen",
        "peak",
        "goto_peak",
        "trough",
        "goto_min",
        "com",
        "goto_com",
        "where",
        "fwhm",
        "menu",
        "pprint",
        "find_position",
        "goto_custom",
        "time_profile",
        "tw",
    ]
    + scans.__all__
    + ["lprint", "ladd", "elog_print", "elog_add"]
    + [
        "cleanup",
        "error_cleanup",
        "plot",
        "lscnt",
        "lsmg",
        "lsobj",
        "wid",
        "reset_equipment",
    ]
    + ["SoftAxis", "SoftCounter", "edit_roi_counters", "edit_mg"]
    + list(limatools.__all__)
    + [
        "sin",
        "cos",
        "tan",
        "arcsin",
        "arccos",
        "arctan",
        "arctan2",
        "log",
        "log10",
        "sqrt",
        "exp",
        "power",
        "deg2rad",
        "rad2deg",
        "rand",
        "sleep",
        "date",
    ]
)

# Module-wide tabulate setting: keep leading/trailing whitespace of cells
# (e.g. the padded "      *" alias marker printed by lscnt).
tabulate.PRESERVE_WHITESPACE = True

# Marker compared with str(position) / str(state) to detect a failed reading.
_ERR = "!ERR"
# Default maximum number of columns of one printed table (see wa and wm).
_MAX_COLS = 9
# Placeholder displayed by wm when an axis limit is not available.
_MISSING_VAL = "-----"
# Default float format given to tabulate by _tabulate.
_FLOAT_FORMAT = ".05f"


def _print_errors_with_traceback(errors, device_type="motor"):
    """
    Report caught errors through ``sys.excepthook`` with their original traceback

    Nothing is propagated to the caller: each error is re-raised locally,
    only to add the device label to its message, and is then handed over to
    ``sys.excepthook``.

    Args:
        errors: iterable of ``(label, error)`` pairs; ``error`` must expose an
                ``exc_info`` attribute that unpacks to
                ``(exc_type, exc_val, exc_tb)``
        device_type: word used in the message to qualify ``label``
    """
    for label, error_with_traceback_obj in errors:
        exc_type, exc_val, exc_tb = error_with_traceback_obj.exc_info
        try:
            # we re-raise in order to pass the device label to the error msg
            # else calling sys.excepthook(*sys.exc_info()) would be fine
            raise exc_type(
                f"Error on {device_type} '{label}': {str(exc_val)}"
            ).with_traceback(exc_tb)
        except Exception:
            sys.excepthook(*sys.exc_info())


def _tabulate(data, **kwargs):
    """
    Render `data` as a text table with this module's default layout

    Unless overridden through `kwargs`, the first row is used as headers,
    floats are formatted with `_FLOAT_FORMAT` and numbers are right-aligned.

    Args:
        data: rows to display, forwarded to `tabulate.tabulate`
        **kwargs: keyword arguments forwarded to `tabulate.tabulate`

    Return:
        str: the formatted table
    """
    kwargs.setdefault("headers", "firstrow")
    kwargs.setdefault("floatfmt", _FLOAT_FORMAT)
    kwargs.setdefault("numalign", "right")

    return str(tabulate.tabulate(data, **kwargs))
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337


def __row_positions(positions, motors, fmt, sep=" "):
    """
    Format the position of each motor into a single text row

    Args:
        positions: mapping giving a position for each item of `motors`
        motors: keys looked up in `positions`, in display order
        fmt: format spec applied to every position
        sep: string inserted between two formatted positions

    Return:
        str: the formatted positions joined with `sep`
    """
    return sep.join(format(positions[motor], fmt) for motor in motors)


def __row(cols, fmt, sep=" "):
    """Format every item of `cols` with `fmt` and join the results with `sep`"""
    formatted = (format(value, fmt) for value in cols)
    return sep.join(formatted)


def lslog(glob: str = None, debug_only: bool = False):
    """
    Search for loggers and print their name and effective level.

    It uses a pattern matching normally used by shells.
    Common operators are `*` for any number of characters
    and `?` for one character of any type.

    Args:
        glob: a logger name with optional glob matching; when None, the
              loggers matching `bliss*`, `flint*` and `global*` are listed
        debug_only: True to display only loggers at debug level
                    (equivalent to lsdebug)

    Examples:

    >>> lslog()  # prints all loggers

    >>> lslog('*motor?')  # prints loggers that finish with 'motor' + 1 char
                          # like motor1, motor2, motork

    >>> lslog('*Socket*')  # prints loggers that contains 'Socket'

    """
    if glob is not None:
        found = global_log._find_loggers(glob)
    else:
        found = {}
        for default_pattern in ("bliss*", "flint*", "global*"):
            found.update(global_log._find_loggers(default_pattern))

    # width of the name column: longest logger name (0 when nothing matched)
    name_width = max((len(logger_name) for logger_name in found), default=0)
    line_fmt = "{0:{width}} {1:8}"
    header_printed = False

    for logger_name in sorted(found):
        current = found[logger_name]
        try:
            is_debug = current.getEffectiveLevel() == logging.DEBUG
        except AttributeError:
            is_debug = False
        if debug_only and not is_debug:
            continue
        if not header_printed:
            # the header is only printed once there is something to list
            header_printed = True
            print("\n" + line_fmt.format("logger name", "level", width=name_width))
            print(line_fmt.format("=" * name_width, 8 * "=", width=name_width))
        level_name = logging.getLevelName(current.getEffectiveLevel())
        if current.disabled:
            level_name = "%s [DISABLED]" % level_name
        print(line_fmt.format(logger_name, level_name, width=name_width))

    if header_printed:
        print("")
    else:
        print("No loggers found.\n")


def lsdebug(glob: str = None, debug_only=False) -> None:
    """
    Display current Loggers at DEBUG level

    Args:
        glob: a logger name with optional glob matching, forwarded to lslog
        debug_only: not used by this function: only loggers at DEBUG level
                    are listed, whatever its value
    """
    lslog(glob=glob, debug_only=True)


def debugon(glob_logger_pattern_or_obj) -> None:
    """
    Activate debug-level logging for a specific logger or an object

    One line is printed for each name returned by `global_log.debugon`, or a
    message telling that no logger was found.

    Args:
        glob_logger_pattern_or_obj: glob style pattern matching for logger name, or instance

    Hints on glob: pattern matching normally used by shells
                   common operators are * for any number of characters
                   and ? for one character of any type

    Return:
        None

    Examples:
        >>> debugon('*motorsrv')
        >>> debugon('*rob?')
    """
    enabled = global_log.debugon(glob_logger_pattern_or_obj)
    if not enabled:
        print(f"NO loggers found for [{glob_logger_pattern_or_obj}]")
        return
    for logger_name in enabled:
        print(f"Setting {logger_name} to show debug messages")


def debugoff(glob_logger_pattern_or_obj):
    """
    Deactivate debug-level logging for a specific logger or an object

    One line is printed for each name returned by `global_log.debugoff`, or a
    message telling that no logger was found.

    Args:
        glob_logger_pattern_or_obj: glob style pattern matching for logger name, or instance
    """
    disabled = global_log.debugoff(glob_logger_pattern_or_obj)
    if not disabled:
        print(f"NO loggers found for [{glob_logger_pattern_or_obj}]")
        return
    for logger_name in disabled:
        print(f"Setting {logger_name} to hide debug messages")


@typeguard.typechecked
def lscnt(counter_container: typing.Union[CounterContainer, Counter, None] = None):
    """
    Display the list of all counters, sorted alphabetically

    Args:
        counter_container: a counter container (its `counters` are listed),
                           a single counter, or None; None is forwarded
                           as-is to `iter_counters`
    """
    if counter_container is None:
        counters = None
    elif isinstance(counter_container, CounterContainer):
        counters = counter_container.counters
    else:
        # must be Counter
        counters = [counter_container]

    table_info = []
    for counter_name, shape, prefix, name, alias in sorted(iter_counters(counters)):
        if alias:
            # aliased counters are flagged with a star in the "Alias" column
            alias = "      *"
        table_info.append((counter_name, shape, prefix, alias, name))

    print("")
    print(
        tabulate.tabulate(
            table_info, headers=["Fullname", "Shape", "Controller", "Alias", "Name"]
        )
    )


def _lsmg():
    """
    Return the list of measurement groups

    Indicate the current active one with a star char: '*'

    Return:
        str: one line per measurement group name, each ended by a newline
    """
    active_mg_name = measurementgroup.get_active_name()

    return "".join(
        f" * {mg_name}\n" if mg_name == active_mg_name else f"   {mg_name}\n"
        for mg_name in measurementgroup.get_all_names()
    )


def lsmg():
    """
    Print the list of measurement groups

    Indicate the current active one with a star char: '*'
    """
    print(_lsmg())


def lsobj(pattern=None):
    """
    Print the list of BLISS objects in current session matching the
    <pattern> string.
    <pattern> can contain wildcard characters like '*' or '?'.
    NB: print also badly initialized objects...
    """
    # all names are printed on a single line, separated by two spaces
    for obj_name in bliss.common.standard._lsobj(pattern):
        print(obj_name, end="  ")

    print("")


def wid():
    """
    Print the list of undulators defined in the session
    and their positions.
    Print all axes of the ID device server.
    """
    print(std_wid())


@typeguard.typechecked
def stm(*axes: _scannable_or_name, read_hw: bool = False):
    """
    Display state information of the given axes

    The states are printed as a two-column table (Axis, Status); states
    displayed as "!ERR" are then reported with their traceback.

    Args:
        axes (~bliss.common.axis.Axis): motor axes (objects or names)

    Keyword Args:
        read_hw (bool): If True, force communication with hardware, otherwise
                        (default) use cached value.
    """
    rows = [
        (label, state) for label, state in iter_axes_state(*axes, read_hw=read_hw)
    ]

    print(_tabulate([("Axis", "Status"), *rows]))

    failed = [(label, state) for label, state in rows if str(state) == _ERR]
    _print_errors_with_traceback(failed, device_type="motor")


@typeguard.typechecked
def sta(read_hw: bool = False):
    """
    Display state information about all axes

    Calls `stm` with every axis given by `global_map.get_axes_iter()` and
    returns its result.

    Keyword Args:
        read_hw (bool): If True, force communication with hardware, otherwise
                        (default) use cached value.
    """
    all_axes = list(global_map.get_axes_iter())
    return stm(*all_axes, read_hw=read_hw)


# NOTE(review): same names and values as the constants assigned earlier in
# this module; these re-assignments look redundant - confirm before removing.
_ERR = "!ERR"
_MAX_COLS = 9
_MISSING_VAL = "-----"


501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
def tw(*motors):
    """
    Display a user interface to move selected motors. (Limited to 5 motors)

    If the address of an already registered Tweak UI can be read from redis,
    its `close_new` attribute is set to True (presumably asking it to close
    - to be confirmed). A new `bliss.shell.qtapp.tweak_ui` process is then
    started for the current session and its "ct_requested" event is
    connected to `_tw_ct_requested`.

    Args:
        motors (~bliss.common.axis.Axis): motor axis

    Raises:
        TypeError: if more than 5 motors are given
        TimeoutError: if the start-up sequence takes more than 10 seconds

    example:
      DEMO [18]: tw(m0, m1, m2)
    """
    import gevent

    def get_url(timeout=None):
        # Read the Tweak RPC server address registered in redis for this session.
        key = "tweak_ui_" + current_session.name
        redis = get_redis_proxy()

        if timeout is None:
            value = redis.lpop(key)
        else:
            result = redis.blpop(key, timeout=timeout)
            if result is not None:
                key, value = result
                # blpop removed the entry: push it back on the list
                redis.lpush(key, value)
            else:
                value = None

        if value is None:
            raise ValueError("Tweak UI: cannot retrieve Tweak RPC server address")
        url = value.decode().split()[-1]
        return url

    def wait_tweak(tweak):
        # Poll the RPC client until reading `loaded` stops raising socket errors.
        while True:
            try:
                tweak.loaded
                break
            except socket.error:
                pass
            gevent.sleep(0.3)

    def create_env():
        # Environment of the UI process: current one plus the beacon address.
        from bliss.config.conductor.client import get_default_connection

        beacon = get_default_connection()
        beacon_config = f"{beacon._host}:{beacon._port}"

        env = dict(os.environ)
        env["BEACON_HOST"] = beacon_config
        return env

    if len(motors) > 5:
        raise TypeError("This tool can only display a maximum of 5 motors")

    # stays None until the UI process is started, so that a timeout occurring
    # before that point does not fail on an unbound name
    process = None
    try:
        with Timeout(10):
            try:
                url = get_url()
            except ValueError:
                # no Tweak UI registered yet
                pass
            else:
                tweak = Client(url)
                try:
                    tweak.close_new = True
                except socket.error:
                    pass

            # drop the reference to the client of the previous UI
            tweak = None
            # argument list (no string splitting): values containing spaces
            # are passed unchanged
            args = [
                sys.executable,
                "-m",
                "bliss.shell.qtapp.tweak_ui",
                "--session",
                current_session.name,
                "--motors",
            ]
            args.extend(motor.name for motor in motors)

            process = subprocess.Popen(args, env=create_env())

            try:
                url = get_url(timeout=10)
                tweak = Client(url)
                wait_tweak(tweak)
                connect(tweak, "ct_requested", _tw_ct_requested)
                print("Tweak UI started")
            except Exception:
                process.kill()
                print("Tweak UI launch has failed, please try again")

    except Timeout:
        if process is not None:
            process.kill()
        raise TimeoutError("The application took too long to start")


def _tw_ct_requested(acq_time, sender):
    """
    Handler connected by `tw` to the "ct_requested" event of the Tweak UI

    Calls `ct` with `acq_time` and the title "auto_ct"; `sender` is not used.
    """
    ct(acq_time, title="auto_ct")


596
597
def wa(**kwargs):
    """
    Display all positions (Where All) in both user and dial units

    Positions are printed as tables of at most `max_cols` axes. Axes whose
    user or dial position is displayed as "!ERR" are reported once, with
    their traceback, before the tables.

    Keyword Args:
        max_cols: maximum number of axes per table (default: `_MAX_COLS`)

    All keyword arguments (`max_cols` included) are forwarded to
    `iter_axes_position_all`.
    """
    print("Current Positions: user")
    print("                   dial")

    max_cols = kwargs.get("max_cols", _MAX_COLS)

    header, pos, dial = [], [], []
    tables = [(header, pos, dial)]
    errors = []

    data = iter_axes_position_all(**kwargs)
    for axis_name, axis_unit, position, dial_position in data:
        if len(header) == max_cols:
            # current table is full: continue in a new one
            header, pos, dial = [], [], []
            tables.append((header, pos, dial))

        axis_label = axis_name
        if axis_unit:
            axis_label += "[{0}]".format(axis_unit)

        header.append(axis_label)
        pos.append(position)
        dial.append(dial_position)

        if _ERR in [str(position), str(dial_position)]:
            errors.append((axis_label, dial_position))

    # report once all axes have been read: `errors` is cumulative, so doing
    # this inside the loop would print the same error several times
    _print_errors_with_traceback(errors, device_type="motor")

    for table in tables:
        print("")
        print(_tabulate(table))


def lsmot():
    """
    Display names of motors configured in current session.

    Names are sorted case-insensitively and printed as a plain table whose
    number of columns is derived from the terminal width.
    """
    motor_list = bliss.common.standard._lsmot()
    display_width = shutil.get_terminal_size().columns

    if not motor_list:
        print("No motor found in current session's config.")
        return

    # Maximal length of objects names.
    longest = max(len(motor_name) for motor_name in motor_list)

    # Number of items displayable on one line.
    per_line = int(display_width / longest) + 1

    motor_list.sort(key=str.casefold)

    user_print("Motors configured in current session:")
    user_print("-------------------------------------")
    rows = chunk_list(motor_list, per_line)
    print(tabulate.tabulate(rows, tablefmt="plain"))
    user_print("\n")


from bliss.config import static


def lsconfig():
    """
    Print all objects found in config.
    Not only objects declared in current session's config.

    Object names are grouped by the "class" entry of their configuration;
    objects without class whose plugin is "emotion" are listed as "Motor".
    """
    config = static.get_config()

    display_width = shutil.get_terminal_size().columns

    print()

    # class name (None when not specified) -> names of the objects
    obj_dict = dict()
    for name in config.names_list:
        obj_config = config.get_config(name)
        c = obj_config.get("class")
        if c is None and obj_config.plugin == "emotion":
            c = "Motor"
        obj_dict.setdefault(c, []).append(name)

    # For each class
    for cc, names in obj_dict.items():
        user_print(f"{cc}: ")
        if cc is None:
            user_print("----")
        else:
            user_print("-" * len(cc))

        # names are displayed in reverse order of `config.names_list`
        obj_list = names[::-1]

        max_length = max(len(x) for x in obj_list)

        # Number of items displayable on one line.
        item_count = int(display_width / max_length) + 1

        print(tabulate.tabulate(chunk_list(obj_list, item_count), tablefmt="plain"))
        user_print()
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743


@custom_error_msg(
    TypeguardTypeError,
    "intended usage: wm(axis1, axis2, ... ) Hint:",
    new_exception_type=RuntimeError,
    display_original_msg=True,
)
# NOTE(review): ("kwargs") is a plain string, not a 1-tuple - confirm this is
# what shorten_signature expects.
@shorten_signature(annotations={"axes": "axis1, axis2, ... "}, hidden_kwargs=("kwargs"))
@typeguard.typechecked
def wm(*axes: _scannable_or_name, **kwargs):
    """
    Display information (position - user and dial, limits) of the given axes

    Axes are displayed in tables of at most `max_cols` columns (row labels
    included). Axes whose user or dial position is displayed as the error
    marker are reported, with their traceback, before the tables.

    Args:
        axis: A motor axis

    Keyword Args:
        max_cols: maximum number of columns per table (default: `_MAX_COLS`)
        err: marker identifying a failed reading (default: `_ERR`)

    All keyword arguments are also forwarded to `iter_axes_position`.

    Example:

    >>> wm(m2, m1, m3)

    .. code-block::

                            m2      m1[mm]       m3
          --------  ----------  ----------  -------
          User
           High     -123.00000   128.00000      inf
           Current   -12.00000     7.00000  3.00000
           Low       456.00000  -451.00000     -inf
          Offset       0.00000     3.00000  0.00000
          Dial
           High      123.00000   123.00000      inf
           Current    12.00000     2.00000  3.00000
           Low      -456.00000  -456.00000     -inf
    """
    if not axes:
        print(
            "wm() needs at least one axis name/object as parameter.\n"
            "example: wm(mot1)\n"
            "         wm(mot1, mot2, ... motN)"
        )
        return

    max_cols = kwargs.get("max_cols", _MAX_COLS)
    err = kwargs.get("err", _ERR)

    def new_table():
        # Rows of one printed table; the first item of each row is its label.
        # "~" stands for a leading space and is replaced once the table is
        # rendered. The header label is an empty string so that every table,
        # continuation tables included, gets a blank first header cell.
        return (
            [""],  # header: axis labels
            ["User"],
            ["~High"],
            ["~Current"],
            ["~Low"],
            ["Offset"],
            [""],  # spacer between the user and dial sections
            ["Dial"],
            ["~High"],
            ["~Current"],
            ["~Low"],
        )

    errors = []
    table = new_table()
    tables = [table]

    for axis in iter_axes_position(*axes, **kwargs):
        if len(table[0]) == max_cols:
            # current table is full: continue in a new one
            table = new_table()
            tables.append(table)
        (
            header,
            user_title,
            high_user,
            user,
            low_user,
            offset,
            _spacer,
            dial_title,
            high_dial,
            dial,
            low_dial,
        ) = table

        axis_label = axis.axis_name
        if axis.unit:
            axis_label += "[{0}]".format(axis.unit)
        header.append(axis_label)

        # user and dial limits are both shown as missing when the user limit
        # is not available
        user_high_limit, dial_high_limit = (
            (axis.user_high_limit, axis.dial_high_limit)
            if axis.user_high_limit not in (None, err)
            else (_MISSING_VAL, _MISSING_VAL)
        )
        user_low_limit, dial_low_limit = (
            (axis.user_low_limit, axis.dial_low_limit)
            if axis.user_low_limit not in (None, err)
            else (_MISSING_VAL, _MISSING_VAL)
        )
        position = axis.user_position
        dial_position = axis.dial_position

        # section title rows have no value for the axis
        user_title.append(None)
        high_user.append(user_high_limit)
        user.append(position)
        low_user.append(user_low_limit)
        offset.append(axis.offset)
        dial_title.append(None)
        high_dial.append(dial_high_limit)
        dial.append(dial_position)
        low_dial.append(dial_low_limit)

        if err in [str(position), str(dial_position)]:
            errors.append((axis_label, dial_position))

    _print_errors_with_traceback(errors, device_type="motor")

    for table in tables:
        print("")
        print(_tabulate(table).replace("~", " "))


@custom_error_msg(
    TypeguardTypeError,
    "intended usage: umv(motor1, target_position_1, motor2, target_position_2, ... )",
    new_exception_type=RuntimeError,
    display_original_msg=False,
)
@modify_annotations({"args": "motor1, pos1, motor2, pos2, ..."})
@typecheck_var_args_pattern([Scannable, _float])
def umv(*args):
    """
    Move given axes to given absolute positions providing updated display of
    the motor(s) position(s) while it(they) is(are) moving.

    Arguments are interleaved axis and respective absolute target position.
    """
    __umove(*args)


@custom_error_msg(
    TypeguardTypeError,
    "intended usage: umvr(motor1, relative_displacement_1, motor2, relative_displacement_2, ... )",
    new_exception_type=RuntimeError,
    display_original_msg=False,
)
@modify_annotations({"args": "motor1, rel. pos1, motor2, rel. pos2, ..."})
@typecheck_var_args_pattern([Scannable, _float])
def umvr(*args):
    """
    Move given axes to given relative positions providing updated display of
    the motor(s) position(s) while it(they) is(are) moving.

    Arguments are interleaved axis and respective relative target position.
    """
    __umove(*args, relative=True)


@custom_error_msg(
    TypeguardTypeError,
    "intended usage: umvd(motor1, target_position_1, motor2, target_position_2, ... )",
    new_exception_type=RuntimeError,
    display_original_msg=False,
)
@modify_annotations({"args": "motor1, pos1, motor2, pos2, ..."})
@typecheck_var_args_pattern([Scannable, _float])
def umvd(*args):
    """
    Move given axes to given absolute dial positions providing updated display of
    the motor(s) user position(s) while it(they) is(are) moving.

    Arguments are interleaved axis and respective absolute target position.
    """
    __umove(*args, dial=True)


@custom_error_msg(
    TypeguardTypeError,
    "intended usage: umvdr(motor1, relative_displacement_1, motor2, relative_displacement_2, ... )",
    new_exception_type=RuntimeError,
    display_original_msg=False,
)
@modify_annotations({"args": "motor1, rel. pos1, motor2, rel. pos2, ..."})
@typecheck_var_args_pattern([Scannable, _float])
def umvdr(*args):
    """
    Move given axes to given relative dial positions providing updated display of
    the motor(s) user position(s) while it(they) is(are) moving.

    Arguments are interleaved axis and respective relative target position.
    """
    __umove(*args, relative=True, dial=True)


def __umove(*args, **kwargs):
    """
    Start a move through `__move` and display the positions while it runs

    The move is started without waiting (`wait` is forced to False); user
    and dial positions are then printed every 0.1 s, overwriting the
    previous display, until the group stops moving, and a last time for the
    final positions. `group.stop` is registered with `error_cleanup` for the
    duration of the display.

    Args:
        *args, **kwargs: forwarded to `__move`

    Return:
        tuple: `(group, motor_pos)` as returned by `__move`
    """
    kwargs["wait"] = False
    group, motor_pos = __move(*args, **kwargs)
    motors = list(group.axes_with_reals.values())

    with error_cleanup(group.stop):
        motor_names = list()
        for axis in motors:
            if axis.unit:
                motor_names.append(
                    "{}[{}]".format(global_map.alias_or_name(axis), axis.unit)
                )
            else:
                motor_names.append(global_map.alias_or_name(axis))
        # columns are at least 8 characters wide (also when there is no motor)
        col_len = max(max(map(len, motor_names), default=0), 8)
        hfmt = "^{width}".format(width=col_len)
        rfmt = ">{width}.03f".format(width=col_len)
        print("")
        first_row = __row(motor_names, hfmt, sep="  ")
        row_len = len(first_row)
        # shift the header by the width of the "user " / "dial " row prefix
        print(first_row.rjust(row_len + 5))
        print("")
        # escape sequence written once per newline of the row, before the
        # row itself, so that the display is overwritten in place
        magic_char = "\033[F"

        def print_positions():
            # Print the current user and dial positions over the previous ones.
            positions = group.position_with_reals
            dials = group.dial_with_reals
            row = "".join(
                [
                    "user ",
                    __row_positions(positions, motors, rfmt, sep="  "),
                    "\ndial ",
                    __row_positions(dials, motors, rfmt, sep="  "),
                ]
            )
            ret_depth = magic_char * row.count("\n")
            print("{}{}".format(ret_depth, row), end="", flush=True)

        while group.is_moving:
            print_positions()
            sleep(0.1)
        # print last time for final positions
        print_positions()
        print("")

    return group, motor_pos


def __pyhighlight(code, bg="dark", outfile=None):
    """
    Highlight Python source `code` with pygments' terminal formatter

    Args:
        code: Python source text
        bg: background setting given to `TerminalFormatter`
        outfile: forwarded to pygments `highlight`

    Return:
        the result of pygments `highlight`
    """
    return highlight(
        code, PythonLexer(), TerminalFormatter(bg=bg), outfile=outfile
    )


def prdef(obj_or_name):
    """
978
    Show the text of the source code for an object or the name of an object.
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
    """
    is_arg_str = isinstance(obj_or_name, str)
    if is_arg_str:
        obj, name = getattr(setup_globals, obj_or_name), obj_or_name
    else:
        obj = obj_or_name
        name = None
    try:
        real_name = obj.__name__
    except AttributeError:
        real_name = str(obj)
    if name is None:
        name = real_name

    if (
        inspect.ismodule(obj)
        or inspect.isclass(obj)
        or inspect.istraceback(obj)
        or inspect.isframe(obj)
        or inspect.iscode(obj)
    ):
        pass