blob: c1349828711e7bfbaf99f189725a4e11a3d44fb4 [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""PwPtPythonPane class."""
import asyncio
import functools
import io
import logging
import os
import sys
import shlex
import subprocess
from typing import Iterable, Optional, TYPE_CHECKING
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.layout.controls import BufferControl
from prompt_toolkit.completion import merge_completers
from prompt_toolkit.filters import (
from ptpython.completer import ( # type: ignore
CompletePrivateAttributes, PythonCompleter,
import ptpython.repl # type: ignore
from ptpython.layout import ( # type: ignore
CompletionVisualisation, Dimension,
import pw_console.text_formatting
from pw_console.repl_pane import ReplPane
_LOG = logging.getLogger(__package__)
_SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command')
class MissingPtpythonBufferControl(Exception):
"""Exception for a missing ptpython BufferControl object."""
def _user_input_is_a_shell_command(text: str) -> bool:
return text.startswith('!')
class PwPtPythonRepl(ptpython.repl.PythonRepl): # pylint: disable=too-many-instance-attributes
"""A ptpython repl class with changes to code execution and output related
def __init__(
# pw_console specific kwargs
extra_completers: Optional[Iterable] = None,
completer = None
if extra_completers:
# Create the default python completer used by
# ptpython.repl.PythonRepl
python_completer = PythonCompleter(
# No self.get_globals yet so this must be a lambda
# pylint: disable=unnecessary-lambda
lambda: self.get_globals(),
lambda: self.get_locals(),
lambda: self.enable_dictionary_completion, # type: ignore
all_completers = [python_completer]
# Merge default Python completer with the new custom one.
completer = merge_completers(all_completers)
# Absolute minimum height of 1
self.enable_mouse_support: bool = True
self.enable_history_search: bool = True
self.enable_dictionary_completion: bool = True
# Change some ptpython.repl defaults.
self.show_status_bar = False
self.show_exit_confirmation = False
self.complete_private_attributes = (
# Function signature that shows args, kwargs, and types under the cursor
# of the input window.
self.show_signature: bool = True
# Docstring of the current completed function that appears at the bottom
# of the input window.
self.show_docstring: bool = False
# Turn off the completion menu in ptpython. The CompletionsMenu in
# ConsoleApp.root_container will handle this.
self.completion_visualisation: CompletionVisualisation = (
# Additional state variables.
self.repl_pane: 'Optional[ReplPane]' = None
self._last_result = None
self._last_exception = None
def _set_pt_python_input_buffer_control_focusable(self) -> None:
"""Enable focus_on_click for ptpython's input buffer."""
error_message = (
'Unable to find ptpythons BufferControl input object.\n'
' For the last known position see:\n'
'The installed version of ptpython may not be compatible with'
' pw console; please try re-running environment setup.')
# Fetch the Window's BufferControl object.
# From ptpython/
# self.root_container = HSplit([
# VSplit([
# HSplit([
# FloatContainer(
# content=HSplit(
# [create_python_input_window()] + extra_body
# ), ...
ptpython_buffer_control = (
# This should be a BufferControl instance
if not isinstance(ptpython_buffer_control, BufferControl):
raise MissingPtpythonBufferControl(error_message)
# Enable focus options
ptpython_buffer_control.focusable = to_filter(True)
ptpython_buffer_control.focus_on_click = to_filter(True)
except IndexError as _error:
raise MissingPtpythonBufferControl(error_message)
def __pt_container__(self):
"""Return the prompt_toolkit root container for class.
This allows self to be used wherever prompt_toolkit expects a container
return self.ptpython_layout.root_container
def set_repl_pane(self, repl_pane):
"""Update the parent pw_console.ReplPane reference."""
self.repl_pane = repl_pane
def _save_result(self, formatted_text):
"""Save the last repl execution result."""
unformatted_result = pw_console.text_formatting.remove_formatting(
self._last_result = unformatted_result
def _save_exception(self, formatted_text):
"""Save the last repl exception."""
unformatted_result = pw_console.text_formatting.remove_formatting(
self._last_exception = unformatted_result
def clear_last_result(self):
"""Erase the last repl execution result."""
self._last_result = None
self._last_exception = None
def show_result(self, result):
"""Format and save output results.
This function is called from the _run_user_code() function which is
always run from the user code thread, within
formatted_result = self._format_result_output(result)
def _handle_exception(self, e: BaseException) -> None:
"""Format and save output results.
This function is called from the _run_user_code() function which is
always run from the user code thread, within
formatted_result = self._format_exception_output(e)
def user_code_complete_callback(self, input_text, future):
"""Callback to run after user repl code is finished."""
# If there was an exception it will be saved in self._last_result
result_text = self._last_result
result_object = None
exception_text = self._last_exception
# _last_results consumed, erase for the next run.
stdout_contents = None
stderr_contents = None
if future.result():
future_result = future.result()
stdout_contents = future_result['stdout']
stderr_contents = future_result['stderr']
result_object = future_result['result']
if result_object is not None:
# Use ptpython formatted results:
formatted_result = self._format_result_output(result_object)
result_text = pw_console.text_formatting.remove_formatting(
# Job is finished, append the last result.
# Rebuild output buffer.
# Trigger a prompt_toolkit application redraw.
async def _run_system_command(self, text, stdout_proxy,
_stdin_proxy) -> int:
"""Run a shell command and print results to the repl."""
command = shlex.split(text)
returncode = None
env = os.environ.copy()
# Force colors in Pigweed subcommands and some terminal apps.
env['PW_USE_COLOR'] = '1'
env['CLICOLOR_FORCE'] = '1'
def _handle_output(output):
# Force tab characters to 8 spaces to prevent \t from showing in
# prompt_toolkit.
output = output.replace('\t', ' ')
# Strip some ANSI sequences that don't render.
output = output.replace('\x1b(B\x1b[m', '')
output = output.replace('\x1b[1m', '')
with subprocess.Popen(command,
errors='replace') as proc:
# Print the command'')'$ %s', text)
while returncode is None:
if not proc.stdout:
# Check for one line and update.
output = proc.stdout.readline()
returncode = proc.poll()
# Print any remaining lines.
for output in proc.stdout.readlines():
return returncode
async def _run_user_code(self, text, stdout_proxy, stdin_proxy):
"""Run user code and capture stdout+err.
This fuction should be run in a separate thread from the main
prompt_toolkit application."""
# NOTE: This function runs in a separate thread using the asyncio event
# loop defined by self.repl_pane.application.user_code_loop. Patching
# stdout here will not effect the stdout used by prompt_toolkit and the
# main user interface.
# Patch stdout and stderr to capture repl print() statements.
original_stdout = sys.stdout
original_stderr = sys.stderr
sys.stdout = stdout_proxy
sys.stderr = stdin_proxy
# Run user repl code
if _user_input_is_a_shell_command(text):
result = await self._run_system_command(
text[1:], stdout_proxy, stdin_proxy)
result = await self.run_and_show_expression_async(text)
# Always restore original stdout and stderr
sys.stdout = original_stdout
sys.stderr = original_stderr
# Save the captured output
stdout_contents = stdout_proxy.getvalue()
stderr_contents = stdin_proxy.getvalue()
return {
'stdout': stdout_contents,
'stderr': stderr_contents,
'result': result
def _accept_handler(self, buff: Buffer) -> bool:
"""Function executed when pressing enter in the ptpython.repl.PythonRepl
input buffer."""
# Do nothing if no text is entered.
if len(buff.text) == 0:
return False
if self.repl_pane is None:
return False
repl_input_text = buff.text
# Exit if quit or exit
if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']:
self.repl_pane.application.application.exit() # type: ignore
# Create stdout and stderr proxies
temp_stdout = io.StringIO()
temp_stderr = io.StringIO()
# The help() command with no args uses it's own interactive prompt which
# will not work if prompt_toolkit is running.
if repl_input_text.strip() in ['help()']:
# Run nothing
repl_input_text = ''
# Override stdout
'Error: Interactive help() is not compatible with this repl.')
# Pop open the system command log pane for shell commands.
if _user_input_is_a_shell_command(repl_input_text):
# Execute the repl code in the the separate user_code thread loop.
future = asyncio.run_coroutine_threadsafe(
# This function will be executed in a separate thread.
self._run_user_code(repl_input_text, temp_stdout, temp_stderr),
# Using this asyncio event loop.
self.repl_pane.application.user_code_loop) # type: ignore
# Save the input text and future object.
self.repl_pane.append_executed_code(repl_input_text, future,
temp_stderr) # type: ignore
# Run user_code_complete_callback() when done.
done_callback = functools.partial(self.user_code_complete_callback,
# Rebuild the parent ReplPane output buffer.
# TODO(tonymd): Return True if exception is found?
# Don't keep input for now. Return True to keep input text.
return False
def line_break_count(self) -> int:
return self.default_buffer.text.count('\n')
def input_empty_if_in_focus_condition(self) -> Condition:
def test() -> bool:
if has_focus(self)() and len(self.default_buffer.text) == 0:
return True
return not has_focus(self)()
return test