From f86ceeeab706f31682663b3bc16d380e187212b7 Mon Sep 17 00:00:00 2001 From: Matias Guijarro Date: Mon, 19 Apr 2021 17:40:10 +0200 Subject: [PATCH] repl: introduce 'NoThreadPythonRepl', a thread-free version of ptpython's PythonInput Threads have been introduced in ptpython 3.0.11 ; the input UI runs in a separate thread. In addition, default completers also run in threads. This is a problem for us, for 3 reasons: - aiogevent sets up a gevent backend for asyncio, as a result operations that run in an executor for example are executing in different gevent hubs ; it is not safe to share gevent objects between threads - when showing results, code is called from another thread - as we display `__info__` strings which can communicate via sockets etc, we get "cannot switch to a different thread" error since sockets cannot be shared between gevent loops in different threads - when executing properties and methods discovery for completion, there is a possibility of communication via sockets, to get redis keys (for example), this cannot be executed in another thread (same reason as above) This code overwrites ._create_buffer(), .read() and .run_async() in order to provide versions with no threads ; in our case there is no blocking because we use aiogevent for asyncio + monkey-patched Python so we can avoid threads. completely. --- bliss/shell/cli/no_thread_repl.py | 188 ++++++++++++++++++++++++++++++ bliss/shell/cli/repl.py | 51 +++----- 2 files changed, 202 insertions(+), 37 deletions(-) create mode 100644 bliss/shell/cli/no_thread_repl.py diff --git a/bliss/shell/cli/no_thread_repl.py b/bliss/shell/cli/no_thread_repl.py new file mode 100644 index 0000000000..f040d33385 --- /dev/null +++ b/bliss/shell/cli/no_thread_repl.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the bliss project +# +# Copyright (c) 2015-2020 Beamline Control Unit, ESRF +# Distributed under the GNU LGPLv3. See LICENSE for more info. + +"""PtPython REPL with no threads""" +import asyncio +from typing import Optional + +from ptpython.repl import PythonRepl, set_title, clear_title +from ptpython.python_input import ( + AutoSuggestFromHistory, + Buffer, + ConditionalValidator, + ConditionalAutoSuggest, + Condition, + DEFAULT_BUFFER, + Document, + InputMode, + unindent_code, +) + + +class NoThreadPythonRepl(PythonRepl): + """ + ptpython PythonRepl with no threads + + Threads have been introduced in ptpython 3.0.11 ; the input UI runs in a + separate thread. In addition, default completers also run in threads. + This is a problem for us, for 3 reasons: + + - aiogevent sets up a gevent backend for asyncio, as a result operations that + run in an executor for example are executing in different gevent hubs ; it + is not safe to share gevent objects between threads + - when showing results, code is called from another thread + - as we display `__info__` strings which can communicate via sockets etc, + we get "cannot switch to a different thread" error since sockets cannot be + shared between gevent loops in different threads + - when executing properties and methods discovery for completion, there is a + possibility of communication via sockets, to get redis keys (for example), + this cannot be executed in another thread (same reason as above) + + This code overwrites ._create_buffer(), .read() and .run_async() in order to provide + versions with no threads ; in our case there is no blocking because we use + aiogevent for asyncio + monkey-patched Python so we can avoid threads + completely. + """ + + def _create_buffer(self) -> Buffer: + """ + Create the `Buffer` for the Python input. + + Same method as super()._create_buffer, except that completers and auto-suggestion are + replaced by non-threaded flavours + """ + # only completers are changed to non-threaded flavours + python_buffer = Buffer( + name=DEFAULT_BUFFER, + complete_while_typing=Condition(lambda: self.complete_while_typing), + enable_history_search=Condition(lambda: self.enable_history_search), + tempfile_suffix=".py", + history=self.history, + completer=self._completer, # was: ThreadedCompleter(self._completer) + validator=ConditionalValidator( + self._validator, Condition(lambda: self.enable_input_validation) + ), + auto_suggest=ConditionalAutoSuggest( + AutoSuggestFromHistory(), # was: ThreadedAutoSuggest(AutoSuggestFromHistory()) + Condition(lambda: self.enable_auto_suggest), + ), + accept_handler=self._accept_handler, + on_text_changed=self._on_input_timeout, + ) + + return python_buffer + + async def read(self) -> str: + """Read the input + + Same method as super().read, except that thread is replaced by asyncio + (hence the 'async' keyword added to method definition) + """ + # Capture the current input_mode in order to restore it after reset, + # for ViState.reset() sets it to InputMode.INSERT unconditionally and + # doesn't accept any arguments. + def pre_run(last_input_mode: InputMode = self.app.vi_state.input_mode,) -> None: + if self.vi_keep_last_used_mode: + self.app.vi_state.input_mode = last_input_mode + + if not self.vi_keep_last_used_mode and self.vi_start_in_navigation_mode: + self.app.vi_state.input_mode = InputMode.NAVIGATION + + # Run the UI. + result: str = "" + exception: Optional[BaseException] = None + + async def in_thread() -> None: + nonlocal result, exception + try: + while True: + try: + result = await self.app.run_async( + pre_run=pre_run + ) # was: self.app.run(pre_run=pre_run) + + if result.lstrip().startswith("\x1a"): + # When the input starts with Ctrl-Z, quit the REPL. + # (Important for Windows users.) + raise EOFError + + # Remove leading whitespace. + # (Users can add extra indentation, which happens for + # instance because of copy/pasting code.) + result = unindent_code(result) + + if result and not result.isspace(): + return + except KeyboardInterrupt: + # Abort - try again. + self.default_buffer.document = Document() + except BaseException as e: + exception = e + return + + finally: + if self.insert_blank_line_after_input: + self.app.output.write("\n") + + await in_thread() # was: threading.Thread(target=in_thread); thread.start(); thread.join() + + if exception is not None: + raise exception + return result + + async def run_async(self) -> None: + """Run the REPL loop + + This is the same as super().run_async except that there is no thread + """ + loop = asyncio.get_event_loop() + + if self.terminal_title: + set_title(self.terminal_title) + + self._add_to_namespace() + + try: + while True: + try: + # Read. + try: + text = ( + await self.read() + ) # was: await loop.run_in_executor(None, self.read) + except EOFError: + return + + # Eval. + try: + result = await self.eval_async(text) + except KeyboardInterrupt as e: # KeyboardInterrupt doesn't inherit from Exception. + raise + except SystemExit: + return + except BaseException as e: + self._handle_exception(e) + else: + # Print. + if result is not None: + self.show_result( + result + ) # was: await loop.run_in_executor(None, lambda: self.show_result(result)) + + # Loop. + self.current_statement_index += 1 + self.signatures = [] + + except KeyboardInterrupt as e: + # XXX: This does not yet work properly. In some situations, + # `KeyboardInterrupt` exceptions can end up in the event + # loop selector. + self._handle_keyboard_interrupt(e) + finally: + if self.terminal_title: + clear_title() + self._remove_from_namespace() diff --git a/bliss/shell/cli/repl.py b/bliss/shell/cli/repl.py index a7c97b66f7..2ed0fe3dcf 100644 --- a/bliss/shell/cli/repl.py +++ b/bliss/shell/cli/repl.py @@ -7,8 +7,6 @@ """Bliss REPL (Read Eval Print Loop)""" import asyncio -import queue -import threading import contextlib import os import sys @@ -20,14 +18,11 @@ import traceback import gevent import logging import platform - from collections import deque from datetime import datetime -from ptpython.repl import PythonRepl - -from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context import ptpython.layout +from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context from prompt_toolkit.output import DummyOutput # imports needed to have control over _execute of ptpython @@ -41,21 +36,21 @@ from bliss.shell.cli import style as repl_style from bliss.shell.cli.prompt import BlissPrompt from bliss.shell.cli.typing_helper import TypingHelper from bliss.shell.cli.ptpython_statusbar_patch import NEWstatus_bar, TMUXstatus_bar +from bliss.shell.bliss_banners import print_rainbow_banner +from bliss.shell.cli.protected_dict import ProtectedDict +from bliss.shell.cli.no_thread_repl import NoThreadPythonRepl +from bliss.shell import standard from bliss import set_bliss_shell_mode from bliss.common.utils import ShellStr, Singleton from bliss.common import constants +from bliss.common import session as session_mdl +from bliss.common.session import DefaultSession from bliss import release, current_session from bliss.config import static +from bliss.config.conductor.client import get_default_connection from bliss.shell.standard import info from bliss.common.logtools import userlogger, elogbook -from bliss.shell.cli.protected_dict import ProtectedDict -from bliss.shell import standard - -from bliss.common import session as session_mdl -from bliss.common.session import DefaultSession -from bliss.config.conductor.client import get_default_connection -from bliss.shell.bliss_banners import print_rainbow_banner logger = logging.getLogger(__name__) @@ -307,12 +302,8 @@ class PromptToolkitOutputWrapper(DummyOutput): return self.__wrapped_output.fileno() -class BlissRepl(PythonRepl, metaclass=Singleton): +class BlissRepl(NoThreadPythonRepl, metaclass=Singleton): def __init__(self, *args, **kwargs): - self._show_result_aw = gevent.get_hub().loop.async_() - self._show_result_aw.start(self._on_result) - self._result_q = queue.Queue() - prompt_label = kwargs.pop("prompt_label", "BLISS") title = kwargs.pop("title", None) session = kwargs.pop("session") @@ -370,29 +361,15 @@ class BlissRepl(PythonRepl, metaclass=Singleton): self.typing_helper = TypingHelper(self) - def _on_result(self): - # spawn, because we cannot block in async watcher callback - gevent.spawn(self._do_handle_result, self._last_result) - - def _do_handle_result(self, result): - if hasattr(result, "__info__"): - result = Info(result) - logging.getLogger("user_input").info(result) - elogbook.command(result) - self._result_q.put(result) - ## # NB: next methods are overloaded ## def show_result(self, result): - # warning: this may be called from a different thread each time - # (when "run_async" is used) - if threading.current_thread() is threading.main_thread(): - self._do_handle_result(result) - else: - self._last_result = result - self._show_result_aw.send() - return super().show_result(self._result_q.get()) + if hasattr(result, "__info__"): + result = Info(result) + logging.getLogger("user_input").info(result) + elogbook.command(result) + return super().show_result(result) def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None: sys.excepthook(*sys.exc_info()) -- GitLab