blob: 79091623a8a682cbdb929e2f50e5725f793d8b44 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Watch files for changes and rebuild.
pw watch runs Ninja in a build directory when source files change. It works with
any Ninja project (GN or CMake).
Usage examples:
# Find a build directory and build the default target
pw watch
# Find a build directory and build the stm32f429i target
pw watch python.lint stm32f429i
# Build pw_run_tests.modules in the out/cmake directory
pw watch -C out/cmake pw_run_tests.modules
# Build the default target in out/ and pw_apps in out/cmake
pw watch -C out -C out/cmake pw_apps
# Find a directory and build python.tests, and build pw_apps in out/cmake
pw watch python.tests -C out/cmake pw_apps
"""
import argparse
from dataclasses import dataclass
import logging
import os
from pathlib import Path
import shlex
import subprocess
import sys
import threading
from typing import (Iterable, List, NamedTuple, NoReturn, Optional, Sequence,
Tuple)
from watchdog.events import FileSystemEventHandler # type: ignore[import]
from watchdog.observers import Observer # type: ignore[import]
import pw_cli.branding
import pw_cli.color
import pw_cli.env
import pw_cli.plugins
from pw_watch.debounce import DebouncedFunction, Debouncer
_COLOR = pw_cli.color.colors()
_LOG = logging.getLogger(__name__)
_ERRNO_INOTIFY_LIMIT_REACHED = 28
# Suppress events under 'fsevents', generated by watchdog on every file
# event on MacOS.
# TODO(b/182281481): Fix file ignoring, rather than just suppressing logs
_FSEVENTS_LOG = logging.getLogger('fsevents')
_FSEVENTS_LOG.setLevel(logging.WARNING)
_PASS_MESSAGE = """
██████╗ █████╗ ███████╗███████╗██╗
██╔══██╗██╔══██╗██╔════╝██╔════╝██║
██████╔╝███████║███████╗███████╗██║
██╔═══╝ ██╔══██║╚════██║╚════██║╚═╝
██║ ██║ ██║███████║███████║██╗
╚═╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚═╝
"""
# Pick a visually-distinct font from "PASS" to ensure that readers can't
# possibly mistake the difference between the two states.
_FAIL_MESSAGE = """
▄██████▒░▄▄▄ ██▓ ░██▓
▓█▓ ░▒████▄ ▓██▒ ░▓██▒
▒████▒ ░▒█▀ ▀█▄ ▒██▒ ▒██░
░▓█▒ ░░██▄▄▄▄██ ░██░ ▒██░
░▒█░ ▓█ ▓██▒░██░░ ████████▒
▒█░ ▒▒ ▓▒█░░▓ ░ ▒░▓ ░
░▒ ▒ ▒▒ ░ ▒ ░░ ░ ▒ ░
░ ░ ░ ▒ ▒ ░ ░ ░
░ ░ ░ ░ ░
"""
# TODO(keir): Figure out a better strategy for exiting. The problem with the
# watcher is that doing a "clean exit" is slow. However, by directly exiting,
# we remove the possibility of the wrapper script doing anything on exit.
def _die(*args) -> NoReturn:
_LOG.critical(*args)
sys.exit(1)
class WatchCharset(NamedTuple):
slug_ok: str
slug_fail: str
_ASCII_CHARSET = WatchCharset(_COLOR.green('OK '), _COLOR.red('FAIL'))
_EMOJI_CHARSET = WatchCharset('✔️ ', '💥')
@dataclass(frozen=True)
class BuildCommand:
build_dir: Path
targets: Tuple[str, ...] = ()
def args(self) -> Tuple[str, ...]:
return (str(self.build_dir), *self.targets)
def __str__(self) -> str:
return ' '.join(shlex.quote(arg) for arg in self.args())
def git_ignored(file: Path) -> bool:
"""Returns true if this file is in a Git repo and ignored by that repo.
Returns true for ignored files that were manually added to a repo.
"""
file = file.resolve()
directory = file.parent
# Run the Git command from file's parent so that the correct repo is used.
while True:
try:
returncode = subprocess.run(
['git', 'check-ignore', '--quiet', '--no-index', file],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
cwd=directory).returncode
return returncode in (0, 128)
except FileNotFoundError:
# If the directory no longer exists, try parent directories until
# an existing directory is found or all directories have been
# checked. This approach makes it possible to check if a deleted
# path is ignored in the repo it was originally created in.
if directory == directory.parent:
return False
directory = directory.parent
class PigweedBuildWatcher(FileSystemEventHandler, DebouncedFunction):
"""Process filesystem events and launch builds if necessary."""
def __init__(
self,
patterns: Sequence[str] = (),
ignore_patterns: Sequence[str] = (),
build_commands: Sequence[BuildCommand] = (),
charset: WatchCharset = _ASCII_CHARSET,
restart: bool = True,
):
super().__init__()
self.patterns = patterns
self.ignore_patterns = ignore_patterns
self.build_commands = build_commands
self.charset: WatchCharset = charset
self.restart_on_changes = restart
self._current_build: subprocess.Popen
self.debouncer = Debouncer(self)
# Track state of a build. These need to be members instead of locals
# due to the split between dispatch(), run(), and on_complete().
self.matching_path: Optional[Path] = None
self.builds_succeeded: List[bool] = []
self.wait_for_keypress_thread = threading.Thread(
None, self._wait_for_enter)
self.wait_for_keypress_thread.start()
def _wait_for_enter(self) -> NoReturn:
try:
while True:
_ = input()
self._current_build.kill()
self.debouncer.press('Manual build requested...')
# Ctrl-C on Unix generates KeyboardInterrupt
# Ctrl-Z on Windows generates EOFError
except (KeyboardInterrupt, EOFError):
_exit_due_to_interrupt()
def _path_matches(self, path: Path) -> bool:
"""Returns true if path matches according to the watcher patterns"""
return (not any(path.match(x) for x in self.ignore_patterns)
and any(path.match(x) for x in self.patterns))
def dispatch(self, event) -> None:
# There isn't any point in triggering builds on new directory creation.
# It's the creation or modification of files that indicate something
# meaningful enough changed for a build.
if event.is_directory:
return
# Collect paths of interest from the event.
paths: List[str] = []
if hasattr(event, 'dest_path'):
paths.append(os.fsdecode(event.dest_path))
if event.src_path:
paths.append(os.fsdecode(event.src_path))
for raw_path in paths:
_LOG.debug('File event: %s', raw_path)
# Check whether Git cares about any of these paths.
for path in (Path(p).resolve() for p in paths):
if not git_ignored(path) and self._path_matches(path):
self._handle_matched_event(path)
return
def _handle_matched_event(self, matching_path: Path) -> None:
if self.matching_path is None:
self.matching_path = matching_path
self.debouncer.press(
f'File change detected: {os.path.relpath(matching_path)}')
# Implementation of DebouncedFunction.run()
#
# Note: This will run on the timer thread created by the Debouncer, rather
# than on the main thread that's watching file events. This enables the
# watcher to continue receiving file change events during a build.
def run(self) -> None:
"""Run all the builds in serial and capture pass/fail for each."""
# Clear the screen and show a banner indicating the build is starting.
print('\033c', end='') # TODO(pwbug/38): Not Windows compatible.
print(pw_cli.branding.banner())
print(
_COLOR.green(
' Watching for changes. Ctrl-C to exit; enter to rebuild'))
print()
_LOG.info('Change detected: %s', self.matching_path)
self.builds_succeeded = []
num_builds = len(self.build_commands)
_LOG.info('Starting build with %d directories', num_builds)
env = os.environ.copy()
# Force colors in Pigweed subcommands run through the watcher.
env['PW_USE_COLOR'] = '1'
for i, cmd in enumerate(self.build_commands, 1):
_LOG.info('[%d/%d] Starting build: %s', i, num_builds, cmd)
# Run the build. Put a blank before/after for visual separation.
print()
self._current_build = subprocess.Popen(
['ninja', '-C', *cmd.args()], env=env)
returncode = self._current_build.wait()
print()
build_ok = (returncode == 0)
if build_ok:
level = logging.INFO
tag = '(OK)'
else:
level = logging.ERROR
tag = '(FAIL)'
_LOG.log(level, '[%d/%d] Finished build: %s %s', i, num_builds,
cmd, tag)
self.builds_succeeded.append(build_ok)
# Implementation of DebouncedFunction.cancel()
def cancel(self) -> bool:
if self.restart_on_changes:
self._current_build.kill()
return True
return False
# Implementation of DebouncedFunction.run()
def on_complete(self, cancelled: bool = False) -> None:
# First, use the standard logging facilities to report build status.
if cancelled:
_LOG.error('Finished; build was interrupted')
elif all(self.builds_succeeded):
_LOG.info('Finished; all successful')
else:
_LOG.info('Finished; some builds failed')
# Then, show a more distinct colored banner.
if not cancelled:
# Write out build summary table so you can tell which builds passed
# and which builds failed.
print()
print(' .------------------------------------')
print(' |')
for (succeeded, cmd) in zip(self.builds_succeeded,
self.build_commands):
slug = (self.charset.slug_ok
if succeeded else self.charset.slug_fail)
print(f' | {slug} {cmd}')
print(' |')
print(" '------------------------------------")
else:
# Build was interrupted.
print()
print(' .------------------------------------')
print(' |')
print(' | ', self.charset.slug_fail, '- interrupted')
print(' |')
print(" '------------------------------------")
# Show a large color banner so it is obvious what the overall result is.
if all(self.builds_succeeded) and not cancelled:
print(_COLOR.green(_PASS_MESSAGE))
else:
print(_COLOR.red(_FAIL_MESSAGE))
self.matching_path = None
# Implementation of DebouncedFunction.on_keyboard_interrupt()
def on_keyboard_interrupt(self) -> NoReturn:
_exit_due_to_interrupt()
_WATCH_PATTERN_DELIMITER = ','
_WATCH_PATTERNS = (
'*.bloaty',
'*.c',
'*.cc',
'*.css',
'*.cpp',
'*.cmake',
'CMakeLists.txt',
'*.gn',
'*.gni',
'*.go',
'*.h',
'*.hpp',
'*.ld',
'*.md',
'*.options',
'*.proto',
'*.py',
'*.rst',
)
def add_parser_arguments(parser: argparse.ArgumentParser) -> None:
"""Sets up an argument parser for pw watch."""
parser.add_argument('--patterns',
help=(_WATCH_PATTERN_DELIMITER +
'-delimited list of globs to '
'watch to trigger recompile'),
default=_WATCH_PATTERN_DELIMITER.join(_WATCH_PATTERNS))
parser.add_argument('--ignore_patterns',
dest='ignore_patterns_string',
help=(_WATCH_PATTERN_DELIMITER +
'-delimited list of globs to '
'ignore events from'))
parser.add_argument('--exclude_list',
nargs='+',
type=Path,
help='directories to ignore during pw watch',
default=[])
parser.add_argument('--no-restart',
dest='restart',
action='store_false',
help='do not restart ongoing builds if files change')
parser.add_argument(
'default_build_targets',
nargs='*',
metavar='target',
default=[],
help=('Automatically locate a build directory and build these '
'targets. For example, `host docs` searches for a Ninja '
'build directory (starting with out/) and builds the '
'`host` and `docs` targets. To specify one or more '
'directories, ust the -C / --build_directory option.'))
parser.add_argument(
'-C',
'--build_directory',
dest='build_directories',
nargs='+',
action='append',
default=[],
metavar=('directory', 'target'),
help=('Specify a build directory and optionally targets to '
'build. `pw watch -C out tgt` is equivalent to `ninja '
'-C out tgt`'))
def _exit(code: int) -> NoReturn:
# Note: The "proper" way to exit is via observer.stop(), then
# running a join. However it's slower, so just exit immediately.
#
# Additionally, since there are several threads in the watcher, the usual
# sys.exit approach doesn't work. Instead, run the low level exit which
# kills all threads.
os._exit(code) # pylint: disable=protected-access
def _exit_due_to_interrupt() -> NoReturn:
# To keep the log lines aligned with each other in the presence of
# a '^C' from the keyboard interrupt, add a newline before the log.
print()
print()
_LOG.info('Got Ctrl-C; exiting...')
_exit(0)
def _exit_due_to_inotify_limit():
# Show information and suggested commands in OSError: inotify limit reached.
_LOG.error('Inotify limit reached: run this in your terminal if you '
'are in Linux to temporarily increase inotify limit. \n')
print(
_COLOR.green(' sudo sysctl fs.inotify.max_user_watches='
'$NEW_LIMIT$\n'))
print(' Change $NEW_LIMIT$ with an integer number, '
'e.g., 1000 should be enough.')
_exit(0)
def _exit_due_to_pigweed_not_installed():
# Show information and suggested commands when pigweed environment variable
# not found.
_LOG.error('Environment variable $PW_ROOT not defined or is defined '
'outside the current directory.')
_LOG.error('Did you forget to activate the Pigweed environment? '
'Try source ./activate.sh')
_LOG.error('Did you forget to install the Pigweed environment? '
'Try source ./bootstrap.sh')
_exit(1)
# Go over each directory inside of the current directory.
# If it is not on the path of elements in directories_to_exclude, add
# (directory, True) to subdirectories_to_watch and later recursively call
# Observer() on them.
# Otherwise add (directory, False) to subdirectories_to_watch and later call
# Observer() with recursion=False.
def minimal_watch_directories(to_watch: Path, to_exclude: Iterable[Path]):
"""Determine which subdirectory to watch recursively"""
try:
to_watch = Path(to_watch)
except TypeError:
assert False, "Please watch one directory at a time."
# Reformat to_exclude.
directories_to_exclude: List[Path] = [
to_watch.joinpath(directory_to_exclude)
for directory_to_exclude in to_exclude
if to_watch.joinpath(directory_to_exclude).is_dir()
]
# Split the relative path of directories_to_exclude (compared to to_watch),
# and generate all parent paths needed to be watched without recursion.
exclude_dir_parents = {to_watch}
for directory_to_exclude in directories_to_exclude:
parts = list(
Path(directory_to_exclude).relative_to(to_watch).parts)[:-1]
dir_tmp = to_watch
for part in parts:
dir_tmp = Path(dir_tmp, part)
exclude_dir_parents.add(dir_tmp)
# Go over all layers of directory. Append those that are the parents of
# directories_to_exclude to the list with recursion==False, and others
# with recursion==True.
for directory in exclude_dir_parents:
dir_path = Path(directory)
yield dir_path, False
for item in Path(directory).iterdir():
if (item.is_dir() and item not in exclude_dir_parents
and item not in directories_to_exclude):
yield item, True
def get_common_excludes() -> List[Path]:
"""Find commonly excluded directories, and return them as a [Path]"""
exclude_list: List[Path] = []
typical_ignored_directories: List[str] = [
'.environment', # Legacy bootstrap-created CIPD and Python venv.
'.presubmit', # Presubmit-created CIPD and Python venv.
'.git', # Pigweed's git repo.
'.mypy_cache', # Python static analyzer.
'.cargo', # Rust package manager.
'environment', # Bootstrap-created CIPD and Python venv.
'out', # Typical build directory.
]
# Preset exclude list for Pigweed's upstream directories.
pw_root_dir = Path(os.environ['PW_ROOT'])
exclude_list.extend(pw_root_dir / ignored_directory
for ignored_directory in typical_ignored_directories)
# Preset exclude for common downstream project structures.
#
# If watch is invoked outside of the Pigweed root, exclude common
# directories.
pw_project_root_dir = Path(os.environ['PW_PROJECT_ROOT'])
if pw_project_root_dir != pw_root_dir:
exclude_list.extend(
pw_project_root_dir / ignored_directory
for ignored_directory in typical_ignored_directories)
# Check for and warn about legacy directories.
legacy_directories = [
'.cipd', # Legacy CIPD location.
'.python3-venv', # Legacy Python venv location.
]
found_legacy = False
for legacy_directory in legacy_directories:
full_legacy_directory = pw_root_dir / legacy_directory
if full_legacy_directory.is_dir():
_LOG.warning('Legacy environment directory found: %s',
str(full_legacy_directory))
exclude_list.append(full_legacy_directory)
found_legacy = True
if found_legacy:
_LOG.warning('Found legacy environment directory(s); these '
'should be deleted')
return exclude_list
def _find_build_dir(default_build_dir: Path = Path('out')) -> Optional[Path]:
"""Searches for a build directory, returning the first it finds."""
# Give priority to out/, then something under out/.
if default_build_dir.joinpath('build.ninja').exists():
return default_build_dir
for path in default_build_dir.glob('**/build.ninja'):
return path.parent
for path in Path.cwd().glob('**/build.ninja'):
return path.parent
return None
def watch(default_build_targets: List[str], build_directories: List[str],
patterns: str, ignore_patterns_string: str, exclude_list: List[Path],
restart: bool):
"""Watches files and runs Ninja commands when they change."""
_LOG.info('Starting Pigweed build watcher')
# Get pigweed directory information from environment variable PW_ROOT.
if os.environ['PW_ROOT'] is None:
_exit_due_to_pigweed_not_installed()
pw_root = Path(os.environ['PW_ROOT']).resolve()
if Path.cwd().resolve() not in [pw_root, *pw_root.parents]:
_exit_due_to_pigweed_not_installed()
# Preset exclude list for pigweed directory.
exclude_list += get_common_excludes()
build_commands = [
BuildCommand(Path(build_dir[0]), tuple(build_dir[1:]))
for build_dir in build_directories
]
# If no build directory was specified, search the tree for a build.ninja.
if default_build_targets or not build_directories:
build_dir = _find_build_dir()
# Make sure we found something; if not, bail.
if build_dir is None:
_die("No build dirs found. Did you forget to run 'gn gen out'?")
build_commands.append(
BuildCommand(build_dir, tuple(default_build_targets)))
# Verify that the build output directories exist.
for i, build_target in enumerate(build_commands, 1):
if not build_target.build_dir.is_dir():
_die("Build directory doesn't exist: %s", build_target)
else:
_LOG.info('Will build [%d/%d]: %s', i, len(build_commands),
build_target)
_LOG.debug('Patterns: %s', patterns)
# Try to make a short display path for the watched directory that has
# "$HOME" instead of the full home directory. This is nice for users
# who have deeply nested home directories.
path_to_log = str(Path().resolve()).replace(str(Path.home()), '$HOME')
# Ignore the user-specified patterns.
ignore_patterns = (ignore_patterns_string.split(_WATCH_PATTERN_DELIMITER)
if ignore_patterns_string else [])
env = pw_cli.env.pigweed_environment()
if env.PW_EMOJI:
charset = _EMOJI_CHARSET
else:
charset = _ASCII_CHARSET
event_handler = PigweedBuildWatcher(
patterns=patterns.split(_WATCH_PATTERN_DELIMITER),
ignore_patterns=ignore_patterns,
build_commands=build_commands,
charset=charset,
restart=restart,
)
try:
# It can take awhile to configure the filesystem watcher, so have the
# message reflect that with the "...". Run inside the try: to
# gracefully handle the user Ctrl-C'ing out during startup.
_LOG.info('Attaching filesystem watcher to %s/...', path_to_log)
# Observe changes for all files in the root directory. Whether the
# directory should be observed recursively or not is determined by the
# second element in subdirectories_to_watch.
observers = []
for path, rec in minimal_watch_directories(Path.cwd(), exclude_list):
observer = Observer()
observer.schedule(
event_handler,
str(path),
recursive=rec,
)
observer.start()
observers.append(observer)
event_handler.debouncer.press('Triggering initial build...')
for observer in observers:
while observer.is_alive():
observer.join(1)
# Ctrl-C on Unix generates KeyboardInterrupt
# Ctrl-Z on Windows generates EOFError
except (KeyboardInterrupt, EOFError):
_exit_due_to_interrupt()
except OSError as err:
if err.args[0] == _ERRNO_INOTIFY_LIMIT_REACHED:
_exit_due_to_inotify_limit()
else:
raise err
_LOG.critical('Should never get here')
observer.join()
def main() -> None:
"""Watch files for changes and rebuild."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
add_parser_arguments(parser)
watch(**vars(parser.parse_args()))
if __name__ == '__main__':
main()