blob: 7f029806f3aff91931bfbc13b1d79807efe48961 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Rebuild every time a file is changed."""
import argparse
from dataclasses import dataclass
import glob
import logging
import os
import pathlib
import shlex
import subprocess
import sys
import threading
from typing import List, NamedTuple, Optional, Sequence, Tuple
from watchdog.events import FileSystemEventHandler # type: ignore
from watchdog.observers import Observer # type: ignore
from watchdog.utils import has_attribute, unicode_paths # type: ignore
import pw_cli.branding
import pw_cli.color
import pw_cli.env
import pw_cli.plugins
from pw_watch.debounce import DebouncedFunction, Debouncer
_COLOR = pw_cli.color.colors()
_LOG = logging.getLogger(__name__)
_ERRNO_INOTIFY_LIMIT_REACHED = 28
_PASS_MESSAGE = """
██████╗ █████╗ ███████╗███████╗██╗
██╔══██╗██╔══██╗██╔════╝██╔════╝██║
██████╔╝███████║███████╗███████╗██║
██╔═══╝ ██╔══██║╚════██║╚════██║╚═╝
██║ ██║ ██║███████║███████║██╗
╚═╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚═╝
"""
# Pick a visually-distinct font from "PASS" to ensure that readers can't
# possibly mistake the difference between the two states.
_FAIL_MESSAGE = """
▄██████▒░▄▄▄ ██▓ ░██▓
▓█▓ ░▒████▄ ▓██▒ ░▓██▒
▒████▒ ░▒█▀ ▀█▄ ▒██▒ ▒██░
░▓█▒ ░░██▄▄▄▄██ ░██░ ▒██░
░▒█░ ▓█ ▓██▒░██░░ ████████▒
▒█░ ▒▒ ▓▒█░░▓ ░ ▒░▓ ░
░▒ ▒ ▒▒ ░ ▒ ░░ ░ ▒ ░
░ ░ ░ ▒ ▒ ░ ░ ░
░ ░ ░ ░ ░
"""
# TODO(keir): Figure out a better strategy for exiting. The problem with the
# watcher is that doing a "clean exit" is slow. However, by directly exiting,
# we remove the possibility of the wrapper script doing anything on exit.
def _die(*args):
_LOG.fatal(*args)
sys.exit(1)
# pylint: disable=logging-format-interpolation
class WatchCharset(NamedTuple):
slug_ok: str
slug_fail: str
_ASCII_CHARSET = WatchCharset(_COLOR.green('OK '), _COLOR.red('FAIL'))
_EMOJI_CHARSET = WatchCharset('✔️ ', '💥')
@dataclass(frozen=True)
class BuildCommand:
build_dir: pathlib.Path
targets: Tuple[str, ...] = ()
def args(self) -> Tuple[str, ...]:
return (str(self.build_dir), *self.targets)
def __str__(self) -> str:
return ' '.join(shlex.quote(arg) for arg in self.args())
class PigweedBuildWatcher(FileSystemEventHandler, DebouncedFunction):
"""Process filesystem events and launch builds if necessary."""
def __init__(
self,
patterns: Sequence[str] = (),
ignore_patterns: Sequence[str] = (),
build_commands: Sequence[BuildCommand] = (),
ignore_dirs=Optional[List[str]],
charset: WatchCharset = _ASCII_CHARSET,
restart: bool = False,
):
super().__init__()
self.patterns = patterns
self.ignore_patterns = ignore_patterns
self.build_commands = build_commands
self.ignore_dirs = ignore_dirs or []
self.ignore_dirs.extend(cmd.build_dir for cmd in self.build_commands)
self.charset: WatchCharset = charset
self.restart_on_changes = restart
self._current_build: Optional[subprocess.Popen] = None
self.debouncer = Debouncer(self)
# Track state of a build. These need to be members instead of locals
# due to the split between dispatch(), run(), and on_complete().
self.matching_path = None
self.builds_succeeded: List[bool] = []
self.wait_for_keypress_thread = threading.Thread(
None, self._wait_for_enter)
self.wait_for_keypress_thread.start()
def _wait_for_enter(self):
try:
while True:
_ = input()
self.debouncer.press('Manual build requested...')
# Ctrl-C on Unix generates KeyboardInterrupt
# Ctrl-Z on Windows generates EOFError
except (KeyboardInterrupt, EOFError):
_exit_due_to_interrupt()
def path_matches(self, raw_path):
"""Returns true if path matches according to the watcher patterns"""
modified_path = pathlib.Path(raw_path).resolve()
# Check for modifications inside the ignore directories, and skip them.
# Ideally these events would never hit the watcher, but selectively
# watching directories at the OS level is not trivial due to limitations
# of the watchdog module.
for ignore_dir in self.ignore_dirs:
resolved_ignore_dir = pathlib.Path(ignore_dir).resolve()
try:
modified_path.relative_to(resolved_ignore_dir)
# If no ValueError is raised by the .relative_to() call, then
# this file is inside the ignore directory; so skip it.
return False
except ValueError:
# Otherwise, the file isn't in the ignore directory, so run the
# normal pattern checks below.
pass
return ((not any(modified_path.match(x) for x in self.ignore_patterns))
and any(modified_path.match(x) for x in self.patterns))
def dispatch(self, event):
# There isn't any point in triggering builds on new directory creation.
# It's the creation or modification of files that indicate something
# meaningful enough changed for a build.
if event.is_directory:
return
# Collect paths of interest from the event.
paths = []
if has_attribute(event, 'dest_path'):
paths.append(unicode_paths.decode(event.dest_path))
if event.src_path:
paths.append(unicode_paths.decode(event.src_path))
for path in paths:
_LOG.debug('File event: %s', path)
# Check for matching paths among the one or two in the event.
matching_path = None
for path in paths:
if self.path_matches(path):
_LOG.debug('Detected event: %s', path)
matching_path = path
break
if matching_path:
self.handle_matched_event(matching_path)
def handle_matched_event(self, matching_path):
if self.matching_path is None:
self.matching_path = matching_path
self.debouncer.press(
f'File change detected: {os.path.relpath(matching_path)}')
# Implementation of DebouncedFunction.run()
#
# Note: This will run on the timer thread created by the Debouncer, rather
# than on the main thread that's watching file events. This enables the
# watcher to continue receiving file change events during a build.
def run(self):
"""Run all the builds in serial and capture pass/fail for each."""
# Clear the screen and show a banner indicating the build is starting.
print('\033c', end='') # TODO(pwbug/38): Not Windows compatible.
print(pw_cli.branding.banner())
print(
_COLOR.green(
' Watching for changes. Ctrl-C to exit; enter to rebuild'))
print()
_LOG.info('Change detected: %s', self.matching_path)
self.builds_succeeded = []
num_builds = len(self.build_commands)
_LOG.info('Starting build with %d directories', num_builds)
env = os.environ.copy()
# Force colors in Pigweed subcommands run through the watcher.
env['PW_USE_COLOR'] = '1'
for i, cmd in enumerate(self.build_commands, 1):
_LOG.info('[%d/%d] Starting build: %s', i, num_builds, cmd)
# Run the build. Put a blank before/after for visual separation.
print()
self._current_build = subprocess.Popen(
['ninja', '-C', *cmd.args()], env=env)
returncode = self._current_build.wait()
print()
build_ok = (returncode == 0)
if build_ok:
level = logging.INFO
tag = '(OK)'
else:
level = logging.ERROR
tag = '(FAIL)'
_LOG.log(level, '[%d/%d] Finished build: %s %s', i, num_builds,
cmd, tag)
self.builds_succeeded.append(build_ok)
# Implementation of DebouncedFunction.cancel()
def cancel(self):
if self.restart_on_changes:
self._current_build.terminate()
return True
return False
# Implementation of DebouncedFunction.run()
def on_complete(self, cancelled=False):
# First, use the standard logging facilities to report build status.
if cancelled:
_LOG.error('Finished; build was interrupted')
elif all(self.builds_succeeded):
_LOG.info('Finished; all successful')
else:
_LOG.info('Finished; some builds failed')
# Then, show a more distinct colored banner.
if not cancelled:
# Write out build summary table so you can tell which builds passed
# and which builds failed.
print()
print(' .------------------------------------')
print(' |')
for (succeeded, cmd) in zip(self.builds_succeeded,
self.build_commands):
slug = (self.charset.slug_ok
if succeeded else self.charset.slug_fail)
print(f' | {slug} {cmd}')
print(' |')
print(" '------------------------------------")
else:
# Build was interrupted.
print()
print(' .------------------------------------')
print(' |')
print(' | ', self.charset.slug_fail, '- interrupted')
print(' |')
print(" '------------------------------------")
# Show a large color banner so it is obvious what the overall result is.
if all(self.builds_succeeded) and not cancelled:
print(_COLOR.green(_PASS_MESSAGE))
else:
print(_COLOR.red(_FAIL_MESSAGE))
self.matching_path = None
# Implementation of DebouncedFunction.on_keyboard_interrupt()
def on_keyboard_interrupt(self):
_exit_due_to_interrupt()
_WATCH_PATTERN_DELIMITER = ','
_WATCH_PATTERNS = (
'*.bloaty',
'*.c',
'*.cc',
'*.cpp',
'*.cmake',
'CMakeLists.txt',
'*.gn',
'*.gni',
'*.go',
'*.h',
'*.hpp',
'*.ld',
'*.md',
'*.options',
'*.proto',
'*.py',
'*.rst',
)
def add_parser_arguments(parser):
parser.add_argument('--patterns',
help=(_WATCH_PATTERN_DELIMITER +
'-delimited list of globs to '
'watch to trigger recompile'),
default=_WATCH_PATTERN_DELIMITER.join(_WATCH_PATTERNS))
parser.add_argument('--ignore_patterns',
help=(_WATCH_PATTERN_DELIMITER +
'-delimited list of globs to '
'ignore events from'))
parser.add_argument('--exclude_list',
nargs='+',
help=('directories to ignore during pw watch'),
default=[])
parser.add_argument('--restart',
action='store_true',
help='restart an ongoing build if files change')
parser.add_argument(
'build_targets',
nargs='*',
default=[],
help=('A Ninja directory to build, followed by specific targets to '
'build. For example, `out host docs` builds the `host` and '
'`docs` Ninja targets in the `out` directory. To build '
'additional directories, use `--build-directory`.'))
parser.add_argument(
'--build-directory',
nargs='+',
action='append',
default=[],
metavar=('dir', 'target'),
help=('Allows additional build directories to be specified. Uses the '
'same syntax as `build_targets`.'))
def _exit(code):
# Note: The "proper" way to exit is via observer.stop(), then
# running a join. However it's slower, so just exit immediately.
#
# Additionally, since there are several threads in the watcher, the usual
# sys.exit approach doesn't work. Instead, run the low level exit which
# kills all threads.
os._exit(code) # pylint: disable=protected-access
def _exit_due_to_interrupt():
# To keep the log lines aligned with each other in the presence of
# a '^C' from the keyboard interrupt, add a newline before the log.
print()
print()
_LOG.info('Got Ctrl-C; exiting...')
_exit(0)
def _exit_due_to_inotify_limit():
# Show information and suggested commands in OSError: inotify limit reached.
_LOG.error('Inotify limit reached: run this in your terminal if you '
'are in Linux to temporarily increase inotify limit. \n')
print(
_COLOR.green(' sudo sysctl fs.inotify.max_user_watches='
'$NEW_LIMIT$\n'))
print(' Change $NEW_LIMIT$ with an integer number, '
'e.g., 1000 should be enough.')
_exit(0)
def _exit_due_to_pigweed_not_installed():
# Show information and suggested commands when pigweed environment variable
# not found.
_LOG.error('Environment variable $PW_ROOT not defined or is defined '
'outside the current directory.')
_LOG.error('Did you forget to activate the Pigweed environment? '
'Try source ./activate.sh')
_LOG.error('Did you forget to install the Pigweed environment? '
'Try source ./bootstrap.sh')
_exit(1)
def is_subdirectory(child, parent):
return (pathlib.Path(parent).resolve()
in pathlib.Path(pathlib.Path(child).resolve()).parents)
# Go over each directory inside of the current directory.
# If it is not on the path of elements in directories_to_exclude, add
# (directory, True) to subdirectories_to_watch and later recursively call
# Observer() on them.
# Otherwise add (directory, False) to subdirectories_to_watch and later call
# Observer() with recursion=False.
def minimal_watch_directories(directory_to_watch, directories_to_exclude):
"""Determine which subdirectory to watch recursively"""
try:
cur_dir = pathlib.Path(directory_to_watch)
except TypeError:
assert False, "Please watch one directory at a time."
subdirectories_to_watch = []
# Reformat directories_to_exclude.
directories_to_exclude = [
pathlib.Path(cur_dir, directory_to_exclude)
for directory_to_exclude in directories_to_exclude
if pathlib.Path(cur_dir, directory_to_exclude).is_dir()
]
# Split the relative path of directories_to_exclude (compared to
# directory_to_watch), and generate all parent paths needed to be
# watched without recursion.
exclude_dir_parents = {cur_dir}
for directory_to_exclude in directories_to_exclude:
parts = list(
pathlib.Path(directory_to_exclude).relative_to(cur_dir).parts)[:-1]
dir_tmp = cur_dir
for part in parts:
dir_tmp = pathlib.Path(dir_tmp, part)
exclude_dir_parents.add(dir_tmp)
# Go over all layers of directory. Append those that are the parents of
# directories_to_exclude to the list with recursion==False, and others
# with recursion==True.
for directory in exclude_dir_parents:
dir_path = pathlib.Path(directory)
subdirectories_to_watch.append((dir_path, False))
for item in pathlib.Path(directory).iterdir():
if (item.is_dir() and item not in exclude_dir_parents
and item not in directories_to_exclude):
subdirectories_to_watch.append((item, True))
return subdirectories_to_watch
def gitignore_patterns():
"""Load patterns in pw_root_dir/.gitignore and return as [str]"""
pw_root_dir = pathlib.Path(os.environ['PW_ROOT'])
# Get top level .gitignore entries
gitignore_path = pw_root_dir / pathlib.Path('.gitignore')
if gitignore_path.exists():
for line in gitignore_path.read_text().splitlines():
globname = line.strip()
# If line is empty or a comment.
if not globname or globname.startswith('#'):
continue
yield line
def get_common_excludes():
"""Find commonly excluded directories, and return them as a [Path]"""
exclude_list = []
# Preset exclude list for Pigweed's upstream directories.
pw_root_dir = pathlib.Path(os.environ['PW_ROOT'])
exclude_list.extend([
pw_root_dir / ignored_directory for ignored_directory in [
'.environment', # Bootstrap-created CIPD and Python venv.
'.presubmit', # Presubmit-created CIPD and Python venv.
'.git', # Pigweed's git repo.
'.mypy_cache', # Python static analyzer.
'.cargo', # Rust package manager.
'out', # Typical build directory.
]
])
# Preset exclude for common downstream project structures.
#
# By convention, Pigweed projects use "out" as a build directory, so if
# watch is invoked outside the Pigweed root, also ignore the local out
# directory.
cur_dir = pathlib.Path.cwd()
if cur_dir != pw_root_dir:
exclude_list.append(cur_dir / 'out')
# Check for and warn about legacy directories.
legacy_directories = [
'.cipd', # Legacy CIPD location.
'.python3-venv', # Legacy Python venv location.
]
found_legacy = False
for legacy_directory in legacy_directories:
full_legacy_directory = pw_root_dir / legacy_directory
if full_legacy_directory.is_dir():
_LOG.warning('Legacy environment directory found: %s',
str(full_legacy_directory))
exclude_list.append(full_legacy_directory)
found_legacy = True
if found_legacy:
_LOG.warning('Found legacy environment directory(s); these '
'should be deleted')
return exclude_list
def watch(build_targets, build_directory, patterns, ignore_patterns,
exclude_list, restart: bool):
"""TODO(keir) docstring"""
_LOG.info('Starting Pigweed build watcher')
# Get pigweed directory information from environment variable PW_ROOT.
if os.environ['PW_ROOT'] is None:
_exit_due_to_pigweed_not_installed()
path_of_pigweed = pathlib.Path(os.environ['PW_ROOT'])
cur_dir = pathlib.Path.cwd()
if (not (is_subdirectory(path_of_pigweed, cur_dir)
or path_of_pigweed == cur_dir)):
_exit_due_to_pigweed_not_installed()
# Preset exclude list for pigweed directory.
exclude_list += get_common_excludes()
subdirectories_to_watch = minimal_watch_directories(cur_dir, exclude_list)
# If no build directory was specified, search the tree for GN build
# directories and try to build them all. In the future this may cause
# slow startup, but for now this is fast enough.
build_commands = []
if not build_targets and not build_directory:
_LOG.info('Searching for GN build dirs...')
gn_args_files = []
if os.path.isfile('out/args.gn'):
gn_args_files += ['out/args.gn']
gn_args_files += glob.glob('out/*/args.gn')
for gn_args_file in gn_args_files:
gn_build_dir = pathlib.Path(gn_args_file).parent
gn_build_dir = gn_build_dir.resolve().relative_to(cur_dir)
if gn_build_dir.is_dir():
build_commands.append(BuildCommand(gn_build_dir))
else:
if build_targets:
build_directory.append(build_targets)
# Reformat the directory of build commands to be relative to the
# currently directory.
for build_target in build_directory:
build_commands.append(
BuildCommand(pathlib.Path(build_target[0]),
tuple(build_target[1:])))
# Make sure we found something; if not, bail.
if not build_commands:
_die("No build dirs found. Did you forget to 'gn gen out'?")
# Verify that the build output directories exist.
for i, build_target in enumerate(build_commands, 1):
if not build_target.build_dir.is_dir():
_die("Build directory doesn't exist: %s", build_target)
else:
_LOG.info('Will build [%d/%d]: %s', i, len(build_commands),
build_target)
_LOG.debug('Patterns: %s', patterns)
# Try to make a short display path for the watched directory that has
# "$HOME" instead of the full home directory. This is nice for users
# who have deeply nested home directories.
path_to_log = str(pathlib.Path().resolve()).replace(
str(pathlib.Path.home()), '$HOME')
# Ignore the user-specified patterns.
ignore_patterns = (ignore_patterns.split(_WATCH_PATTERN_DELIMITER)
if ignore_patterns else [])
# Ignore top level pw_root_dir/.gitignore patterns.
ignore_patterns += gitignore_patterns()
ignore_dirs = ['.presubmit', '.python3-env']
env = pw_cli.env.pigweed_environment()
if env.PW_EMOJI:
charset = _EMOJI_CHARSET
else:
charset = _ASCII_CHARSET
event_handler = PigweedBuildWatcher(
patterns=patterns.split(_WATCH_PATTERN_DELIMITER),
ignore_patterns=ignore_patterns,
build_commands=build_commands,
ignore_dirs=ignore_dirs,
charset=charset,
restart=restart,
)
try:
# It can take awhile to configure the filesystem watcher, so have the
# message reflect that with the "...". Run inside the try: to
# gracefully handle the user Ctrl-C'ing out during startup.
_LOG.info('Attaching filesystem watcher to %s/...', path_to_log)
# Observe changes for all files in the root directory. Whether the
# directory should be observed recursively or not is determined by the
# second element in subdirectories_to_watch.
observers = []
for directory, rec in subdirectories_to_watch:
observer = Observer()
observer.schedule(
event_handler,
str(directory),
recursive=rec,
)
observer.start()
observers.append(observer)
event_handler.debouncer.press('Triggering initial build...')
for observer in observers:
while observer.isAlive():
observer.join(1)
# Ctrl-C on Unix generates KeyboardInterrupt
# Ctrl-Z on Windows generates EOFError
except (KeyboardInterrupt, EOFError):
_exit_due_to_interrupt()
except OSError as err:
if err.args[0] == _ERRNO_INOTIFY_LIMIT_REACHED:
_exit_due_to_inotify_limit()
else:
raise err
_LOG.critical('Should never get here')
observer.join()
def main():
"""Watch files for changes and rebuild."""
parser = argparse.ArgumentParser(description=main.__doc__)
add_parser_arguments(parser)
watch(**vars(parser.parse_args()))
if __name__ == '__main__':
main()