| # Copyright 2021 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """PwPtPythonPane class.""" |
| |
| import asyncio |
| import functools |
| import io |
| import logging |
| import os |
| import sys |
| import shlex |
| import subprocess |
| from typing import Iterable, Optional, TYPE_CHECKING |
| |
| from prompt_toolkit.buffer import Buffer |
| from prompt_toolkit.layout.controls import BufferControl |
| from prompt_toolkit.completion import merge_completers |
| from prompt_toolkit.filters import ( |
| Condition, |
| has_focus, |
| to_filter, |
| ) |
| from ptpython.completer import ( # type: ignore |
| CompletePrivateAttributes, PythonCompleter, |
| ) |
| import ptpython.repl # type: ignore |
| from ptpython.layout import ( # type: ignore |
| CompletionVisualisation, Dimension, |
| ) |
| |
| import pw_console.text_formatting |
| |
| if TYPE_CHECKING: |
| from pw_console.repl_pane import ReplPane |
| |
| _LOG = logging.getLogger(__package__) |
| _SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command') |
| |
| |
| class MissingPtpythonBufferControl(Exception): |
| """Exception for a missing ptpython BufferControl object.""" |
| |
| |
| def _user_input_is_a_shell_command(text: str) -> bool: |
| return text.startswith('!') |
| |
| |
| class PwPtPythonRepl(ptpython.repl.PythonRepl): # pylint: disable=too-many-instance-attributes |
| """A ptpython repl class with changes to code execution and output related |
| methods.""" |
| def __init__( |
| self, |
| *args, |
| # pw_console specific kwargs |
| extra_completers: Optional[Iterable] = None, |
| **ptpython_kwargs, |
| ): |
| |
| completer = None |
| if extra_completers: |
| # Create the default python completer used by |
| # ptpython.repl.PythonRepl |
| python_completer = PythonCompleter( |
| # No self.get_globals yet so this must be a lambda |
| # pylint: disable=unnecessary-lambda |
| lambda: self.get_globals(), |
| lambda: self.get_locals(), |
| lambda: self.enable_dictionary_completion, # type: ignore |
| ) |
| |
| all_completers = [python_completer] |
| all_completers.extend(extra_completers) |
| # Merge default Python completer with the new custom one. |
| completer = merge_completers(all_completers) |
| |
| super().__init__( |
| *args, |
| create_app=False, |
| # Absolute minimum height of 1 |
| _input_buffer_height=Dimension(min=1), |
| _completer=completer, |
| **ptpython_kwargs, |
| ) |
| |
| self.enable_mouse_support: bool = True |
| self.enable_history_search: bool = True |
| self.enable_dictionary_completion: bool = True |
| self._set_pt_python_input_buffer_control_focusable() |
| |
| # Change some ptpython.repl defaults. |
| self.show_status_bar = False |
| self.show_exit_confirmation = False |
| self.complete_private_attributes = ( |
| CompletePrivateAttributes.IF_NO_PUBLIC) |
| |
| # Function signature that shows args, kwargs, and types under the cursor |
| # of the input window. |
| self.show_signature: bool = True |
| # Docstring of the current completed function that appears at the bottom |
| # of the input window. |
| self.show_docstring: bool = False |
| |
| # Turn off the completion menu in ptpython. The CompletionsMenu in |
| # ConsoleApp.root_container will handle this. |
| self.completion_visualisation: CompletionVisualisation = ( |
| CompletionVisualisation.NONE) |
| |
| # Additional state variables. |
| self.repl_pane: 'Optional[ReplPane]' = None |
| self._last_result = None |
| self._last_exception = None |
| |
| def _set_pt_python_input_buffer_control_focusable(self) -> None: |
| """Enable focus_on_click for ptpython's input buffer.""" |
| error_message = ( |
| 'Unable to find ptpythons BufferControl input object.\n' |
| ' For the last known position see:\n' |
| ' https://github.com/prompt-toolkit/ptpython/' |
| 'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/' |
| 'ptpython/layout.py#L598\n' |
| '\n' |
| 'The installed version of ptpython may not be compatible with' |
| ' pw console; please try re-running environment setup.') |
| |
| try: |
| # Fetch the Window's BufferControl object. |
| # From ptpython/layout.py: |
| # self.root_container = HSplit([ |
| # VSplit([ |
| # HSplit([ |
| # FloatContainer( |
| # content=HSplit( |
| # [create_python_input_window()] + extra_body |
| # ), ... |
| ptpython_buffer_control = ( |
| self.ptpython_layout.root_container.children[0].children[0]. |
| children[0].content.children[0].content) |
| # This should be a BufferControl instance |
| if not isinstance(ptpython_buffer_control, BufferControl): |
| raise MissingPtpythonBufferControl(error_message) |
| # Enable focus options |
| ptpython_buffer_control.focusable = to_filter(True) |
| ptpython_buffer_control.focus_on_click = to_filter(True) |
| except IndexError as _error: |
| raise MissingPtpythonBufferControl(error_message) |
| |
| def __pt_container__(self): |
| """Return the prompt_toolkit root container for class. |
| |
| This allows self to be used wherever prompt_toolkit expects a container |
| object.""" |
| return self.ptpython_layout.root_container |
| |
| def set_repl_pane(self, repl_pane): |
| """Update the parent pw_console.ReplPane reference.""" |
| self.repl_pane = repl_pane |
| |
| def _save_result(self, formatted_text): |
| """Save the last repl execution result.""" |
| unformatted_result = pw_console.text_formatting.remove_formatting( |
| formatted_text) |
| self._last_result = unformatted_result |
| |
| def _save_exception(self, formatted_text): |
| """Save the last repl exception.""" |
| unformatted_result = pw_console.text_formatting.remove_formatting( |
| formatted_text) |
| self._last_exception = unformatted_result |
| |
| def clear_last_result(self): |
| """Erase the last repl execution result.""" |
| self._last_result = None |
| self._last_exception = None |
| |
| def show_result(self, result): |
| """Format and save output results. |
| |
| This function is called from the _run_user_code() function which is |
| always run from the user code thread, within |
| .run_and_show_expression_async(). |
| """ |
| formatted_result = self._format_result_output(result) |
| self._save_result(formatted_result) |
| |
| def _handle_exception(self, e: BaseException) -> None: |
| """Format and save output results. |
| |
| This function is called from the _run_user_code() function which is |
| always run from the user code thread, within |
| .run_and_show_expression_async(). |
| """ |
| formatted_result = self._format_exception_output(e) |
| self._save_exception(formatted_result.__pt_formatted_text__()) |
| |
| def user_code_complete_callback(self, input_text, future): |
| """Callback to run after user repl code is finished.""" |
| # If there was an exception it will be saved in self._last_result |
| result_text = self._last_result |
| result_object = None |
| exception_text = self._last_exception |
| |
| # _last_results consumed, erase for the next run. |
| self.clear_last_result() |
| |
| stdout_contents = None |
| stderr_contents = None |
| if future.result(): |
| future_result = future.result() |
| stdout_contents = future_result['stdout'] |
| stderr_contents = future_result['stderr'] |
| result_object = future_result['result'] |
| |
| if result_object is not None: |
| # Use ptpython formatted results: |
| formatted_result = self._format_result_output(result_object) |
| result_text = pw_console.text_formatting.remove_formatting( |
| formatted_result) |
| |
| # Job is finished, append the last result. |
| self.repl_pane.append_result_to_executed_code( |
| input_text, |
| future, |
| result_text, |
| stdout_contents, |
| stderr_contents, |
| exception_text=exception_text, |
| result_object=result_object, |
| ) |
| |
| # Rebuild output buffer. |
| self.repl_pane.update_output_buffer( |
| 'pw_ptpython_repl.user_code_complete_callback') |
| |
| # Trigger a prompt_toolkit application redraw. |
| self.repl_pane.application.application.invalidate() |
| |
| async def _run_system_command(self, text, stdout_proxy, |
| _stdin_proxy) -> int: |
| """Run a shell command and print results to the repl.""" |
| command = shlex.split(text) |
| returncode = None |
| env = os.environ.copy() |
| # Force colors in Pigweed subcommands and some terminal apps. |
| env['PW_USE_COLOR'] = '1' |
| env['CLICOLOR_FORCE'] = '1' |
| |
| def _handle_output(output): |
| # Force tab characters to 8 spaces to prevent \t from showing in |
| # prompt_toolkit. |
| output = output.replace('\t', ' ') |
| # Strip some ANSI sequences that don't render. |
| output = output.replace('\x1b(B\x1b[m', '') |
| output = output.replace('\x1b[1m', '') |
| stdout_proxy.write(output) |
| _SYSTEM_COMMAND_LOG.info(output.rstrip()) |
| |
| with subprocess.Popen(command, |
| env=env, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| errors='replace') as proc: |
| # Print the command |
| _SYSTEM_COMMAND_LOG.info('') |
| _SYSTEM_COMMAND_LOG.info('$ %s', text) |
| while returncode is None: |
| if not proc.stdout: |
| continue |
| |
| # Check for one line and update. |
| output = proc.stdout.readline() |
| _handle_output(output) |
| |
| returncode = proc.poll() |
| |
| # Print any remaining lines. |
| for output in proc.stdout.readlines(): |
| _handle_output(output) |
| |
| return returncode |
| |
| async def _run_user_code(self, text, stdout_proxy, stdin_proxy): |
| """Run user code and capture stdout+err. |
| |
| This fuction should be run in a separate thread from the main |
| prompt_toolkit application.""" |
| # NOTE: This function runs in a separate thread using the asyncio event |
| # loop defined by self.repl_pane.application.user_code_loop. Patching |
| # stdout here will not effect the stdout used by prompt_toolkit and the |
| # main user interface. |
| |
| # Patch stdout and stderr to capture repl print() statements. |
| original_stdout = sys.stdout |
| original_stderr = sys.stderr |
| |
| sys.stdout = stdout_proxy |
| sys.stderr = stdin_proxy |
| |
| # Run user repl code |
| try: |
| if _user_input_is_a_shell_command(text): |
| result = await self._run_system_command( |
| text[1:], stdout_proxy, stdin_proxy) |
| else: |
| result = await self.run_and_show_expression_async(text) |
| finally: |
| # Always restore original stdout and stderr |
| sys.stdout = original_stdout |
| sys.stderr = original_stderr |
| |
| # Save the captured output |
| stdout_contents = stdout_proxy.getvalue() |
| stderr_contents = stdin_proxy.getvalue() |
| |
| return { |
| 'stdout': stdout_contents, |
| 'stderr': stderr_contents, |
| 'result': result |
| } |
| |
| def _accept_handler(self, buff: Buffer) -> bool: |
| """Function executed when pressing enter in the ptpython.repl.PythonRepl |
| input buffer.""" |
| # Do nothing if no text is entered. |
| if len(buff.text) == 0: |
| return False |
| if self.repl_pane is None: |
| return False |
| |
| repl_input_text = buff.text |
| # Exit if quit or exit |
| if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']: |
| self.repl_pane.application.application.exit() # type: ignore |
| |
| # Create stdout and stderr proxies |
| temp_stdout = io.StringIO() |
| temp_stderr = io.StringIO() |
| |
| # The help() command with no args uses it's own interactive prompt which |
| # will not work if prompt_toolkit is running. |
| if repl_input_text.strip() in ['help()']: |
| # Run nothing |
| repl_input_text = '' |
| # Override stdout |
| temp_stdout.write( |
| 'Error: Interactive help() is not compatible with this repl.') |
| |
| # Pop open the system command log pane for shell commands. |
| if _user_input_is_a_shell_command(repl_input_text): |
| self.repl_pane.application.setup_command_runner_log_pane() |
| |
| # Execute the repl code in the the separate user_code thread loop. |
| future = asyncio.run_coroutine_threadsafe( |
| # This function will be executed in a separate thread. |
| self._run_user_code(repl_input_text, temp_stdout, temp_stderr), |
| # Using this asyncio event loop. |
| self.repl_pane.application.user_code_loop) # type: ignore |
| |
| # Save the input text and future object. |
| self.repl_pane.append_executed_code(repl_input_text, future, |
| temp_stdout, |
| temp_stderr) # type: ignore |
| |
| # Run user_code_complete_callback() when done. |
| done_callback = functools.partial(self.user_code_complete_callback, |
| repl_input_text) |
| future.add_done_callback(done_callback) |
| |
| # Rebuild the parent ReplPane output buffer. |
| self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler') |
| |
| # TODO(tonymd): Return True if exception is found? |
| # Don't keep input for now. Return True to keep input text. |
| return False |
| |
| def line_break_count(self) -> int: |
| return self.default_buffer.text.count('\n') |
| |
| def input_empty_if_in_focus_condition(self) -> Condition: |
| @Condition |
| def test() -> bool: |
| if has_focus(self)() and len(self.default_buffer.text) == 0: |
| return True |
| return not has_focus(self)() |
| |
| return test |