blob: 3886f80ba90e0f23de61501c3da3b135e73c956a [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
""" Prompt toolkit application for pw watch. """
import asyncio
import logging
from pathlib import Path
import re
import sys
import time
from typing import Callable, Dict, Iterable, List, NoReturn, Optional
from prompt_toolkit.application import Application
from prompt_toolkit.clipboard.pyperclip import PyperclipClipboard
from prompt_toolkit.filters import Condition
from prompt_toolkit.history import (
InMemoryHistory,
History,
ThreadedHistory,
)
from prompt_toolkit.key_binding import (
KeyBindings,
KeyBindingsBase,
merge_key_bindings,
)
from prompt_toolkit.layout import (
Dimension,
DynamicContainer,
Float,
FloatContainer,
FormattedTextControl,
HSplit,
Layout,
Window,
)
from prompt_toolkit.layout.controls import BufferControl
from prompt_toolkit.styles import DynamicStyle, merge_styles, Style
from prompt_toolkit.formatted_text import StyleAndTextTuples
from pw_console.console_app import get_default_colordepth
from pw_console.get_pw_console_app import PW_CONSOLE_APP_CONTEXTVAR
from pw_console.key_bindings import DEFAULT_KEY_BINDINGS
from pw_console.log_pane import LogPane
from pw_console.plugin_mixin import PluginMixin
from pw_console.plugins.twenty48_pane import Twenty48Pane
from pw_console.quit_dialog import QuitDialog
from pw_console.window_manager import WindowManager
import pw_console.style
from pw_console.style import get_theme_colors
import pw_console.widgets.border
from pw_build.project_builder_prefs import ProjectBuilderPrefs
class WatchAppPrefs(ProjectBuilderPrefs):
"""Add pw_console specific prefs standard ProjectBuilderPrefs."""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.registered_commands = DEFAULT_KEY_BINDINGS
self.registered_commands.update(self.user_key_bindings)
self.default_config.update(
{
'key_bindings': DEFAULT_KEY_BINDINGS,
}
)
self.reset_config()
# Required pw_console preferences for key bindings and themes
@property
def user_key_bindings(self) -> Dict[str, List[str]]:
return self._config.get('key_bindings', {})
@property
def ui_theme(self) -> str:
return self._config.get('ui_theme', '')
@ui_theme.setter
def ui_theme(self, new_ui_theme: str) -> None:
self._config['ui_theme'] = new_ui_theme
@property
def theme_colors(self):
return get_theme_colors(self.ui_theme)
def get_function_keys(self, name: str) -> List:
"""Return the keys for the named function."""
try:
return self.registered_commands[name]
except KeyError as error:
raise KeyError('Unbound key function: {}'.format(name)) from error
def register_named_key_function(
self, name: str, default_bindings: List[str]
) -> None:
self.registered_commands[name] = default_bindings
def register_keybinding(
self, name: str, key_bindings: KeyBindings, **kwargs
) -> Callable:
"""Apply registered keys for the given named function."""
def decorator(handler: Callable) -> Callable:
"`handler` is a callable or Binding."
for keys in self.get_function_keys(name):
key_bindings.add(*keys.split(' '), **kwargs)(handler)
return handler
return decorator
# Required pw_console preferences for using a log window pane.
@property
def spaces_between_columns(self) -> int:
return 2
@property
def window_column_split_method(self) -> str:
return 'horizontal'
@property
def hide_date_from_log_time(self) -> bool:
return True
@property
def column_order(self) -> list:
return []
def column_style( # pylint: disable=no-self-use
self,
_column_name: str,
_column_value: str,
default='',
) -> str:
return default
@property
def show_python_file(self) -> bool:
return self._config.get('show_python_file', False)
@property
def show_source_file(self) -> bool:
return self._config.get('show_source_file', False)
@property
def show_python_logger(self) -> bool:
return self._config.get('show_python_logger', False)
_LOG = logging.getLogger('pw_build.watch')
class WatchWindowManager(WindowManager):
def update_root_container_body(self):
self.application.window_manager_container = self.create_root_container()
class WatchApp(PluginMixin):
"""Pigweed Watch main window application."""
# pylint: disable=too-many-instance-attributes
NINJA_FAILURE_TEXT = '\033[31mFAILED: '
NINJA_BUILD_STEP = re.compile(
r"^\[(?P<step>[0-9]+)/(?P<total_steps>[0-9]+)\] (?P<action>.*)$"
)
def __init__(
self,
event_handler,
prefs: WatchAppPrefs,
log_file_name: Optional[str] = None,
):
self.event_handler = event_handler
self.external_logfile: Optional[Path] = (
Path(log_file_name) if log_file_name else None
)
self.color_depth = get_default_colordepth()
# Necessary for some of pw_console's window manager features to work
# such as mouse drag resizing.
PW_CONSOLE_APP_CONTEXTVAR.set(self) # type: ignore
self.prefs = prefs
self.quit_dialog = QuitDialog(self, self.exit) # type: ignore
self.search_history: History = ThreadedHistory(InMemoryHistory())
self.window_manager = WatchWindowManager(self)
pw_console.python_logging.setup_python_logging()
self._build_error_count = 0
self._errors_in_output = False
self.log_ui_update_frequency = 0.1 # 10 FPS
self._last_ui_update_time = time.time()
debug_logging = (
event_handler.project_builder.default_log_level == logging.DEBUG
)
level_name = 'DEBUG' if debug_logging else 'INFO'
if event_handler.separate_logfiles:
for recipe in reversed(event_handler.project_builder.build_recipes):
self.add_build_log_pane(
recipe.display_name,
recipe.log,
level_name=level_name,
)
self.add_build_log_pane(
'Pigweed Watch',
_LOG,
level_name=level_name,
)
self.ninja_log_pane = list(self.all_log_panes())[0]
self.time_waster = Twenty48Pane(include_resize_handle=True)
self.time_waster.application = self
self.time_waster.show_pane = False
self.window_manager.add_pane(self.time_waster)
self.window_manager_container = (
self.window_manager.create_root_container()
)
self.status_bar_border_style = 'class:command-runner-border'
self.root_container = FloatContainer(
HSplit(
[
pw_console.widgets.border.create_border(
HSplit(
[
# The top toolbar.
Window(
content=FormattedTextControl(
self.get_statusbar_text
),
height=Dimension.exact(1),
style='class:toolbar_inactive',
),
# Result Toolbar.
Window(
content=FormattedTextControl(
self.get_resultbar_text
),
height=lambda: len(
self.event_handler.project_builder
),
style='class:toolbar_inactive',
),
]
),
border_style=lambda: self.status_bar_border_style,
base_style='class:toolbar_inactive',
left_margin_columns=1,
right_margin_columns=1,
),
# The main content.
DynamicContainer(lambda: self.window_manager_container),
]
),
floats=[
Float(
content=self.quit_dialog,
top=2,
left=2,
),
],
)
key_bindings = KeyBindings()
@key_bindings.add('enter', filter=self.input_box_not_focused())
def _run_build(_event):
"Rebuild."
self.run_build()
@key_bindings.add('c-t', filter=self.input_box_not_focused())
def _pass_time(_event):
"Rebuild."
self.time_waster.show_pane = not self.time_waster.show_pane
self.refresh_layout()
self.window_manager.focus_first_visible_pane()
register = self.prefs.register_keybinding
@register('global.exit-no-confirmation', key_bindings)
def _quit_no_confirm(_event):
"""Quit without confirmation."""
_LOG.info('Got quit signal; exiting...')
self.exit(0)
@register('global.exit-with-confirmation', key_bindings)
def _quit_with_confirm(_event):
"""Quit with confirmation dialog."""
self.quit_dialog.open_dialog()
self.key_bindings = merge_key_bindings(
[
self.window_manager.key_bindings,
key_bindings,
]
)
self.current_theme = pw_console.style.generate_styles(
self.prefs.ui_theme
)
self.style_overrides = Style.from_dict(
{
# 'search': 'bg:ansired ansiblack',
}
)
self.layout = Layout(
self.root_container,
focused_element=self.ninja_log_pane,
)
self.application: Application = Application(
layout=self.layout,
key_bindings=self.key_bindings,
mouse_support=True,
color_depth=self.color_depth,
clipboard=PyperclipClipboard(),
style=DynamicStyle(
lambda: merge_styles(
[
self.current_theme,
self.style_overrides,
]
)
),
full_screen=True,
)
self.plugin_init(
plugin_callback=self.check_build_status,
plugin_callback_frequency=0.5,
plugin_logger_name='pw_watch_stdout_checker',
)
def add_build_log_pane(
self, title: str, logger: logging.Logger, level_name: str
) -> None:
"""Setup a new build log pane."""
new_log_pane = LogPane(application=self, pane_title=title)
new_log_pane.add_log_handler(logger, level_name=level_name)
# Set python log format to just the message itself.
new_log_pane.log_view.log_store.formatter = logging.Formatter(
'%(message)s'
)
new_log_pane.table_view = False
# Disable line wrapping for improved error visibility.
if new_log_pane.wrap_lines:
new_log_pane.toggle_wrap_lines()
# Blank right side toolbar text
new_log_pane._pane_subtitle = ' ' # pylint: disable=protected-access
# Make tab and shift-tab search for next and previous error
next_error_bindings = KeyBindings()
@next_error_bindings.add('s-tab')
def _previous_error(_event):
self.jump_to_error(backwards=True)
@next_error_bindings.add('tab')
def _next_error(_event):
self.jump_to_error()
existing_log_bindings: Optional[
KeyBindingsBase
] = new_log_pane.log_content_control.key_bindings
key_binding_list: List[KeyBindingsBase] = []
if existing_log_bindings:
key_binding_list.append(existing_log_bindings)
key_binding_list.append(next_error_bindings)
new_log_pane.log_content_control.key_bindings = merge_key_bindings(
key_binding_list
)
self.window_manager.add_pane(new_log_pane)
def logs_redraw(self):
emit_time = time.time()
# Has enough time passed since last UI redraw due to new logs?
if emit_time > self._last_ui_update_time + self.log_ui_update_frequency:
# Update last log time
self._last_ui_update_time = emit_time
# Trigger Prompt Toolkit UI redraw.
self.redraw_ui()
def jump_to_error(self, backwards: bool = False) -> None:
if not self.ninja_log_pane.log_view.search_text:
self.ninja_log_pane.log_view.set_search_regex(
'^FAILED: ', False, None
)
if backwards:
self.ninja_log_pane.log_view.search_backwards()
else:
self.ninja_log_pane.log_view.search_forwards()
self.ninja_log_pane.log_view.log_screen.reset_logs(
log_index=self.ninja_log_pane.log_view.log_index
)
self.ninja_log_pane.log_view.move_selected_line_to_top()
def refresh_layout(self) -> None:
self.window_manager.update_root_container_body()
def update_menu_items(self):
"""Required by the Window Manager Class."""
def redraw_ui(self):
"""Redraw the prompt_toolkit UI."""
if hasattr(self, 'application'):
self.application.invalidate()
def focus_on_container(self, pane):
"""Set application focus to a specific container."""
# Try to focus on the given pane
try:
self.application.layout.focus(pane)
except ValueError:
# If the container can't be focused, focus on the first visible
# window pane.
self.window_manager.focus_first_visible_pane()
def focused_window(self):
"""Return the currently focused window."""
return self.application.layout.current_window
def focus_main_menu(self):
"""Focus on the main menu.
Currently pw_watch has no main menu so focus on the first visible pane
instead."""
self.window_manager.focus_first_visible_pane()
def command_runner_is_open(self) -> bool:
# pylint: disable=no-self-use
return False
def all_log_panes(self) -> Iterable[LogPane]:
for pane in self.window_manager.active_panes():
if isinstance(pane, LogPane):
yield pane
def clear_ninja_log(self) -> None:
for pane in self.all_log_panes():
pane.log_view.log_store.clear_logs()
pane.log_view._restart_filtering() # pylint: disable=protected-access
pane.log_view.view_mode_changed()
# Re-enable follow if needed
if not pane.log_view.follow:
pane.log_view.toggle_follow()
def run_build(self):
"""Manually trigger a rebuild."""
self.clear_ninja_log()
self.event_handler.rebuild()
def rebuild_on_filechange(self):
for pane in self.all_log_panes():
pane.log_view.log_store.clear_logs()
pane.log_view.view_mode_changed()
def get_statusbar_text(self):
status = self.event_handler.status_message
fragments = [('class:logo', 'Pigweed Watch')]
is_building = False
if status:
fragments = [status]
is_building = status[1].endswith('Building')
separator = ('', ' ')
self.status_bar_border_style = 'class:theme-fg-green'
if is_building:
percent = self.event_handler.current_build_percent
percent *= 100
fragments.append(separator)
fragments.append(('ansicyan', '{:.0f}%'.format(percent)))
self.status_bar_border_style = 'class:theme-fg-yellow'
if self.event_handler.current_build_errors > 0:
fragments.append(separator)
fragments.append(('', 'Errors:'))
fragments.append(
('ansired', str(self.event_handler.current_build_errors))
)
self.status_bar_border_style = 'class:theme-fg-red'
if is_building:
fragments.append(separator)
fragments.append(('', self.event_handler.current_build_step))
return fragments
def get_resultbar_text(self) -> StyleAndTextTuples:
result = self.event_handler.result_message
if not result:
result = [('', 'Loading...')]
return result
def exit(self, exit_code: int = 0) -> None:
log_file = self.external_logfile
def _really_exit(future: asyncio.Future) -> NoReturn:
if log_file:
# Print a message showing where logs were saved to.
print('Logs saved to: {}'.format(log_file.resolve()))
sys.exit(future.result())
if self.application.future:
self.application.future.add_done_callback(_really_exit)
self.application.exit(result=exit_code)
def check_build_status(self) -> bool:
if not self.event_handler.current_stdout:
return False
if self._errors_in_output:
return True
if self.event_handler.current_build_errors > self._build_error_count:
self._errors_in_output = True
self.jump_to_error()
return True
def run(self):
self.plugin_start()
# Run the prompt_toolkit application
self.application.run(set_exception_handler=True)
def input_box_not_focused(self) -> Condition:
"""Condition checking the focused control is not a text input field."""
@Condition
def _test() -> bool:
"""Check if the currently focused control is an input buffer.
Returns:
bool: True if the currently focused control is not a text input
box. For example if the user presses enter when typing in
the search box, return False.
"""
return not isinstance(
self.application.layout.current_control, BufferControl
)
return _test