repl.py 26.7 KB
Newer Older
Benoit Formet's avatar
Benoit Formet committed
1
# -*- coding: utf-8 -*-
2
3
4
#
# This file is part of the bliss project
#
5
# Copyright (c) 2015-2022 Beamline Control Unit, ESRF
6
7
# Distributed under the GNU LGPLv3. See LICENSE for more info.

8
"""Bliss REPL (Read Eval Print Loop)"""
9

10
import asyncio
11
from typing import Optional
12
13
from prompt_toolkit import print_formatted_text
from prompt_toolkit.formatted_text import FormattedText
14
15
16
17
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from pygments.styles import get_style_by_name
from pygments.util import ClassNotFound

18
import re
19
20
import os
import sys
21
import types
22
import socket
23
import functools
24
import traceback
25
import gevent
26
import signal
27
import logging
28
import platform
29
import importlib
Valentin Valls's avatar
Valentin Valls committed
30
from collections import deque
31
from datetime import datetime
32

33

Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
34
import ptpython.layout
35
from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context
36
from prompt_toolkit.output import DummyOutput
37

Benoit Formet's avatar
Benoit Formet committed
38
# imports needed to have control over _execute of ptpython
39
from prompt_toolkit.keys import Keys
40
from prompt_toolkit.utils import is_windows
41
42
43
from prompt_toolkit.filters import has_focus
from prompt_toolkit.enums import DEFAULT_BUFFER

44
from .. import log_utils
45
from bliss.shell.data.display import ScanDisplayDispatcher
46
47
48
from bliss.shell.cli.prompt import BlissPrompt
from bliss.shell.cli.typing_helper import TypingHelper
from bliss.shell.cli.ptpython_statusbar_patch import NEWstatus_bar, TMUXstatus_bar
49
50
51
52
from bliss.shell.bliss_banners import print_rainbow_banner
from bliss.shell.cli.protected_dict import ProtectedDict
from bliss.shell.cli.no_thread_repl import NoThreadPythonRepl
from bliss.shell import standard
53

Valentin Valls's avatar
Valentin Valls committed
54
from bliss import set_bliss_shell_mode
55
from bliss.common.utils import Singleton
56
from bliss.common import constants
57
58
from bliss.common import session as session_mdl
from bliss.common.session import DefaultSession
59
60
from bliss import release, current_session
from bliss.config import static
61
from bliss.config.conductor.client import get_default_connection
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
62
from bliss.shell.standard import info
63
from bliss.common.logtools import userlogger, elogbook
64
from bliss.common.protocols import ErrorReportInterface
65

66
logger = logging.getLogger(__name__)
67

68
if is_windows():
69

70
71
72
73
74
75
    class Terminal:
        def __getattr__(self, prop):
            if prop.startswith("__"):
                raise AttributeError(prop)
            return ""

76

77
78
79
80
81
82
83
84
else:
    from blessings import Terminal


session_mdl.set_current_session = functools.partial(
    session_mdl.set_current_session, force=False
)

85

86
# =================== ERROR REPORTING ============================
87
88


89
90
91
92
93
94
95
96
class PrettyTraceback:
    _blacklist = [
        # gevent & aiogevent module paths
        os.path.dirname(importlib.util.find_spec("gevent").origin),
        os.path.dirname(importlib.util.find_spec("aiogevent").origin),
        # gevent compiled functions root path
        "src/gevent",
    ]
97

98
99
100
101
    def __init__(self, exc_type, exc_value, tb):
        self._timestamp = datetime.now()
        self._exc_type = exc_type
        self._exc_value = exc_value
102

103
104
105
106
        # convert traceback to StackSummary to stringify references and avoid memory leak
        self._stack_summary = traceback.StackSummary.extract(
            traceback.walk_tb(tb), lookup_lines=True, capture_locals=True
        )
107

108
109
110
111
112
113
    def _is_blacklisted(self, filename):
        for black_path in PrettyTraceback._blacklist:
            if filename.startswith(black_path):
                return True
        return False

114
115
116
117
118
119
120
    def print_formatted(self, disable_blacklist=False):
        header = [
            (
                "class:pygments.generic.heading",
                self._timestamp.strftime("--- %d/%m/%Y %H:%M:%S ---\n"),
            )
        ]
121
122

        # Stack trace formatting
123
        stack = []
124
        for f in self._stack_summary:
