blob: 445a33b5411fc46acff8abff6eca94acc21f47d8 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Watch files for changes and rebuild.
pw watch runs Ninja in a build directory when source files change. It works with
any Ninja project (GN or CMake).
Usage examples:
# Find a build directory and build the default target
pw watch
# Find a build directory and build the stm32f429i target
pw watch python.lint stm32f429i
# Build pw_run_tests.modules in the out/cmake directory
pw watch -C out/cmake pw_run_tests.modules
# Build the default target in out/ and pw_apps in out/cmake
pw watch -C out -C out/cmake pw_apps
# Find a directory and build python.tests, and build pw_apps in out/cmake
pw watch python.tests -C out/cmake pw_apps
"""
import argparse
from dataclasses import dataclass
import errno
from itertools import zip_longest
import logging
import os
from pathlib import Path
import re
import shlex
import subprocess
import sys
import threading
from threading import Thread
from typing import (
Iterable,
List,
NamedTuple,
NoReturn,
Optional,
Sequence,
Tuple,
)
try:
import httpwatcher # type: ignore[import]
except ImportError:
httpwatcher = None
from watchdog.events import FileSystemEventHandler # type: ignore[import]
from watchdog.observers import Observer # type: ignore[import]
from prompt_toolkit.formatted_text.base import OneStyleAndTextTuple
from prompt_toolkit.formatted_text import StyleAndTextTuples
import pw_cli.branding
import pw_cli.color
import pw_cli.env
import pw_cli.log
import pw_cli.plugins
import pw_console.python_logging
from pw_watch.watch_app import WatchApp
from pw_watch.debounce import DebouncedFunction, Debouncer
# Color helper from pw_cli; used below to colorize status text and banners.
_COLOR = pw_cli.color.colors()
# Logger for messages produced by the watcher itself.
_LOG = logging.getLogger('pw_watch')
# Separate logger that receives the captured build output when the build is
# run through the fullscreen watch app.
_NINJA_LOG = logging.getLogger('pw_watch_ninja_output')
# Error code that watch() treats as "inotify watch limit reached" when it
# appears as the first argument of an OSError.
# NOTE(review): 28 is presumably ENOSPC on Linux -- confirm.
_ERRNO_INOTIFY_LIMIT_REACHED = 28
# Suppress events under 'fsevents', generated by watchdog on every file
# event on MacOS.
# TODO(b/182281481): Fix file ignoring, rather than just suppressing logs
_FSEVENTS_LOG = logging.getLogger('fsevents')
_FSEVENTS_LOG.setLevel(logging.WARNING)
# Large banner logged line by line (in green) after a run in which every build
# succeeded, when banners are enabled.
_PASS_MESSAGE = """
██████╗ █████╗ ███████╗███████╗██╗
██╔══██╗██╔══██╗██╔════╝██╔════╝██║
██████╔╝███████║███████╗███████╗██║
██╔═══╝ ██╔══██║╚════██║╚════██║╚═╝
██║ ██║ ██║███████║███████║██╗
╚═╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚═╝
"""
# Pick a visually-distinct font from "PASS" to ensure that readers can't
# possibly mistake the difference between the two states.
# Logged line by line (in red) when a build failed or was cancelled.
_FAIL_MESSAGE = """
▄██████▒░▄▄▄ ██▓ ░██▓
▓█▓ ░▒████▄ ▓██▒ ░▓██▒
▒████▒ ░▒█▀ ▀█▄ ▒██▒ ▒██░
░▓█▒ ░░██▄▄▄▄██ ░██░ ▒██░
░▒█░ ▓█ ▓██▒░██░░ ████████▒
▒█░ ▒▒ ▓▒█░░▓ ░ ▒░▓ ░
░▒ ▒ ▒▒ ░ ▒ ░░ ░ ▒ ░
░ ░ ░ ▒ ▒ ░ ░ ░
░ ░ ░ ░ ░
"""
# Width to which the status words ('OK', 'Building', 'Failed', ...) are
# right-justified in the fullscreen status and result messages.
_FULLSCREEN_STATUS_COLUMN_WIDTH = 10
# TODO(keir): Figure out a better strategy for exiting. The problem with the
# watcher is that doing a "clean exit" is slow. However, by directly exiting,
# we remove the possibility of the wrapper script doing anything on exit.
def _die(*args) -> NoReturn:
    """Logs a critical message, then ends the program with exit status 1.

    Args:
        *args: Forwarded unchanged to the logger's critical() call, i.e. a
            format string followed by its arguments.
    """
    _LOG.critical(*args)
    # Same effect as sys.exit(1): SystemExit unwinds the calling thread.
    raise SystemExit(1)
class WatchCharset(NamedTuple):
    """Pair of status markers used in the per-build summary table."""
    # Marker shown next to a build that succeeded.
    slug_ok: str
    # Marker shown next to a build that failed or was interrupted.
    slug_fail: str
# Colored plain-text markers; the default charset for PigweedBuildWatcher.
_ASCII_CHARSET = WatchCharset(_COLOR.green('OK '), _COLOR.red('FAIL'))
# Emoji markers; selected by watch_setup() when the PW_EMOJI setting is on.
_EMOJI_CHARSET = WatchCharset('✔️ ', '💥')
@dataclass(frozen=True)
class BuildCommand:
    """One build to run: a build directory plus the targets to build in it.

    An empty ``targets`` tuple means no explicit target is passed along.
    """
    build_dir: Path
    targets: Tuple[str, ...] = ()

    def args(self) -> Tuple[str, ...]:
        """Returns the build directory followed by every target, as strings."""
        return (str(self.build_dir), ) + tuple(self.targets)

    def __str__(self) -> str:
        """Returns args() joined into one shell-quoted, space-separated line."""
        return ' '.join(map(shlex.quote, self.args()))
def git_ignored(file: Path) -> bool:
    """Returns true if this file is in a Git repo and ignored by that repo.

    Returns true for ignored files that were manually added to a repo.

    The decision comes from the exit status of
    ``git check-ignore --quiet --no-index`` on the resolved path: statuses 0
    and 128 are reported as ignored, every other status as not ignored.

    Args:
        file: The path to check; it does not need to exist any more.

    Returns:
        True if the path is considered ignored; False otherwise, including
        when the command could not be started from any ancestor directory.
    """
    target = file.resolve()

    # Run the Git command from the file's parent so that the correct repo is
    # used. If that directory no longer exists, fall back to each ancestor in
    # turn; this makes it possible to check whether a deleted path is ignored
    # in the repo it was originally created in.
    for candidate in (target.parent, *target.parent.parents):
        try:
            completed = subprocess.run(
                ['git', 'check-ignore', '--quiet', '--no-index', target],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                cwd=candidate)
        except FileNotFoundError:
            continue
        return completed.returncode in (0, 128)

    # Every directory up to the filesystem root has been tried.
    return False
class PigweedBuildWatcher(FileSystemEventHandler, DebouncedFunction):
    """Process filesystem events and launch builds if necessary.

    File events arrive through dispatch(). An event whose path matches the
    watch patterns (and is not ignored by Git) presses the Debouncer, which
    drives the DebouncedFunction methods implemented here: run(), cancel(),
    on_complete() and on_keyboard_interrupt().
    """
    # pylint: disable=too-many-instance-attributes

    # Matches Ninja progress lines of the form '[step/total_steps] action'.
    NINJA_BUILD_STEP = re.compile(
        r'^\[(?P<step>[0-9]+)/(?P<total_steps>[0-9]+)\] (?P<action>.*)$')

    def __init__(
        self,
        build_commands: Sequence[BuildCommand],
        patterns: Sequence[str] = (),
        ignore_patterns: Sequence[str] = (),
        charset: WatchCharset = _ASCII_CHARSET,
        restart: bool = True,
        jobs: Optional[int] = None,
        fullscreen: bool = False,
        banners: bool = True,
        keep_going: bool = False,
    ):
        """Sets up watcher state and, outside fullscreen mode, the key thread.

        Args:
            build_commands: Builds to run, in order, on every trigger.
            patterns: Globs a changed path must match to trigger a build.
            ignore_patterns: Globs that prevent a path from triggering.
            charset: Status markers used in the build summary table.
            restart: Whether a file change restarts a build in progress.
            jobs: Value for Ninja's -j option; None passes no -j option.
            fullscreen: Whether builds run through the fullscreen watch app.
            banners: Whether the large pass/fail banners are logged.
            keep_going: Whether to pass '-k 0' to Ninja.
        """
        super().__init__()
        self.banners = banners
        self.status_message: Optional[OneStyleAndTextTuple] = None
        self.result_message: Optional[StyleAndTextTuples] = None
        self.current_stdout = ''
        self.current_build_step = ''
        self.current_build_percent = 0.0
        self.current_build_errors = 0
        self.patterns = patterns
        self.ignore_patterns = ignore_patterns
        self.build_commands = build_commands
        self.charset: WatchCharset = charset
        self.restart_on_changes = restart
        self.fullscreen_enabled = fullscreen
        self.watch_app: Optional[WatchApp] = None

        # Initialize self._current_build to an empty subprocess so that
        # terminate()/wait() can be called before the first real build.
        self._current_build = subprocess.Popen('',
                                               shell=True,
                                               errors='replace')

        self._extra_ninja_args = [] if jobs is None else [f'-j{jobs}']
        if keep_going:
            self._extra_ninja_args.extend(['-k', '0'])

        self.debouncer = Debouncer(self)

        # Track state of a build. These need to be members instead of locals
        # due to the split between dispatch(), run(), and on_complete().
        self.matching_path: Optional[Path] = None
        self.builds_succeeded: List[bool] = []

        # In fullscreen mode the watch app is expected to request rebuilds
        # through rebuild(); otherwise a thread waits for the enter key.
        if not self.fullscreen_enabled:
            self.wait_for_keypress_thread = threading.Thread(
                None, self._wait_for_enter)
            self.wait_for_keypress_thread.start()

    def _terminate_current_build(self) -> None:
        """Terminates the current build process and waits for it to exit."""
        self._current_build.terminate()
        self._current_build.wait()

    def rebuild(self):
        """Rebuild command triggered from watch app."""
        self._terminate_current_build()
        self.debouncer.press('Manual build requested')

    def _wait_for_enter(self) -> NoReturn:
        """Requests a new build each time a line is entered on stdin.

        Runs until stdin is closed or interrupted, at which point the whole
        process exits.
        """
        try:
            while True:
                _ = input()
                self._terminate_current_build()
                self.debouncer.press('Manual build requested...')
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            _exit_due_to_interrupt()

    def _path_matches(self, path: Path) -> bool:
        """Returns true if path matches according to the watcher patterns"""
        return (not any(path.match(x) for x in self.ignore_patterns)
                and any(path.match(x) for x in self.patterns))

    def dispatch(self, event) -> None:
        """Triggers a build if a file event touches a watched, tracked path.

        Args:
            event: The filesystem event. Its ``is_directory`` and ``src_path``
                attributes are read, plus ``dest_path`` when present.
        """
        # There isn't any point in triggering builds on new directory creation.
        # It's the creation or modification of files that indicate something
        # meaningful enough changed for a build.
        if event.is_directory:
            return

        # Collect paths of interest from the event.
        paths: List[str] = []
        if hasattr(event, 'dest_path'):
            paths.append(os.fsdecode(event.dest_path))
        if event.src_path:
            paths.append(os.fsdecode(event.src_path))
        for raw_path in paths:
            _LOG.debug('File event: %s', raw_path)

        # Check whether Git cares about any of these paths. The first path
        # that qualifies triggers the build; the rest are not examined.
        for path in (Path(p).resolve() for p in paths):
            if not git_ignored(path) and self._path_matches(path):
                self._handle_matched_event(path)
                return

    def _handle_matched_event(self, matching_path: Path) -> None:
        """Records the changed path and requests a build if restarts are on."""
        # Only the first matching path since the last completed run is kept.
        if self.matching_path is None:
            self.matching_path = matching_path

        log_message = f'File change detected: {os.path.relpath(matching_path)}'
        if self.restart_on_changes:
            if self.fullscreen_enabled and self.watch_app:
                self.watch_app.rebuild_on_filechange()
            self.debouncer.press(f'{log_message} Triggering build...')
        else:
            _LOG.info('%s ; not rebuilding', log_message)

    def _clear_screen(self) -> None:
        """Resets the terminal, except in fullscreen mode."""
        if not self.fullscreen_enabled:
            print('\033c', end='')  # TODO(pwbug/38): Not Windows compatible.
            sys.stdout.flush()

    # Implementation of DebouncedFunction.run()
    #
    # Note: This will run on the timer thread created by the Debouncer, rather
    # than on the main thread that's watching file events. This enables the
    # watcher to continue receiving file change events during a build.
    def run(self) -> None:
        """Run all the builds in serial and capture pass/fail for each."""

        # Clear the screen and show a banner indicating the build is starting.
        self._clear_screen()

        if self.fullscreen_enabled:
            self.create_result_message()
            _LOG.info(
                _COLOR.green(
                    'Watching for changes. Ctrl-d to exit; enter to rebuild'))
        else:
            for line in pw_cli.branding.banner().splitlines():
                _LOG.info(line)
            _LOG.info(
                _COLOR.green(
                    ' Watching for changes. Ctrl-C to exit; enter to rebuild')
            )
        _LOG.info('')
        _LOG.info('Change detected: %s', self.matching_path)

        self._clear_screen()

        self.builds_succeeded = []
        num_builds = len(self.build_commands)
        _LOG.info('Starting build with %d directories', num_builds)

        env = os.environ.copy()
        # Force colors in Pigweed subcommands run through the watcher.
        env['PW_USE_COLOR'] = '1'
        # Force Ninja to output ANSI colors
        env['CLICOLOR_FORCE'] = '1'

        for i, cmd in enumerate(self.build_commands, 1):
            index = f'[{i}/{num_builds}]'
            self.builds_succeeded.append(self._run_build(index, cmd, env))
            if self.builds_succeeded[-1]:
                level = logging.INFO
                tag = '(OK)'
            else:
                level = logging.ERROR
                tag = '(FAIL)'
            _LOG.log(level, '%s Finished build: %s %s', index, cmd, tag)
            self.create_result_message()

    def create_result_message(self):
        """Rebuilds the per-command status list shown by the fullscreen app.

        Each build command gets a status cell followed by the command text:
        'OK' or 'Failed' for finished builds, 'Building' for the first build
        without a result yet, and a blank cell for the builds after that one.
        Does nothing outside fullscreen mode.
        """
        if not self.fullscreen_enabled:
            return

        self.result_message = []
        first_building_target_found = False
        # zip_longest pads missing results with None, which marks the builds
        # that have not finished yet.
        for (succeeded, command) in zip_longest(self.builds_succeeded,
                                                self.build_commands):
            if succeeded:
                self.result_message.append(
                    ('class:theme-fg-green',
                     'OK'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH)))
            elif succeeded is None and not first_building_target_found:
                first_building_target_found = True
                self.result_message.append(
                    ('class:theme-fg-yellow',
                     'Building'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH)))
            elif first_building_target_found:
                self.result_message.append(
                    ('', ''.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH)))
            else:
                self.result_message.append(
                    ('class:theme-fg-red',
                     'Failed'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH)))
            self.result_message.append(('', f' {command}\n'))

    def _run_build(self, index: str, cmd: BuildCommand, env: dict) -> bool:
        """Runs Ninja for one build command, generating build.ninja if needed.

        Args:
            index: Progress prefix such as '[1/2]' used in log messages.
            cmd: The build directory and targets to build.
            env: Environment for the spawned processes.

        Returns:
            True if the build (and any preceding 'gn gen') exited with 0.
        """
        # Make sure there is a build.ninja file for Ninja to use.
        build_ninja = cmd.build_dir / 'build.ninja'
        if not build_ninja.exists():
            # If this is a CMake directory, prompt the user to re-run CMake.
            if cmd.build_dir.joinpath('CMakeCache.txt').exists():
                _LOG.error('%s %s does not exist; re-run CMake to generate it',
                           index, build_ninja)
                return False

            # Without args.gn there is nothing to regenerate the build from.
            if not cmd.build_dir.joinpath('args.gn').exists():
                _LOG.error(
                    '%s %s does not exist; run GN or CMake in %s to generate '
                    'it', index, build_ninja, cmd.build_dir)
                return False

            _LOG.warning('%s %s does not exist; running gn gen %s', index,
                         build_ninja, cmd.build_dir)
            if not self._execute_command(['gn', 'gen', cmd.build_dir], env):
                return False

        command = ['ninja', *self._extra_ninja_args, '-C', *cmd.args()]
        _LOG.info('%s Starting build: %s', index,
                  ' '.join(shlex.quote(arg) for arg in command))

        return self._execute_command(command, env)

    def _execute_command(self, command: list, env: dict) -> bool:
        """Runs a command with a blank before/after for visual separation.

        In fullscreen mode the command is instead run through
        _execute_command_watch_app(), which captures its output.

        Returns:
            True if the command exited with status 0.
        """
        self.current_build_errors = 0
        self.status_message = (
            'class:theme-fg-yellow',
            'Building'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH))
        if self.fullscreen_enabled:
            return self._execute_command_watch_app(command, env)
        print()
        self._current_build = subprocess.Popen(command,
                                               env=env,
                                               errors='replace')
        returncode = self._current_build.wait()
        print()
        return returncode == 0

    def _process_ninja_output_line(self, output: str) -> None:
        """Updates build progress from one output line, or logs the line."""
        line_match_result = self.NINJA_BUILD_STEP.match(output)
        if line_match_result:
            # Progress lines update the status fields instead of being logged.
            self.current_build_step = line_match_result.group(0)
            step = int(line_match_result.group('step'))
            total_steps = int(line_match_result.group('total_steps'))
            # Guard the division in case a '[0/0]' line is ever seen.
            self.current_build_percent = (step /
                                          total_steps if total_steps else 0.0)
        elif output.startswith(WatchApp.NINJA_FAILURE_TEXT):
            _NINJA_LOG.critical(output.strip())
            self.current_build_errors += 1
        else:
            # Mypy output mixes character encoding in its colored output
            # due to it's use of the curses module retrieving the 'sgr0'
            # (or exit_attribute_mode) capability from the host
            # machine's terminfo database.
            #
            # This can result in this sequence ending up in STDOUT as
            # b'\x1b(B\x1b[m'. (B tells terminals to interpret text as
            # USASCII encoding but will appear in prompt_toolkit as a B
            # character.
            #
            # The following replace calls will strip out those
            # instances.
            _NINJA_LOG.info(
                output.replace('\x1b(B\x1b[m',
                               '').replace('\x1b[1m', '').strip())

    def _execute_command_watch_app(self, command: list, env: dict) -> bool:
        """Runs a command and forwards its output to the watch app's log.

        stdout and stderr are merged and read line by line until the stream
        reaches end-of-file, so output still buffered when the process exits
        is not lost and no blank lines are logged while waiting for the exit
        status.

        Returns:
            True if the command exited with status 0; False if it failed or
            if no watch app is attached.
        """
        if not self.watch_app:
            return False
        self.current_stdout = ''
        with subprocess.Popen(command,
                              env=env,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              errors='replace') as proc:
            self._current_build = proc

            # Empty line at the start.
            _NINJA_LOG.info('')

            # proc.stdout is always set because stdout=PIPE was requested;
            # the check only narrows the Optional type.
            if proc.stdout is not None:
                for output in proc.stdout:
                    self.current_stdout += output
                    self._process_ninja_output_line(output)
                    self.watch_app.redraw_ui()

            # All output has been consumed; collect the exit status.
            returncode = proc.wait()

            # Empty line at the end.
            _NINJA_LOG.info('')

        return returncode == 0

    # Implementation of DebouncedFunction.cancel()
    def cancel(self) -> bool:
        """Stops the running build if restarts are enabled.

        Returns:
            True if the build was terminated, False if it was left running.
        """
        if self.restart_on_changes:
            self._terminate_current_build()
            return True

        return False

    # Implementation of DebouncedFunction.on_complete()
    def on_complete(self, cancelled: bool = False) -> None:
        """Reports the outcome of the run that just finished.

        Args:
            cancelled: True if the run was interrupted before completing.
        """
        # First, use the standard logging facilities to report build status.
        if cancelled:
            self.status_message = (
                '', 'Cancelled'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH))
            _LOG.error('Finished; build was interrupted')
        elif all(self.builds_succeeded):
            self.status_message = (
                'class:theme-fg-green',
                'Succeeded'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH))
            _LOG.info('Finished; all successful')
        else:
            self.status_message = (
                'class:theme-fg-red',
                'Failed'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH))
            _LOG.info('Finished; some builds failed')

        # Show individual build results for fullscreen app
        if self.fullscreen_enabled:
            self.create_result_message()
        # For non-fullscreen pw watch
        else:
            # Show a more distinct colored banner.
            if not cancelled:
                # Write out build summary table so you can tell which builds
                # passed and which builds failed.
                _LOG.info('')
                _LOG.info(' .------------------------------------')
                _LOG.info(' |')
                for (succeeded, cmd) in zip(self.builds_succeeded,
                                            self.build_commands):
                    slug = (self.charset.slug_ok
                            if succeeded else self.charset.slug_fail)
                    _LOG.info(' | %s %s', slug, cmd)
                _LOG.info(' |')
                _LOG.info(" '------------------------------------")
            else:
                # Build was interrupted.
                _LOG.info('')
                _LOG.info(' .------------------------------------')
                _LOG.info(' |')
                _LOG.info(' | %s- interrupted', self.charset.slug_fail)
                _LOG.info(' |')
                _LOG.info(" '------------------------------------")

            # Show a large color banner for the overall result.
            if self.banners:
                if all(self.builds_succeeded) and not cancelled:
                    for line in _PASS_MESSAGE.splitlines():
                        _LOG.info(_COLOR.green(line))
                else:
                    for line in _FAIL_MESSAGE.splitlines():
                        _LOG.info(_COLOR.red(line))

        if self.watch_app:
            self.watch_app.redraw_ui()
        # Forget the triggering path so the next change is recorded afresh.
        self.matching_path = None

    # Implementation of DebouncedFunction.on_keyboard_interrupt()
    def on_keyboard_interrupt(self) -> NoReturn:
        """Exits the watcher when the debounced run is interrupted."""
        _exit_due_to_interrupt()
# Separator used when glob lists are passed as a single command-line string
# (--patterns and --ignore_patterns).
_WATCH_PATTERN_DELIMITER = ','
# Default globs for --patterns; a change to a matching file triggers a build.
_WATCH_PATTERNS = (
    '*.bloaty',
    '*.c',
    '*.cc',
    '*.css',
    '*.cpp',
    '*.cmake',
    'CMakeLists.txt',
    '*.dts',
    '*.dtsi',
    '*.gn',
    '*.gni',
    '*.go',
    '*.h',
    '*.hpp',
    '*.ld',
    '*.md',
    '*.options',
    '*.proto',
    '*.py',
    '*.rs',
    '*.rst',
    '*.s',
    '*.S',
    '*.toml',
)
def add_parser_arguments(parser: argparse.ArgumentParser) -> None:
    """Sets up an argument parser for pw watch.

    Args:
        parser: The parser to which the pw watch options are added. The
            resulting destination names match the parameters of
            watch_setup().
    """
    parser.add_argument('--patterns',
                        help=(_WATCH_PATTERN_DELIMITER +
                              '-delimited list of globs to '
                              'watch to trigger recompile'),
                        default=_WATCH_PATTERN_DELIMITER.join(_WATCH_PATTERNS))
    parser.add_argument('--ignore_patterns',
                        dest='ignore_patterns_string',
                        help=(_WATCH_PATTERN_DELIMITER +
                              '-delimited list of globs to '
                              'ignore events from'))

    parser.add_argument('--exclude_list',
                        nargs='+',
                        type=Path,
                        help='directories to ignore during pw watch',
                        default=[])
    parser.add_argument('--no-restart',
                        dest='restart',
                        action='store_false',
                        help='do not restart ongoing builds if files change')
    parser.add_argument('-k',
                        '--keep-going',
                        action='store_true',
                        help=('Keep building past the first failure. This is '
                              'equivalent to passing "-k 0" to ninja.'))
    parser.add_argument(
        'default_build_targets',
        nargs='*',
        metavar='target',
        default=[],
        help=('Automatically locate a build directory and build these '
              'targets. For example, `host docs` searches for a Ninja '
              'build directory at out/ and builds the `host` and `docs` '
              'targets. To specify one or more directories, use the '
              '-C / --build_directory option.'))
    # Each -C occurrence yields one list: [directory, target, target, ...].
    parser.add_argument(
        '-C',
        '--build_directory',
        dest='build_directories',
        nargs='+',
        action='append',
        default=[],
        metavar=('directory', 'target'),
        help=('Specify a build directory and optionally targets to '
              'build. `pw watch -C out tgt` is equivalent to `ninja '
              '-C out tgt`'))
    parser.add_argument(
        '--serve-docs',
        dest='serve_docs',
        action='store_true',
        default=False,
        help=('Start a webserver for docs on localhost. The port for this '
              'webserver can be set with the --serve-docs-port option. '
              'Defaults to http://127.0.0.1:8000. This option requires '
              'the httpwatcher package to be installed.'))
    parser.add_argument(
        '--serve-docs-port',
        dest='serve_docs_port',
        type=int,
        default=8000,
        help='Set the port for the docs webserver. Default to 8000.')
    # argparse applies `type` to string defaults, so this becomes a Path.
    parser.add_argument(
        '--serve-docs-path',
        dest='serve_docs_path',
        type=Path,
        default="docs/gen/docs",
        help=('Set the path for the docs to serve. Default to docs/gen/docs'
              ' in the build directory.'))
    parser.add_argument(
        '-j',
        '--jobs',
        type=int,
        help="Number of cores to use; defaults to Ninja's default")
    parser.add_argument('-f',
                        '--fullscreen',
                        action='store_true',
                        default=False,
                        help='Use a fullscreen interface.')
    parser.add_argument('--debug-logging',
                        action='store_true',
                        help='Enable debug logging.')
    parser.add_argument('--no-banners',
                        dest='banners',
                        action='store_false',
                        help='Hide pass/fail banners.')
def _exit(code: int) -> NoReturn:
    """Ends the whole process immediately with the given exit status."""
    # Note: The "proper" way to exit is via observer.stop(), then
    # running a join. However it's slower, so just exit immediately.
    #
    # Additionally, since there are several threads in the watcher, the usual
    # sys.exit approach doesn't work. Instead, run the low level exit which
    # kills all threads.
    os._exit(code)  # pylint: disable=protected-access
def _exit_due_to_interrupt() -> NoReturn:
    """Logs that the watcher was interrupted, then exits with status 0."""
    # NOTE(review): this comment used to say a newline is added before the
    # log, to keep the log lines aligned with each other in the presence of
    # a '^C' from the keyboard interrupt. No newline is printed here any
    # more; confirm whether that is intentional.
    _LOG.info('Got Ctrl-C; exiting...')
    _exit(0)
def _exit_due_to_inotify_watch_limit():
    """Explains how to raise the inotify watch limit, then exits.

    Logs the problem and a suggested sysctl command, and ends the process
    with exit status 0.
    """
    problem = ('Inotify watch limit reached: run this in your terminal if '
               'you are in Linux to temporarily increase inotify limit. \n')
    suggested_command = (' sudo sysctl fs.inotify.max_user_watches='
                         '$NEW_LIMIT$\n')
    placeholder_hint = (' Change $NEW_LIMIT$ with an integer number, '
                        'e.g., 20000 should be enough.')

    _LOG.error(problem)
    _LOG.info(_COLOR.green(suggested_command))
    _LOG.info(placeholder_hint)
    _exit(0)
def _exit_due_to_inotify_instance_limit():
    """Explains how to raise the inotify instance limit, then exits.

    Logs the problem and a suggested sysctl command, and ends the process
    with exit status 0.
    """
    problem = ('Inotify instance limit reached: run this in your terminal if '
               'you are in Linux to temporarily increase inotify limit. \n')
    suggested_command = (' sudo sysctl fs.inotify.max_user_instances='
                         '$NEW_LIMIT$\n')
    placeholder_hint = (' Change $NEW_LIMIT$ with an integer number, '
                        'e.g., 20000 should be enough.')

    _LOG.error(problem)
    _LOG.info(_COLOR.green(suggested_command))
    _LOG.info(placeholder_hint)
    _exit(0)
def _exit_due_to_pigweed_not_installed():
    """Reports an unusable Pigweed environment, then exits with status 1.

    Used when $PW_ROOT is not defined or does not lie inside the current
    directory; the logged hints point at activating or bootstrapping the
    environment.
    """
    hints = (
        ('Environment variable $PW_ROOT not defined or is defined '
         'outside the current directory.'),
        ('Did you forget to activate the Pigweed environment? '
         'Try source ./activate.sh'),
        ('Did you forget to install the Pigweed environment? '
         'Try source ./bootstrap.sh'),
    )
    for hint in hints:
        _LOG.error(hint)
    _exit(1)
# Go over each directory inside of the current directory.
# If it is not on the path of elements in directories_to_exclude, yield
# (directory, True) so that it is later watched recursively.
# Directories that contain an excluded directory somewhere below them are
# yielded as (directory, False) and later watched without recursion.
def minimal_watch_directories(
        to_watch: Path,
        to_exclude: Iterable[Path]) -> Iterable[Tuple[Path, bool]]:
    """Determine which subdirectories to watch recursively.

    Args:
        to_watch: The single root directory to watch.
        to_exclude: Directories that must not be watched. Relative entries
            are interpreted relative to ``to_watch``. Entries that are not
            existing directories, or that lie outside ``to_watch``, have no
            effect.

    Yields:
        ``(directory, recursive)`` pairs. ``to_watch`` and every ancestor of
        an excluded directory are yielded with ``recursive=False``; each of
        their other subdirectories that is not excluded is yielded with
        ``recursive=True``.

    Raises:
        AssertionError: If ``to_watch`` cannot be converted to a Path.
    """
    try:
        to_watch = Path(to_watch)
    except TypeError as err:
        # Raised explicitly instead of using `assert` so that the check is
        # not stripped when Python runs with -O. The exception type is kept
        # for compatibility.
        raise AssertionError('Please watch one directory at a time.') from err

    # Reformat to_exclude, keeping only existing directories inside to_watch.
    # A directory outside the watched tree can never be visited, and
    # relative_to() would raise ValueError for it further down.
    directories_to_exclude: List[Path] = []
    for candidate in to_exclude:
        excluded = to_watch.joinpath(candidate)
        if not excluded.is_dir():
            continue
        try:
            excluded.relative_to(to_watch)
        except ValueError:
            continue
        directories_to_exclude.append(excluded)
    excluded_dirs = set(directories_to_exclude)

    # Split the relative path of directories_to_exclude (compared to to_watch),
    # and generate all parent paths needed to be watched without recursion.
    # The list keeps a stable order; the set provides fast membership tests.
    exclude_dir_parents: List[Path] = [to_watch]
    known_parents = {to_watch}
    for excluded in directories_to_exclude:
        ancestor = to_watch
        for part in excluded.relative_to(to_watch).parts[:-1]:
            ancestor = ancestor / part
            if ancestor not in known_parents:
                known_parents.add(ancestor)
                exclude_dir_parents.append(ancestor)

    # Go over all layers of directory. Yield those that are the parents of
    # directories_to_exclude with recursion==False, and their remaining
    # subdirectories with recursion==True.
    for directory in exclude_dir_parents:
        yield directory, False
        for item in directory.iterdir():
            if (item.is_dir() and item not in known_parents
                    and item not in excluded_dirs):
                yield item, True
def get_common_excludes() -> List[Path]:
    """Find commonly excluded directories, and return them as a [Path]

    The typical environment and build directories are listed under $PW_ROOT
    and, when it is a different directory, under $PW_PROJECT_ROOT as well.
    Legacy environment directories found under $PW_ROOT are added too, with
    a warning.

    Raises:
        KeyError: If PW_ROOT or PW_PROJECT_ROOT is missing from the
            environment.
    """
    typical_ignored_directories: List[str] = [
        '.environment',  # Legacy bootstrap-created CIPD and Python venv.
        '.presubmit',  # Presubmit-created CIPD and Python venv.
        '.git',  # Pigweed's git repo.
        '.mypy_cache',  # Python static analyzer.
        '.cargo',  # Rust package manager.
        'environment',  # Bootstrap-created CIPD and Python venv.
        'out',  # Typical build directory.
    ]
    legacy_directories = (
        '.cipd',  # Legacy CIPD location.
        '.python3-venv',  # Legacy Python venv location.
    )

    pw_root_dir = Path(os.environ['PW_ROOT'])
    pw_project_root_dir = Path(os.environ['PW_PROJECT_ROOT'])

    # Pigweed's upstream directories always get the preset excludes. If watch
    # is invoked for a downstream project (project root differs from the
    # Pigweed root), the same names are excluded under the project root too.
    roots = [pw_root_dir]
    if pw_project_root_dir != pw_root_dir:
        roots.append(pw_project_root_dir)
    exclude_list: List[Path] = [
        root / name for root in roots for name in typical_ignored_directories
    ]

    # Check for and warn about legacy directories.
    legacy_found = [
        pw_root_dir / name for name in legacy_directories
        if (pw_root_dir / name).is_dir()
    ]
    for full_legacy_directory in legacy_found:
        _LOG.warning('Legacy environment directory found: %s',
                     str(full_legacy_directory))
        exclude_list.append(full_legacy_directory)
    if legacy_found:
        _LOG.warning('Found legacy environment directory(s); these '
                     'should be deleted')

    return exclude_list
def _serve_docs(build_dir: Path, serve_docs_port: int,
                serve_docs_path: Path) -> None:
    """Starts a background thread that serves the generated docs.

    Args:
        build_dir: Build directory that contains the docs output.
        serve_docs_port: Port on 127.0.0.1 for the web server.
        serve_docs_path: Docs location relative to ``build_dir``; its 'html'
            subdirectory is what gets served.

    If the optional httpwatcher package is not importable, a warning is
    logged and nothing is started.
    """
    if httpwatcher is None:
        _LOG.warning(
            '--serve-docs was specified, but httpwatcher is not available')
        _LOG.info('Install httpwatcher to use --serve-docs')
        return

    def httpwatcher_thread():
        # Disable logs from httpwatcher and deps
        for noisy_logger in ('httpwatcher', 'tornado'):
            logging.getLogger(noisy_logger).setLevel(logging.CRITICAL)

        docs_path = build_dir / serve_docs_path.joinpath('html')
        httpwatcher.watch(docs_path, host='127.0.0.1', port=serve_docs_port)

    # Spin up an httpwatcher in a new thread since it blocks
    server_thread = threading.Thread(target=httpwatcher_thread,
                                     name='httpwatcher')
    server_thread.start()
def watch_setup(
    default_build_targets: List[str],
    build_directories: List[str],
    patterns: str,
    ignore_patterns_string: str,
    exclude_list: List[Path],
    restart: bool,
    jobs: Optional[int],
    serve_docs: bool,
    serve_docs_port: int,
    serve_docs_path: Path,
    fullscreen: bool,
    banners: bool,
    keep_going: bool,
    debug_logging: bool,  # pylint: disable=unused-argument
    # pylint: disable=too-many-arguments
) -> Tuple[str, PigweedBuildWatcher, List[Path]]:
    """Validates the environment and arguments and creates the build watcher.

    The parameters correspond to the destinations created by
    add_parser_arguments().

    Args:
        default_build_targets: Targets to build in the default 'out'
            directory.
        build_directories: One entry per -C option; each entry holds the
            build directory followed by its targets.
        patterns: Delimited globs that trigger a build.
        ignore_patterns_string: Delimited globs to ignore; may be empty/None.
        exclude_list: Extra directories not to watch. Not modified.
        restart: Whether file changes restart a build in progress.
        jobs: Value for Ninja's -j option, or None.
        serve_docs: Whether to start the docs web server.
        serve_docs_port: Port for the docs web server.
        serve_docs_path: Docs path inside the first build directory.
        fullscreen: Whether the fullscreen interface is used.
        banners: Whether pass/fail banners are shown.
        keep_going: Whether Ninja keeps going after failures.
        debug_logging: Unused here; accepted so that all parsed arguments
            can be passed through.

    Returns:
        A tuple of the display path of the watched directory, the configured
        PigweedBuildWatcher, and the full list of directories to exclude.
    """

    _LOG.info('Starting Pigweed build watcher')

    # Get pigweed directory information from environment variable PW_ROOT.
    # Use .get(): indexing os.environ raises KeyError for a missing variable
    # and would bypass the explanatory error message.
    pw_root_env = os.environ.get('PW_ROOT', '')
    if not pw_root_env:
        _exit_due_to_pigweed_not_installed()
    pw_root = Path(pw_root_env).resolve()
    if Path.cwd().resolve() not in [pw_root, *pw_root.parents]:
        _exit_due_to_pigweed_not_installed()

    # Build a new list rather than extending the argument in place, so the
    # caller's list (e.g. the argparse default) is left untouched.
    exclude_list = [
        *exclude_list,
        # Preset exclude list for pigweed directory.
        *get_common_excludes(),
        # Add build directories to the exclude list.
        *(Path(build_dir[0]).resolve() for build_dir in build_directories),
    ]

    build_commands = [
        BuildCommand(Path(build_dir[0]), tuple(build_dir[1:]))
        for build_dir in build_directories
    ]

    # If no build directory was specified, check for out/build.ninja.
    if default_build_targets or not build_directories:
        # Make sure we found something; if not, bail.
        if not Path('out').exists():
            _die("No build dirs found. Did you forget to run 'gn gen out'?")

        build_commands.append(
            BuildCommand(Path('out'), tuple(default_build_targets)))

    # Verify that the build output directories exist.
    for i, build_target in enumerate(build_commands, 1):
        if not build_target.build_dir.is_dir():
            _die("Build directory doesn't exist: %s", build_target)
        else:
            _LOG.info('Will build [%d/%d]: %s', i, len(build_commands),
                      build_target)

    _LOG.debug('Patterns: %s', patterns)

    # The docs are served out of the first build directory.
    if serve_docs:
        _serve_docs(build_commands[0].build_dir, serve_docs_port,
                    serve_docs_path)

    # Try to make a short display path for the watched directory that has
    # "$HOME" instead of the full home directory. This is nice for users
    # who have deeply nested home directories.
    path_to_log = str(Path().resolve()).replace(str(Path.home()), '$HOME')

    # Ignore the user-specified patterns.
    ignore_patterns = (ignore_patterns_string.split(_WATCH_PATTERN_DELIMITER)
                       if ignore_patterns_string else [])

    env = pw_cli.env.pigweed_environment()
    if env.PW_EMOJI:
        charset = _EMOJI_CHARSET
    else:
        charset = _ASCII_CHARSET

    event_handler = PigweedBuildWatcher(
        build_commands=build_commands,
        patterns=patterns.split(_WATCH_PATTERN_DELIMITER),
        ignore_patterns=ignore_patterns,
        charset=charset,
        restart=restart,
        jobs=jobs,
        fullscreen=fullscreen,
        banners=banners,
        keep_going=keep_going,
    )
    return path_to_log, event_handler, exclude_list
def watch(path_to_log: Path, event_handler: PigweedBuildWatcher,
          exclude_list: List[Path]):
    """Watches files and runs Ninja commands when they change.

    Args:
        path_to_log: Display form of the watched directory; only logged.
        event_handler: Watcher that receives the file events and runs builds.
        exclude_list: Directories below the current directory not to watch.

    Raises:
        OSError: Any OSError from setting up or running the observers other
            than the inotify watch/instance limit errors, which end the
            process with an explanation instead.
    """
    observers = []
    try:
        # It can take awhile to configure the filesystem watcher, so have the
        # message reflect that with the "...". Run inside the try: to
        # gracefully handle the user Ctrl-C'ing out during startup.

        _LOG.info('Attaching filesystem watcher to %s/...', path_to_log)

        # Observe changes for all files in the root directory. Whether the
        # directory should be observed recursively or not is determined by the
        # second element of each pair from minimal_watch_directories().
        for path, rec in minimal_watch_directories(Path.cwd(), exclude_list):
            observer = Observer()
            observer.schedule(
                event_handler,
                str(path),
                recursive=rec,
            )
            observer.start()
            observers.append(observer)

        event_handler.debouncer.press('Triggering initial build...')
        # Join with a timeout in a loop so the main thread stays responsive
        # to KeyboardInterrupt while the observers run.
        for observer in observers:
            while observer.is_alive():
                observer.join(1)

    # Ctrl-C on Unix generates KeyboardInterrupt
    # Ctrl-Z on Windows generates EOFError
    except (KeyboardInterrupt, EOFError):
        _exit_due_to_interrupt()
    except OSError as err:
        # Look at both errno and the first argument, and tolerate an OSError
        # raised without arguments, so that the original error is never
        # masked by an IndexError.
        first_arg = err.args[0] if err.args else None
        if _ERRNO_INOTIFY_LIMIT_REACHED in (err.errno, first_arg):
            _exit_due_to_inotify_watch_limit()
        if err.errno == errno.EMFILE:
            _exit_due_to_inotify_instance_limit()
        raise

    _LOG.critical('Should never get here')
    for observer in observers:
        observer.join()
def main() -> None:
    """Watch files for changes and rebuild."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    add_parser_arguments(parser)
    args = parser.parse_args()

    path_to_log, event_handler, exclude_list = watch_setup(**vars(args))

    # Plain terminal mode: log to the console and watch on this thread.
    if not args.fullscreen:
        console_level = logging.DEBUG if args.debug_logging else logging.INFO
        pw_cli.log.install(
            level=console_level,
            use_color=True,
            hide_timestamp=False,
        )
        watch(Path(path_to_log), event_handler, exclude_list)
        return

    # Fullscreen mode: send logs to a temporary file, watch on a daemon
    # thread, and run the interactive app on this thread.
    watch_logfile = (pw_console.python_logging.create_temp_log_file(
        prefix=__package__))
    pw_cli.log.install(
        level=logging.DEBUG,
        use_color=True,
        hide_timestamp=False,
        log_file=watch_logfile,
    )
    pw_console.python_logging.setup_python_logging(
        last_resort_filename=watch_logfile)

    watch_thread = Thread(target=watch,
                          args=(path_to_log, event_handler, exclude_list),
                          daemon=True)
    watch_thread.start()

    watch_app = WatchApp(event_handler=event_handler,
                         debug_logging=args.debug_logging,
                         log_file_name=watch_logfile)
    # Give the watcher access to the app so it can forward output and
    # request redraws.
    event_handler.watch_app = watch_app
    watch_app.run()
# Start the watcher only when this file is executed as a script.
if __name__ == '__main__':
    main()