repl.py 24.2 KB
Newer Older
Benoit Formet's avatar
Benoit Formet committed
1
# -*- coding: utf-8 -*-
2
3
4
#
# This file is part of the bliss project
#
5
# Copyright (c) 2015-2022 Beamline Control Unit, ESRF
6
7
# Distributed under the GNU LGPLv3. See LICENSE for more info.

8
"""Bliss REPL (Read Eval Print Loop)"""
9

10
import asyncio
11
from typing import Optional
12
13
14
15
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from pygments.styles import get_style_by_name
from pygments.util import ClassNotFound

16
import re
17
18
import os
import sys
19
import types
20
import socket
21
import functools
22
import traceback
23
import gevent
24
import signal
25
import logging
26
import platform
Valentin Valls's avatar
Valentin Valls committed
27
from collections import deque
28
from datetime import datetime
29

Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
30
import ptpython.layout
31
from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context
32
from prompt_toolkit.output import DummyOutput
33

Benoit Formet's avatar
Benoit Formet committed
34
# imports needed to have control over _execute of ptpython
35
from prompt_toolkit.keys import Keys
36
from prompt_toolkit.utils import is_windows
37
38
39
from prompt_toolkit.filters import has_focus
from prompt_toolkit.enums import DEFAULT_BUFFER

40
from .. import log_utils
41
from bliss.shell.data.display import ScanDisplayDispatcher
42
43
44
from bliss.shell.cli.prompt import BlissPrompt
from bliss.shell.cli.typing_helper import TypingHelper
from bliss.shell.cli.ptpython_statusbar_patch import NEWstatus_bar, TMUXstatus_bar
45
46
47
from bliss.shell.bliss_banners import print_rainbow_banner
from bliss.shell.cli.protected_dict import ProtectedDict
from bliss.shell.cli.no_thread_repl import NoThreadPythonRepl
48
from bliss.shell.cli.formatted_traceback import BlissTraceback, pprint_traceback
49
from bliss.shell import standard
50

Valentin Valls's avatar
Valentin Valls committed
51
from bliss import set_bliss_shell_mode
52
from bliss.common.utils import Singleton
53
from bliss.common import constants
54
55
from bliss.common import session as session_mdl
from bliss.common.session import DefaultSession
56
57
from bliss import release, current_session
from bliss.config import static
58
from bliss.config.conductor.client import get_default_connection
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
59
from bliss.shell.standard import info
60
from bliss.common.logtools import userlogger, elogbook
61
from bliss.common.protocols import ErrorReportInterface
62

63
logger = logging.getLogger(__name__)
64

65
if is_windows():
66

67
68
69
70
71
72
    class Terminal:
        def __getattr__(self, prop):
            if prop.startswith("__"):
                raise AttributeError(prop)
            return ""

73

74
75
76
77
78
79
80
81
else:
    from blessings import Terminal


session_mdl.set_current_session = functools.partial(
    session_mdl.set_current_session, force=False
)

82

83
# =================== ERROR REPORTING ============================
84
85


86
class ErrorReport(ErrorReportInterface):
Benoit Formet's avatar
Benoit Formet committed
87
    """
88
    Manage the behavior of the error reporting in the shell.
Benoit Formet's avatar
Benoit Formet committed
89

90
91
    - ErrorReport.expert_mode = False (default) => prints a user friendly error message without traceback
    - ErrorReport.expert_mode = True            => prints the full error message with traceback
Benoit Formet's avatar
Benoit Formet committed
92

93
94
95
96
    - ErrorReport.last_error stores the last error traceback

    """

97
98
    _orig_sys_excepthook = sys.excepthook
    _orig_gevent_print_exception = gevent.hub.Hub.print_exception
99

100
    def __init__(self):
101
        self._expert_mode = False
102
        self._history = deque(maxlen=100)
103
104

    @property
105
106
    def history(self):
        return self._history
107
108
109
110
111
112
113
114
115
116

    @property
    def expert_mode(self):
        return self._expert_mode

    @expert_mode.setter
    def expert_mode(self, enable):
        self._expert_mode = bool(enable)