125
            if not disable_blacklist and self._is_blacklisted(f.filename):
126
127
                continue

128
            # skip bottom calls from ptpython
129
            if f.filename == "<stdin>":
130
                stack.clear()
131

132
133
134
135
136
137
            stack += [
                ("class:pygments.literal", f.filename),
                ("class:pygments.punctuation", ":"),
                ("class:pygments.literal.number", str(f.lineno) + " "),
                ("class:pygments.name.function", f.name + "\n"),
            ]
138
139

            if f._line:
140
141
142
143
                stack += [
                    ("class:pygments.generic.error", "  > "),
                    ("class:pygments.text", f._line + "\n\n"),
                ]
144
145
146
147
                # locals are not printed when f._line is empty, this happens with traces coming from
                # the interpreter or compiled files and locals dict is full of obscure symbols.
                if f.locals is not None:
                    for key, val in f.locals.items():
148
149
150
151
152
153
                        stack += [
                            ("class:pygments.name.attribute", key),
                            ("class:pygments.punctuation", ": "),
                            ("class:pygments.text", val + "\n"),
                        ]
                    stack += [("class:pygments.text", "\n")]
154
            else:
155
                stack += [("class:pygments.text", "\n")]
156
157

        # Exception formatting
158
159
160
161
        footer = []
        footer.append(
            ("class:pygments.generic.strong", self._exc_type.__name__ + ":\n")
        )
162
        for arg in self._exc_value.args:
163
            footer.append(("class:pygments.name.exception", str(arg) + "\n"))
164

165
166
167
168
        fmt_text = FormattedText(header + stack + footer)
        print_formatted_text(
            fmt_text, style=BlissRepl()._current_style, file=sys.stderr
        )
169
170


171
class ErrorReport(ErrorReportInterface):
Benoit Formet's avatar
Benoit Formet committed
172
    """
173
    Manage the behavior of the error reporting in the shell.
Benoit Formet's avatar
Benoit Formet committed
174

175
176
    - ErrorReport.expert_mode = False (default) => prints a user friendly error message without traceback
    - ErrorReport.expert_mode = True            => prints the full error message with traceback
Benoit Formet's avatar
Benoit Formet committed
177

178
179
180
181
    - ErrorReport.last_error stores the last error traceback

    """

182
183
    _orig_sys_excepthook = sys.excepthook
    _orig_gevent_print_exception = gevent.hub.Hub.print_exception
184

185
    def __init__(self):
186
        self._expert_mode = False
187
        self._history = deque(maxlen=100)
188
189

    @property
190
191
    def history(self):
        return self._history
192
193
194
195
196
197
198
199
200
201

    @property
    def expert_mode(self):
        return self._expert_mode

    @expert_mode.setter
    def expert_mode(self, enable):
        self._expert_mode = bool(enable)


202
203
204
205
def install_excepthook():
    """Patch the system exception hook,
    and the print exception for gevent greenlet
    """
206
    error_report = ErrorReport()
207

208
    exc_logger = logging.getLogger("exceptions")
209

210
    def repl_excepthook(exc_type, exc_value, tb, _with_elogbook=True):
211
212
213
        if exc_value is None:
            # filter exceptions from aiogevent(?) with no traceback, no value
            return
214

215
        error_report.history.append(PrettyTraceback(exc_type, exc_value, tb))
216

217
        # python generic formatting for the exception logger
218
        exc_text = "".join(traceback.format_exception(exc_type, exc_value, tb))
219
        exc_logger.error(exc_text)
220

221
222
        # Adapt the error message depending on the expert_mode
        if error_report._expert_mode:
223
            error_report.history[-1].print_formatted(disable_blacklist=True)
Perceval Guillou's avatar
Perceval Guillou committed
224
225
        elif current_session:
            if current_session.is_loading_config:
226
                print(f"{exc_type.__name__}: {exc_value}", file=sys.stderr)
Perceval Guillou's avatar
Perceval Guillou committed
227
            else:
228
                print(
229
                    f"!!! === {exc_type.__name__}: {exc_value} === !!! ( for more details type cmd 'last_error()' )",
230
                    file=sys.stderr,
231
                )
232

233
        if _with_elogbook:
234
            try:
235
                elogbook.error(f"{exc_type.__name__}: {exc_value}")
236
            except Exception:
237
                repl_excepthook(*sys.exc_info(), _with_elogbook=False)
238

239
    def print_exception(self, context, exc_type, exc_value, tb):
240
        if gevent.getcurrent() is self:
241
242
243
244
            # repl_excepthook tries to yield to the gevent loop
            gevent.spawn(repl_excepthook, exc_type, exc_value, tb)
        else:
            repl_excepthook(exc_type, exc_value, tb)
245

246
    sys.excepthook = repl_excepthook
247
    gevent.hub.Hub.print_exception = types.MethodType(print_exception, gevent.get_hub())
248
    return error_report
249

250

251
252
253
254
255
def reset_excepthook():
    sys.excepthook = ErrorReport._orig_sys_excepthook
    gevent.hub.Hub.print_exception = ErrorReport._orig_gevent_print_exception


256
__all__ = ("BlissRepl", "embed", "cli", "configure_repl")
257

258
259
#############
# patch ptpython signaturetoolbar
260
import bliss.shell.cli.ptpython_signature_patch  # noqa: F401,E402
261

262
# patch ptpython completer, and jedi
263
import bliss.shell.cli.ptpython_completer_patch  # noqa: F401,E402
264

265
#############
266
267


268
269
270
class Info:
    def __init__(self, obj_with_info):
        self.info_repr = info(obj_with_info)
271

272
    def __repr__(self):
273
274
275
276
        try:
            return self.info_repr
        except AttributeError:
            return super().__repr__()
277
278


279
class WrappedStdout:
280
281
    def __init__(self, output_buffer):
        self._buffer = output_buffer
282
        self._orig_stdout = sys.stdout
283

284
285
286
287
288
    # context manager
    def __enter__(self, *args, **kwargs):
        self._orig_stdout = sys.stdout
        sys.stdout = self
        return self
289

290
291
    def __exit__(self, *args, **kwargs):
        sys.stdout = self._orig_stdout
292

293
294
295
296
    # delegated members
    @property
    def encoding(self):
        return self._orig_stdout.encoding
297

298
299
300
    @property
    def errors(self):
        return self._orig_stdout.errors
301

302
303
304
    def fileno(self) -> int:
        # This is important for code that expects sys.stdout.fileno() to work.
        return self._orig_stdout.fileno()
305

306
307
    def isatty(self) -> bool:
        return self._orig_stdout.isatty()
308

309
310
    def flush(self):
        self._orig_stdout.flush()
311

312
313
314
315
    # extended members
    def write(self, data):
        # wait for stdout to be ready to receive output
        if True:  # if gevent.select.select([],[self.fileno()], []):
316
            self._buffer.append(data)
317
            self._orig_stdout.write(data)
318

319

320
class PromptToolkitOutputWrapper(DummyOutput):
Valentin Valls's avatar
Valentin Valls committed
321
    """This class is used to keep track of the output history."""
322
323
324

    _MAXLEN = 20

325
326
    def __init__(self, output):
        self.__wrapped_output = output
327
        self._output_buffer = []
328
329
        self._cell_counter = 0
        self._cell_output_history = deque(maxlen=self._MAXLEN)
330
331
332
333
334
335
336
337

    def __getattr__(self, attr):
        if attr.startswith("__"):
            raise AttributeError(attr)
        return getattr(self.__wrapped_output, attr)

    @property
    def capture_stdout(self):
338
339
        return WrappedStdout(self._output_buffer)

340
    def finalize_cell(self):
Valentin Valls's avatar
Valentin Valls committed
341
        """Store the current buffered output as 1 cell output in the history."""
342
343
344
345
346
347
348
349
350
351
352
353
        if self._output_buffer:
            output = "".join(
                [x if isinstance(x, str) else str(x) for x in self._output_buffer]
            )
            output = re.sub(
                r"^(\s+Out\s\[\d+\]:\s+)", "", output, count=1, flags=re.MULTILINE
            )
            self._output_buffer.clear()
        else:
            output = None
        self._cell_output_history.append(output)
        self._cell_counter += 1
354