117
118
119
120
def install_excepthook():
    """Patch the system exception hook,
    and the print exception for gevent greenlet
    """
121
    error_report = ErrorReport()
122

123
    exc_logger = logging.getLogger("exceptions")
124

125
    def repl_excepthook(exc_type, exc_value, tb, _with_elogbook=True):
126
127
128
        if exc_value is None:
            # filter exceptions from aiogevent(?) with no traceback, no value
            return
129

130
131
        # BlissTraceback captures traceback information without holding any reference on its content
        fmt_tb = BlissTraceback(exc_type, exc_value, tb)
132

133
134
135
136
137
        # store BlissTraceback for later formatting
        error_report.history.append(fmt_tb)

        # publish full error to logger
        exc_logger.error(fmt_tb.format(disable_blacklist=True))
138

139
140
        # Adapt the error message depending on the expert_mode
        if error_report._expert_mode:
141
142
143
144
145
146
147
148
149
            fmt_tb = error_report.history[-1].format(disable_blacklist=True)
            try:
                style = BlissRepl()._current_style
            except Exception:
                # BlissRepl singleton is not instantiated yet
                # falling back to monochrome
                print(fmt_tb)
            else:
                pprint_traceback(fmt_tb, style)
Perceval Guillou's avatar
Perceval Guillou committed
150
151
        elif current_session:
            if current_session.is_loading_config:
152
                print(f"{exc_type.__name__}: {exc_value}", file=sys.stderr)
Perceval Guillou's avatar
Perceval Guillou committed
153
            else:
154
                print(
155
                    f"!!! === {exc_type.__name__}: {exc_value} === !!! ( for more details type cmd 'last_error()' )",
156
                    file=sys.stderr,
157
                )
158

159
        if _with_elogbook:
160
            try:
161
                elogbook.error(f"{exc_type.__name__}: {exc_value}")
162
            except Exception:
163
                repl_excepthook(*sys.exc_info(), _with_elogbook=False)
164

165
    def print_exception(self, context, exc_type, exc_value, tb):
166
        if gevent.getcurrent() is self:
167
168
169
170
            # repl_excepthook tries to yield to the gevent loop
            gevent.spawn(repl_excepthook, exc_type, exc_value, tb)
        else:
            repl_excepthook(exc_type, exc_value, tb)
171

172
    sys.excepthook = repl_excepthook
173
    gevent.hub.Hub.print_exception = types.MethodType(print_exception, gevent.get_hub())
174
    return error_report
175

176

177
178
179
180
181
def reset_excepthook():
    sys.excepthook = ErrorReport._orig_sys_excepthook
    gevent.hub.Hub.print_exception = ErrorReport._orig_gevent_print_exception


182
__all__ = ("BlissRepl", "embed", "cli", "configure_repl")
183

184
185
#############
# patch ptpython signaturetoolbar
186
import bliss.shell.cli.ptpython_signature_patch  # noqa: F401,E402
187

188
# patch ptpython completer, and jedi
189
import bliss.shell.cli.ptpython_completer_patch  # noqa: F401,E402
190

191
#############
192
193


194
195
196
class Info:
    def __init__(self, obj_with_info):
        self.info_repr = info(obj_with_info)
197

198
    def __repr__(self):
199
200
201
202
        try:
            return self.info_repr
        except AttributeError:
            return super().__repr__()
203
204


205
class WrappedStdout:
206
207
    def __init__(self, output_buffer):
        self._buffer = output_buffer
208
        self._orig_stdout = sys.stdout
209

210
211
212
213
214
    # context manager
    def __enter__(self, *args, **kwargs):
        self._orig_stdout = sys.stdout
        sys.stdout = self
        return self
215

216
217
    def __exit__(self, *args, **kwargs):
        sys.stdout = self._orig_stdout
218

219
220
221
222
    # delegated members
    @property
    def encoding(self):
        return self._orig_stdout.encoding
223

224
225
226
    @property
    def errors(self):
        return self._orig_stdout.errors