355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
    def __getitem__(self, item: int) -> Optional[str]:
        """Note that the ptpython cell index starts counting from 1

        item > 0 will be interpreted as the cell index
        item < 0 will be interpreted as the most recent cell output (-1 is the last output)
        item == 0 raise IndexError

        The output value of a cell without output is `None`.
        """
        if not isinstance(item, int):
            raise TypeError(item)
        if item > 0:
            # convert cell index to queue index
            idx = item - self._cell_counter - 1
            if idx >= 0:
                raise IndexError(f"the last cell is OUT [{self._cell_counter}]")
        elif item == 0:
            idx_min = max(self._cell_counter - self._MAXLEN + 1, 1)
            raise IndexError(f"the first available cell is OUT [{idx_min}]")
        elif (item + self._cell_counter) < 0:
            idx_min = max(self._cell_counter - self._MAXLEN + 1, 1)
            raise IndexError(f"the first available cell is OUT [{idx_min}]")
        else:
            idx = item
        try:
            return self._cell_output_history[idx]
        except IndexError:
            idx_min = max(self._cell_counter - self._MAXLEN + 1, 1)
            raise IndexError(f"the first available cell is OUT [{idx_min}]") from None
384
385

    def write(self, data):
386
        self._output_buffer.append(data)
387
388
        self.__wrapped_output.write(data)

389
390
391
    def fileno(self):
        return self.__wrapped_output.fileno()

392

393
class BlissReplBase(NoThreadPythonRepl, metaclass=Singleton):
394
    def __init__(self, *args, **kwargs):
395
396
397
        prompt_label = kwargs.pop("prompt_label", "BLISS")
        title = kwargs.pop("title", None)
        session = kwargs.pop("session")
398
        style = kwargs.pop("style")
399

400
401
402
        # bliss_bar = status_bar(self)
        # toolbars = list(kwargs.pop("extra_toolbars", ()))
        # kwargs["_extra_toolbars"] = [bliss_bar] + toolbars
403

GUILLOU Perceval's avatar
GUILLOU Perceval committed
404
405
406
        # Catch and remove additional kwargs
        self.session_name = kwargs.pop("session_name", "default")
        self.use_tmux = kwargs.pop("use_tmux", False)
407

Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
408
        # patch ptpython statusbar
409
        if self.use_tmux and not is_windows():
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
410
411
412
            ptpython.layout.status_bar = TMUXstatus_bar
        else:
            ptpython.layout.status_bar = NEWstatus_bar
413

414
        super().__init__(*args, **kwargs)
415

416
417
        self.app.output = PromptToolkitOutputWrapper(self.app.output)

418
419
        if title:
            self.terminal_title = title
420

Linus Pithan's avatar
Linus Pithan committed
421
        # self.show_bliss_bar = True
422
423
        # self.bliss_bar = bliss_bar
        # self.bliss_bar_format = "normal"
424
        self.bliss_session = session
425
        self.bliss_prompt = BlissPrompt(self, prompt_label)
Geoffrey Mant's avatar
Geoffrey Mant committed
426
        self.all_prompt_styles["bliss"] = self.bliss_prompt
Linus Pithan's avatar
Linus Pithan committed
427
        self.prompt_style = "bliss"
428

Linus Pithan's avatar
Linus Pithan committed
429
        self.show_signature = True
430

431
432
433
434
435
436
437
438
        self.color_depth = "DEPTH_8_BIT"
        try:
            theme = style_from_pygments_cls(get_style_by_name(style))
        except ClassNotFound:
            print(
                f"Unknown color style class: {style}. using default. (check your bliss.ini)."
            )
            theme = style_from_pygments_cls(get_style_by_name("default"))
Jibril Mammeri's avatar
Jibril Mammeri committed
439

440
441
442
443
        self.install_ui_colorscheme("bliss_ui", theme)
        self.use_ui_colorscheme("bliss_ui")
        self.install_code_colorscheme("bliss_code_ui", theme)
        self.use_code_colorscheme("bliss_code_ui")
444

445
446
447
        # PTPYTHON SHELL PREFERENCES
        self.enable_history_search = True
        self.show_status_bar = True
GUILLOU Perceval's avatar
GUILLOU Perceval committed
448
        self.confirm_exit = True
449
        self.enable_mouse_support = False
450

451
452
453
454
455
        if self.use_tmux:
            self.exit_message = (
                "Do you really want to close session? (CTRL-B D to detach)"
            )

Linus Pithan's avatar
Linus Pithan committed
456
457
        self.typing_helper = TypingHelper(self)

458
459
460
    ##
    # NB: next methods are overloaded
    ##
461
462
463
464
465
466
    def eval(self, text):
        logging.getLogger("user_input").info(text)
        elogbook.command(text)
        with self.app.output.capture_stdout:
            result = super().eval(text)
            if result is None:
467
468
                # show_result will not be called
                self.app.output.finalize_cell()
469
470
            return result

471
    async def eval_async(self, text):
Matias Guijarro's avatar
Matias Guijarro committed
472
473
        logging.getLogger("user_input").info(text)
        elogbook.command(text)
474
475
476
        with self.app.output.capture_stdout:
            result = await super().eval_async(text)
            if result is None:
477
478
                # show_result will not be called
                self.app.output.finalize_cell()
479
480
            return result

481
    def show_result(self, result):
Valentin Valls's avatar
Valentin Valls committed
482
        """This is called when the return value of the command is not None."""
Matias Guijarro's avatar
Matias Guijarro committed
483
484
485
486
487
488
489
490
        try:
            if hasattr(result, "__info__"):
                result = Info(result)
        except BaseException:
            # display exception, but do not propagate and make shell to die
            sys.excepthook(*sys.exc_info())
        else:
            return super().show_result(result)
491
        finally:
492
            self.app.output.finalize_cell()
493

494
495
    def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None:
        sys.excepthook(*sys.exc_info())
496

497
498
    def _handle_exception(self, e):
        sys.excepthook(*sys.exc_info())
499

500

501
502
503
class BlissRepl(BlissReplBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
504

505
506
507
508
509
        self._sigint_handler = gevent.signal_handler(signal.SIGINT, self._handle_ctrl_c)

    def _handle_ctrl_c(self):
        with self._lock:
            if self._current_eval_g:
510
                self._current_eval_g.kill(KeyboardInterrupt, block=False)
511

512

513
def configure_repl(repl):
514

Linus Pithan's avatar
Linus Pithan committed
515
516
517
518
519
520
521
522
523
524
525
526
    # intended to be used for testing as ctrl+t can be send via stdin.write(bytes.fromhex("14"))
    # @repl.add_key_binding(Keys.ControlT)
    # def _(event):
    #    sys.stderr.write("<<BLISS REPL TEST>>")
    #    text = repl.default_buffer.text
    #    sys.stderr.write("<<BUFFER TEST>>")
    #    sys.stderr.write(text)
    #    sys.stderr.write("<<BUFFER TEST>>")
    #    sys.stderr.write("<<HISTORY>>")
    #    sys.stderr.write(repl.default_buffer.history._loaded_strings[-1])
    #    sys.stderr.write("<<HISTORY>>")
    #    sys.stderr.write("<<BLISS REPL TEST>>")
527

528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
    @repl.add_key_binding(
        Keys.ControlSpace, filter=has_focus(DEFAULT_BUFFER), eager=True
    )
    def _(event):
        """
        Initialize autocompletion at cursor.
        If the autocompletion menu is not showing, display it with the
        appropriate completions for the context.
        If the menu is showing, select the next completion.
        """

        b = event.app.current_buffer
        if b.complete_state:
            b.complete_next()
        else:
            b.start_completion(select_first=False)

545

546
def initialize(
547
    session_name=None, session_env=None, expert_error_report=True, early_log_info=None
548
) -> session_mdl.Session:
549
550
551
552
553
554
555
556
557
558
559
560
561
562
    """
    Initialize a session.

    Create a session from its name, and update a provided env dictionary.

    Arguments:
        session_name: Name of the session to load
        session_env: Dictionary containing an initial env to feed. If not defined
                     an empty dict is used
    """
    if session_env is None:
        session_env = {}

    # Add config to the user namespace
Lucas Felix's avatar
Lucas Felix committed
563
    config = static.get_config()
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578

    """ BLISS CLI welcome messages """

    t = Terminal()

    # Version
    _version = "version %s" % release.short_version

    # Hostname
    _hostname = platform.node()

    # Beacon host/port
    try:
        _host = get_default_connection()._host
        _port = str(get_default_connection()._port)
579
    except Exception:
580
581
582
583
584
        _host = "UNKNOWN"
        _port = "UNKNOWN"

    # Conda environment
    try:
585
        _conda_env = "(in %s Conda environment)" % os.environ["CONDA_DEFAULT_ENV"]
586
587
588
589
590
591
592
593
594
    except KeyError:
        _conda_env = ""

    print_rainbow_banner()
    print("")
    print(
        "Welcome to BLISS %s running on {t.blue}%s{t.normal} %s".format(t=t)
        % (_version, _hostname, _conda_env)
    )
595
    print("Copyright (c) 2015-2022 Beamline Control Unit, ESRF")
596
597
598
599
600
601
    print("-")
    print(
        "Connected to Beacon server on {t.blue}%s{t.normal} (port %s)".format(t=t)
        % (_host, _port)
    )

602
603
604
605
606
607
    if early_log_info is not None and early_log_info.count > 0:
        print()
        print(
            f"During the import {early_log_info.count} warnings were ignored. Restart BLISS with --debug to display them."
        )

608
609
610
611
612
613
    if config.invalid_yaml_files:
        print()
        print(
            f"Found {len(config.invalid_yaml_files)} YAML parsing error(s), use config.parsing_report() for details.\n"
        )

614
    # Setup(s)
615
616
617
618
619
620
621
622
623
    if session_name is None:
        session = DefaultSession()
    else:
        # we will lock the session name
        # this will prevent to start serveral bliss shell
        # with the same session name
        # lock will only be released at the end of process
        default_cnx = get_default_connection()
        try:
624
            default_cnx.lock(session_name, timeout=1.0)
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
        except RuntimeError:
            try:
                lock_dict = default_cnx.who_locked(session_name)
            except RuntimeError:  # Beacon is to old to answer
                raise RuntimeError(f"{session_name} is already started")
            else:
                raise RuntimeError(
                    f"{session_name} is already running on %s"
                    % lock_dict.get(session_name)
                )
        # set the client name to somethings useful
        try:
            default_cnx.set_client_name(
                f"host:{socket.gethostname()},pid:{os.getpid()} cmd: **bliss -s {session_name}**"
            )
        except RuntimeError:  # Beacon is too old
            pass
        session = config.get(session_name)
        print("%s: Loading config..." % session.name)

    from bliss.shell import standard

    cmds = {k: standard.__dict__[k] for k in standard.__all__}
    session_env.update(cmds)

    session_env["history"] = lambda: print("Please press F3-key to view history!")

652
653
654
    if session.setup(
        session_env, verbose=True, expert_error_report=expert_error_report
    ):
655
        print("Done.")
Matias Guijarro's avatar
Matias Guijarro committed
656
657
658
    else:
        print("Warning: error(s) happened during setup, setup may not be complete.")
    print("")
659

660
661
662
663
664
665
666
667
668
    log = logging.getLogger("startup")
    log.info(
        f"Started BLISS version "
        f"{_version} running on "
        f"{_hostname} "
        f"{_conda_env} "
        f"connected to Beacon server {_host}"
    )

669
670
671
    return session


672
def _archive_history(
673
    history_filename, file_size_thresh=10 ** 6, keep_active_entries=1000
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
):
    if (
        os.path.exists(history_filename)
        and os.stat(history_filename).st_size > file_size_thresh
    ):
        with open(history_filename, "r") as f:
            lines = f.readlines()

        # history is handled as a list of entries (block of lines) to avoid splitting them while archiving
        entries = []
        entry = []
        for line in lines:
            if not line.isspace():
                entry.append(line)
            elif entry:
                entries.append(entry)
                entry = []
        if entry:
            entries.append(entry)

        now = datetime.now()
        archive_filename = f"{history_filename}_{now.year}{now.month:02}{now.day:02}"
        with open(archive_filename, "a") as f:
            for entry in entries[:-keep_active_entries]:
                f.write("".join(entry) + "\n")

        with open(history_filename, "w") as f:
            for entry in entries[-keep_active_entries:]:
                f.write("".join(entry) + "\n")


705
706
707
708
709
def cli(
    locals=None,
    session_name=None,
    vi_mode=False,
    startup_paths=None,
GUILLOU Perceval's avatar
GUILLOU Perceval committed
710
    use_tmux=False,
711
    expert_error_report=False,
712
    style="default",
713
    early_log_info=None,
714
    **kwargs,
715
):
716
    """
717
    Create a command line interface
Valentin Valls's avatar
Valentin Valls committed
718

719
    Args:
720
        session_name : session to initialize (default: None)
721
722
        vi_mode (bool): Use Vi instead of Emacs key bindings.
    """
723
724
    set_bliss_shell_mode(True)

725
726
727
    # Enable loggers
    userlogger.enable()  # destination: user
    elogbook.enable()  # destination: electronic logbook
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
728

729
730
731
732
    # user namespace
    user_ns = {}
    protected_user_ns = ProtectedDict(user_ns)

733
    # These 2 commands can be used by user script loaded during
734
735
736
    # the initialization
    user_ns["protect"] = protected_user_ns.protect
    user_ns["unprotect"] = protected_user_ns.unprotect
737

738
    if session_name and not session_name.startswith(constants.DEFAULT_SESSION_NAME):
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
739
        try:
740
741
742
743
            session = initialize(
                session_name,
                session_env=user_ns,
                expert_error_report=expert_error_report,
744
                early_log_info=early_log_info,
745
            )
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
746
747
748
749
750
        except RuntimeError as e:
            if use_tmux:
                print("\n", "*" * 20, "\n", e, "\n", "*" * 20)
                gevent.sleep(10)  # just to let the eyes to see the message ;)
            raise
GUILLOU Perceval's avatar
GUILLOU Perceval committed
751
    else:
752
753
754
755
        session = initialize(
            session_name=None,
            session_env=user_ns,
            expert_error_report=expert_error_report,
756
            early_log_info=early_log_info,
757
        )
758

759
    if session.name != constants.DEFAULT_SESSION_NAME:
760
        protected_user_ns._protect(session.object_names)
761
762
763
764
        # protect Aliases if they exist
        if "ALIASES" in protected_user_ns:
            for alias in protected_user_ns["ALIASES"].names_iter():
                if alias in protected_user_ns:
765
                    protected_user_ns._protect(alias)
766

767
768
769
770
    def last_error(index=None):
        hist = user_ns["ERROR_REPORT"].history
        try:
            idx = -1 if index is None else index
771
772
773
            hist[idx].print_formatted(
                disable_blacklist=user_ns["ERROR_REPORT"].expert_mode
            )
774
775
776
777
778
779
        except IndexError:
            if index is None:
                print("None")
            else:
                print(f"No exception with index {index} found, size is {len(hist)}")

780
    # handle the last error report
781
    # (in the shell env only)
782
    user_ns["last_error"] = last_error
783

784
785
786
787
788
789
790
791
792
793
    # protect certain imports and Globals
    to_protect = [
        "ERROR_REPORT",
        "last_error",
        "ALIASES",
        "SCAN_DISPLAY",
        "SCAN_SAVING",
        "SCANS",
    ]
    to_protect.extend(standard.__all__)
794
    protected_user_ns._protect(to_protect)
795

796
    def get_globals():
797
        return protected_user_ns
798

799
    if session_name and not session_name.startswith(constants.DEFAULT_SESSION_NAME):
800
        session_id = session_name
Vincent Michel's avatar
Vincent Michel committed
801
        session_title = "Bliss shell ({0})".format(session_name)
802
        prompt_label = session_name.upper()
803
    else:
804
        session_id = "default"
Vincent Michel's avatar
Vincent Michel committed
805
        session_title = "Bliss shell"
806
        prompt_label = "BLISS"
807

808
    history_filename = ".bliss_%s_history" % (session_id)
809
    if is_windows():
810
811
812
        history_filename = os.path.join(os.environ["USERPROFILE"], history_filename)
    else:
        history_filename = os.path.join(os.environ["HOME"], history_filename)
813

814
815
    _archive_history(history_filename)

816
    # Create REPL.
817
    repl = BlissRepl(
818
        get_globals=get_globals,
819
820
821
822
823
824
        session=session,
        vi_mode=vi_mode,
        prompt_label=prompt_label,
        title=session_title,
        history_filename=history_filename,
        startup_paths=startup_paths,
825
        session_name=session_name,
GUILLOU Perceval's avatar
GUILLOU Perceval committed
826
        use_tmux=use_tmux,
827
        style=style,
828
        **kwargs,
829
    )
830

831
    # Custom keybindings
832
833
834
    configure_repl(repl)

    return repl
835

836
837
838

def embed(*args, **kwargs):
    """
839
    Call this to embed bliss shell at the current point in your program
840
    """
841
842
    use_tmux = kwargs.get("use_tmux", False)

843
    with log_utils.filter_warnings():
844
        cmd_line_i = cli(*args, **kwargs)
845
846
847
        scans_display = ScanDisplayDispatcher(cmd_line_i)
        if not is_windows() and use_tmux:
            scans_display.set_use_progress_bar(True)
848

849
        with patch_stdout_context(raw=True):
850
            asyncio.run(cmd_line_i.run_async())