227

228
229
230
    def fileno(self) -> int:
        # This is important for code that expects sys.stdout.fileno() to work.
        return self._orig_stdout.fileno()
231

232
233
    def isatty(self) -> bool:
        return self._orig_stdout.isatty()
234

235
236
    def flush(self):
        self._orig_stdout.flush()
237

238
239
240
241
    # extended members
    def write(self, data):
        # wait for stdout to be ready to receive output
        if True:  # if gevent.select.select([],[self.fileno()], []):
242
            self._buffer.append(data)
243
            self._orig_stdout.write(data)
244

245

246
class PromptToolkitOutputWrapper(DummyOutput):
Valentin Valls's avatar
Valentin Valls committed
247
    """This class is used to keep track of the output history."""
248
249
250

    _MAXLEN = 20

251
252
    def __init__(self, output):
        self.__wrapped_output = output
253
        self._output_buffer = []
254
255
        self._cell_counter = 0
        self._cell_output_history = deque(maxlen=self._MAXLEN)
256
257
258
259
260
261
262
263

    def __getattr__(self, attr):
        if attr.startswith("__"):
            raise AttributeError(attr)
        return getattr(self.__wrapped_output, attr)

    @property
    def capture_stdout(self):
264
265
        return WrappedStdout(self._output_buffer)

266
    def finalize_cell(self):
Valentin Valls's avatar
Valentin Valls committed
267
        """Store the current buffered output as 1 cell output in the history."""
268
269
270
271
272
273
274
275
276
277
278
279
        if self._output_buffer:
            output = "".join(
                [x if isinstance(x, str) else str(x) for x in self._output_buffer]
            )
            output = re.sub(
                r"^(\s+Out\s\[\d+\]:\s+)", "", output, count=1, flags=re.MULTILINE
            )
            self._output_buffer.clear()
        else:
            output = None
        self._cell_output_history.append(output)
        self._cell_counter += 1
280

281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
    def __getitem__(self, item: int) -> Optional[str]:
        """Note that the ptpython cell index starts counting from 1

        item > 0 will be interpreted as the cell index
        item < 0 will be interpreted as the most recent cell output (-1 is the last output)
        item == 0 raise IndexError

        The output value of a cell without output is `None`.
        """
        if not isinstance(item, int):
            raise TypeError(item)
        if item > 0:
            # convert cell index to queue index
            idx = item - self._cell_counter - 1
            if idx >= 0:
                raise IndexError(f"the last cell is OUT [{self._cell_counter}]")
        elif item == 0:
            idx_min = max(self._cell_counter - self._MAXLEN + 1, 1)
            raise IndexError(f"the first available cell is OUT [{idx_min}]")
        elif (item + self._cell_counter) < 0:
            idx_min = max(self._cell_counter - self._MAXLEN + 1, 1)
            raise IndexError(f"the first available cell is OUT [{idx_min}]")
        else:
            idx = item
        try:
            return self._cell_output_history[idx]
        except IndexError:
            idx_min = max(self._cell_counter - self._MAXLEN + 1, 1)
            raise IndexError(f"the first available cell is OUT [{idx_min}]") from None
310
311

    def write(self, data):
312
        self._output_buffer.append(data)
313
314
        self.__wrapped_output.write(data)

315
316
317
    def fileno(self):
        return self.__wrapped_output.fileno()

318

319
class BlissReplBase(NoThreadPythonRepl, metaclass=Singleton):
320
    def __init__(self, *args, **kwargs):
321
322
323
        prompt_label = kwargs.pop("prompt_label", "BLISS")
        title = kwargs.pop("title", None)
        session = kwargs.pop("session")
324
        style = kwargs.pop("style")
325

326
327
328
        # bliss_bar = status_bar(self)
        # toolbars = list(kwargs.pop("extra_toolbars", ()))
        # kwargs["_extra_toolbars"] = [bliss_bar] + toolbars
329

GUILLOU Perceval's avatar
GUILLOU Perceval committed
330
331
332
        # Catch and remove additional kwargs
        self.session_name = kwargs.pop("session_name", "default")
        self.use_tmux = kwargs.pop("use_tmux", False)
333

Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
334
        # patch ptpython statusbar
335
        if self.use_tmux and not is_windows():
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
336
337
338
            ptpython.layout.status_bar = TMUXstatus_bar
        else:
            ptpython.layout.status_bar = NEWstatus_bar
339

340
        super().__init__(*args, **kwargs)
341

342
343
        self.app.output = PromptToolkitOutputWrapper(self.app.output)

344
345
        if title:
            self.terminal_title = title
346

Linus Pithan's avatar
Linus Pithan committed
347
        # self.show_bliss_bar = True
348
349
        # self.bliss_bar = bliss_bar
        # self.bliss_bar_format = "normal"
350
        self.bliss_session = session
351
        self.bliss_prompt = BlissPrompt(self, prompt_label)
Geoffrey Mant's avatar
Geoffrey Mant committed
352
        self.all_prompt_styles["bliss"] = self.bliss_prompt
Linus Pithan's avatar
Linus Pithan committed
353
        self.prompt_style = "bliss"
354

Linus Pithan's avatar
Linus Pithan committed
355
        self.show_signature = True
356

357
358
359
360
361
362
363
364
        self.color_depth = "DEPTH_8_BIT"
        try:
            theme = style_from_pygments_cls(get_style_by_name(style))
        except ClassNotFound:
            print(
                f"Unknown color style class: {style}. using default. (check your bliss.ini)."
            )
            theme = style_from_pygments_cls(get_style_by_name("default"))
Jibril Mammeri's avatar
Jibril Mammeri committed
365

366
367
368
369
        self.install_ui_colorscheme("bliss_ui", theme)
        self.use_ui_colorscheme("bliss_ui")
        self.install_code_colorscheme("bliss_code_ui", theme)
        self.use_code_colorscheme("bliss_code_ui")
370

371
372
373
        # PTPYTHON SHELL PREFERENCES
        self.enable_history_search = True
        self.show_status_bar = True
GUILLOU Perceval's avatar
GUILLOU Perceval committed
374
        self.confirm_exit = True
375
        self.enable_mouse_support = False
376

377
378
379
380
381
        if self.use_tmux:
            self.exit_message = (
                "Do you really want to close session? (CTRL-B D to detach)"
            )

Linus Pithan's avatar
Linus Pithan committed
382
383
        self.typing_helper = TypingHelper(self)

384
385
386
    ##
    # NB: next methods are overloaded
    ##
387
388
389
390
391
392
    def eval(self, text):
        logging.getLogger("user_input").info(text)
        elogbook.command(text)
        with self.app.output.capture_stdout:
            result = super().eval(text)
            if result is None:
393
394
                # show_result will not be called
                self.app.output.finalize_cell()
395
396
            return result

397
    async def eval_async(self, text):
Matias Guijarro's avatar
Matias Guijarro committed
398
399
        logging.getLogger("user_input").info(text)
        elogbook.command(text)
400
401
402
        with self.app.output.capture_stdout:
            result = await super().eval_async(text)
            if result is None:
403
404
                # show_result will not be called
                self.app.output.finalize_cell()
405
406
            return result

407
    def show_result(self, result):
Valentin Valls's avatar
Valentin Valls committed
408
        """This is called when the return value of the command is not None."""
Matias Guijarro's avatar
Matias Guijarro committed
409
410
411
412
413
414
415
416
        try:
            if hasattr(result, "__info__"):
                result = Info(result)
        except BaseException:
            # display exception, but do not propagate and make shell to die
            sys.excepthook(*sys.exc_info())
        else:
            return super().show_result(result)
417
        finally:
418
            self.app.output.finalize_cell()
419

420
421
    def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None:
        sys.excepthook(*sys.exc_info())
422

423
424
    def _handle_exception(self, e):
        sys.excepthook(*sys.exc_info())
425

426

427
428
429
class BlissRepl(BlissReplBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
430

431
432
433
434
435
        self._sigint_handler = gevent.signal_handler(signal.SIGINT, self._handle_ctrl_c)

    def _handle_ctrl_c(self):
        with self._lock:
            if self._current_eval_g:
436
                self._current_eval_g.kill(KeyboardInterrupt, block=False)
437

438

439
def configure_repl(repl):
440

Linus Pithan's avatar
Linus Pithan committed
441
442
443
444
445
446
447
448
449
450
451
452
    # intended to be used for testing as ctrl+t can be send via stdin.write(bytes.fromhex("14"))
    # @repl.add_key_binding(Keys.ControlT)
    # def _(event):
    #    sys.stderr.write("<<BLISS REPL TEST>>")
    #    text = repl.default_buffer.text
    #    sys.stderr.write("<<BUFFER TEST>>")
    #    sys.stderr.write(text)
    #    sys.stderr.write("<<BUFFER TEST>>")
    #    sys.stderr.write("<<HISTORY>>")
    #    sys.stderr.write(repl.default_buffer.history._loaded_strings[-1])
    #    sys.stderr.write("<<HISTORY>>")
    #    sys.stderr.write("<<BLISS REPL TEST>>")
453

454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
    @repl.add_key_binding(
        Keys.ControlSpace, filter=has_focus(DEFAULT_BUFFER), eager=True
    )
    def _(event):
        """
        Initialize autocompletion at cursor.
        If the autocompletion menu is not showing, display it with the
        appropriate completions for the context.
        If the menu is showing, select the next completion.
        """

        b = event.app.current_buffer
        if b.complete_state:
            b.complete_next()
        else:
            b.start_completion(select_first=False)

471

472
def initialize(
473
    session_name=None, session_env=None, expert_error_report=True, early_log_info=None
474
) -> session_mdl.Session:
475
476
477
478
479
480
481
482
483
484
485
486
487
488
    """
    Initialize a session.

    Create a session from its name, and update a provided env dictionary.

    Arguments:
        session_name: Name of the session to load
        session_env: Dictionary containing an initial env to feed. If not defined
                     an empty dict is used
    """
    if session_env is None:
        session_env = {}

    # Add config to the user namespace
Lucas Felix's avatar
Lucas Felix committed
489
    config = static.get_config()
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504

    """ BLISS CLI welcome messages """

    t = Terminal()

    # Version
    _version = "version %s" % release.short_version

    # Hostname
    _hostname = platform.node()

    # Beacon host/port
    try:
        _host = get_default_connection()._host
        _port = str(get_default_connection()._port)
505
    except Exception:
506
507
508
509
510
        _host = "UNKNOWN"
        _port = "UNKNOWN"

    # Conda environment
    try:
511
        _conda_env = "(in %s Conda environment)" % os.environ["CONDA_DEFAULT_ENV"]
512
513
514
515
516
517
518
519
520
    except KeyError:
        _conda_env = ""

    print_rainbow_banner()
    print("")
    print(
        "Welcome to BLISS %s running on {t.blue}%s{t.normal} %s".format(t=t)
        % (_version, _hostname, _conda_env)
    )
521
    print("Copyright (c) 2015-2022 Beamline Control Unit, ESRF")
522
523
524
525
526
527
    print("-")
    print(
        "Connected to Beacon server on {t.blue}%s{t.normal} (port %s)".format(t=t)
        % (_host, _port)
    )

528
529
530
531
532
533
    if early_log_info is not None and early_log_info.count > 0:
        print()
        print(
            f"During the import {early_log_info.count} warnings were ignored. Restart BLISS with --debug to display them."
        )

534
535
536
537
538
539
    if config.invalid_yaml_files:
        print()
        print(
            f"Found {len(config.invalid_yaml_files)} YAML parsing error(s), use config.parsing_report() for details.\n"
        )

540
    # Setup(s)
541
542
543
544
545
546
547
548
549
    if session_name is None:
        session = DefaultSession()
    else:
        # we will lock the session name
        # this will prevent to start serveral bliss shell
        # with the same session name
        # lock will only be released at the end of process
        default_cnx = get_default_connection()
        try:
550
            default_cnx.lock(session_name, timeout=1.0)
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
        except RuntimeError:
            try:
                lock_dict = default_cnx.who_locked(session_name)
            except RuntimeError:  # Beacon is to old to answer
                raise RuntimeError(f"{session_name} is already started")
            else:
                raise RuntimeError(
                    f"{session_name} is already running on %s"
                    % lock_dict.get(session_name)
                )
        # set the client name to somethings useful
        try:
            default_cnx.set_client_name(
                f"host:{socket.gethostname()},pid:{os.getpid()} cmd: **bliss -s {session_name}**"
            )
        except RuntimeError:  # Beacon is too old
            pass
        session = config.get(session_name)
        print("%s: Loading config..." % session.name)

    from bliss.shell import standard

    cmds = {k: standard.__dict__[k] for k in standard.__all__}
    session_env.update(cmds)

    session_env["history"] = lambda: print("Please press F3-key to view history!")

578
579
580
    if session.setup(
        session_env, verbose=True, expert_error_report=expert_error_report
    ):
581
        print("Done.")
Matias Guijarro's avatar
Matias Guijarro committed
582
583
584
    else:
        print("Warning: error(s) happened during setup, setup may not be complete.")
    print("")
585

586
587
588
589
590
591
592
593
594
    log = logging.getLogger("startup")
    log.info(
        f"Started BLISS version "
        f"{_version} running on "
        f"{_hostname} "
        f"{_conda_env} "
        f"connected to Beacon server {_host}"
    )

595
596
597
    return session


598
def _archive_history(
599
    history_filename, file_size_thresh=10 ** 6, keep_active_entries=1000
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
):
    if (
        os.path.exists(history_filename)
        and os.stat(history_filename).st_size > file_size_thresh
    ):
        with open(history_filename, "r") as f:
            lines = f.readlines()

        # history is handled as a list of entries (block of lines) to avoid splitting them while archiving
        entries = []
        entry = []
        for line in lines:
            if not line.isspace():
                entry.append(line)
            elif entry:
                entries.append(entry)
                entry = []
        if entry:
            entries.append(entry)

        now = datetime.now()
        archive_filename = f"{history_filename}_{now.year}{now.month:02}{now.day:02}"
        with open(archive_filename, "a") as f:
            for entry in entries[:-keep_active_entries]:
                f.write("".join(entry) + "\n")

        with open(history_filename, "w") as f:
            for entry in entries[-keep_active_entries:]:
                f.write("".join(entry) + "\n")


631
632
633
634
635
def cli(
    locals=None,
    session_name=None,
    vi_mode=False,
    startup_paths=None,
GUILLOU Perceval's avatar
GUILLOU Perceval committed
636
    use_tmux=False,
637
    expert_error_report=False,
638
    style="default",
639
    early_log_info=None,
640
    **kwargs,
641
):
642
    """
643
    Create a command line interface
Valentin Valls's avatar
Valentin Valls committed
644

645
    Args:
646
        session_name : session to initialize (default: None)
647
648
        vi_mode (bool): Use Vi instead of Emacs key bindings.
    """
649
650
    set_bliss_shell_mode(True)

651
652
653
    # Enable loggers
    userlogger.enable()  # destination: user
    elogbook.enable()  # destination: electronic logbook
Piergiorgio Pancino's avatar
Piergiorgio Pancino committed
654

655
656
657
658
    # user namespace
    user_ns = {}
    protected_user_ns = ProtectedDict(user_ns)

659
    # These 2 commands can be used by user script loaded during
660
661
662
    # the initialization
    user_ns["protect"] = protected_user_ns.protect
    user_ns["unprotect"] = protected_user_ns.unprotect
663

664
    if session_name and not session_name.startswith(constants.DEFAULT_SESSION_NAME):
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
665
        try:
666
667
668
669
            session = initialize(
                session_name,
                session_env=user_ns,
                expert_error_report=expert_error_report,
670
                early_log_info=early_log_info,
671
            )
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
672
673
674
675
676
        except RuntimeError as e:
            if use_tmux:
                print("\n", "*" * 20, "\n", e, "\n", "*" * 20)
                gevent.sleep(10)  # just to let the eyes to see the message ;)
            raise
GUILLOU Perceval's avatar
GUILLOU Perceval committed
677
    else:
678
679
680
681
        session = initialize(
            session_name=None,
            session_env=user_ns,
            expert_error_report=expert_error_report,
682
            early_log_info=early_log_info,
683
        )
684

685
    if session.name != constants.DEFAULT_SESSION_NAME:
686
        protected_user_ns._protect(session.object_names)
687
688
689
690
        # protect Aliases if they exist
        if "ALIASES" in protected_user_ns:
            for alias in protected_user_ns["ALIASES"].names_iter():
                if alias in protected_user_ns:
691
                    protected_user_ns._protect(alias)
692

693
694
695
696
    def last_error(index=None):
        hist = user_ns["ERROR_REPORT"].history
        try:
            idx = -1 if index is None else index
697
698
699
700
            fmt_tb = hist[idx].format(
                disable_blacklist=user_ns["ERROR_REPORT"].expert_mode,
                max_nb_locals=15,
                max_local_len=200,
701
            )
702
            pprint_traceback(fmt_tb, BlissRepl()._current_style)
703
704
705
706
707
708
        except IndexError:
            if index is None:
                print("None")
            else:
                print(f"No exception with index {index} found, size is {len(hist)}")

709
    # handle the last error report
710
    # (in the shell env only)
711
    user_ns["last_error"] = last_error
712

713
714
715
716
717
718
719
720
721
722
    # protect certain imports and Globals
    to_protect = [
        "ERROR_REPORT",
        "last_error",
        "ALIASES",
        "SCAN_DISPLAY",
        "SCAN_SAVING",
        "SCANS",
    ]
    to_protect.extend(standard.__all__)
723
    protected_user_ns._protect(to_protect)
724

725
    def get_globals():
726
        return protected_user_ns
727

728
    if session_name and not session_name.startswith(constants.DEFAULT_SESSION_NAME):
729
        session_id = session_name
Vincent Michel's avatar
Vincent Michel committed
730
        session_title = "Bliss shell ({0})".format(session_name)
731
        prompt_label = session_name.upper()
732
    else:
733
        session_id = "default"
Vincent Michel's avatar
Vincent Michel committed
734
        session_title = "Bliss shell"
735
        prompt_label = "BLISS"
736

737
    history_filename = ".bliss_%s_history" % (session_id)
738
    if is_windows():
739
740
741
        history_filename = os.path.join(os.environ["USERPROFILE"], history_filename)
    else:
        history_filename = os.path.join(os.environ["HOME"], history_filename)
742

743
744
    _archive_history(history_filename)

745
    # Create REPL.
746
    repl = BlissRepl(
747
        get_globals=get_globals,
748
749
750
751
752
753
        session=session,
        vi_mode=vi_mode,
        prompt_label=prompt_label,
        title=session_title,
        history_filename=history_filename,
        startup_paths=startup_paths,
754
        session_name=session_name,
GUILLOU Perceval's avatar
GUILLOU Perceval committed
755
        use_tmux=use_tmux,
756
        style=style,
757
        **kwargs,
758
    )
759

760
    # Custom keybindings
761
762
763
    configure_repl(repl)

    return repl
764

765
766
767

def embed(*args, **kwargs):
    """
768
    Call this to embed bliss shell at the current point in your program
769
    """
770
771
    use_tmux = kwargs.get("use_tmux", False)

772
    with log_utils.filter_warnings():
773
        cmd_line_i = cli(*args, **kwargs)
774
775
776
        scans_display = ScanDisplayDispatcher(cmd_line_i)
        if not is_windows() and use_tmux:
            scans_display.set_use_progress_bar(True)
777

778
        with patch_stdout_context(raw=True):
779
            asyncio.run(cmd_line_i.run_async())