pw_build: Parallel worker support
Adds --parallel to pw_build.project_builder and pw watch
This runs each out directory build in parallel.
Change-Id: Ia3b9dc1a05701890bf32f360916d7c3cc8ef2c50
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126520
Reviewed-by: Chad Norvell <chadnorvell@google.com>
Commit-Queue: Anthony DiGirolamo <tonymd@google.com>
diff --git a/pw_build/py/BUILD.gn b/pw_build/py/BUILD.gn
index 6322c84..16d6711 100644
--- a/pw_build/py/BUILD.gn
+++ b/pw_build/py/BUILD.gn
@@ -45,6 +45,7 @@
"pw_build/pip_install_python_deps.py",
"pw_build/project_builder.py",
"pw_build/project_builder_argparse.py",
+ "pw_build/project_builder_context.py",
"pw_build/project_builder_prefs.py",
"pw_build/python_package.py",
"pw_build/python_runner.py",
diff --git a/pw_build/py/pw_build/build_recipe.py b/pw_build/py/pw_build/build_recipe.py
index 3387afb..117d488 100644
--- a/pw_build/py/pw_build/build_recipe.py
+++ b/pw_build/py/pw_build/build_recipe.py
@@ -207,6 +207,9 @@
return self.return_code != 0
return False
+ def set_passed(self) -> None:
+ self.return_code = 0
+
def passed(self) -> bool:
if self.return_code is not None:
return self.return_code == 0
@@ -249,6 +252,9 @@
steps: List[BuildCommand] = field(default_factory=list)
title: Optional[str] = None
+ def __hash__(self):
+ return hash((self.build_dir, self.title, len(self.steps)))
+
def __post_init__(self) -> None:
# Update all included steps to use this recipe's build_dir.
for step in self.steps:
diff --git a/pw_build/py/pw_build/project_builder.py b/pw_build/py/pw_build/project_builder.py
index 6b91756..9830553 100644
--- a/pw_build/py/pw_build/project_builder.py
+++ b/pw_build/py/pw_build/project_builder.py
@@ -46,6 +46,7 @@
import shlex
import sys
import subprocess
+import time
from typing import (
Callable,
Dict,
@@ -57,16 +58,18 @@
NamedTuple,
)
-
-import pw_cli.log
import pw_cli.env
+import pw_cli.log
+
from pw_build.build_recipe import BuildRecipe, create_build_recipes
-from pw_build.project_builder_prefs import ProjectBuilderPrefs
from pw_build.project_builder_argparse import add_project_builder_arguments
+from pw_build.project_builder_context import get_project_builder_context
+from pw_build.project_builder_prefs import ProjectBuilderPrefs
_COLOR = pw_cli.color.colors()
_LOG = logging.getLogger('pw_build')
+
PASS_MESSAGE = """
██████╗ █████╗ ███████╗███████╗██╗
██╔══██╗██╔══██╗██╔════╝██╔════╝██║
@@ -115,6 +118,7 @@
# a '^C' from the keyboard interrupt, add a newline before the log.
print()
_LOG.info('Got Ctrl-C; exiting...')
+ get_project_builder_context().terminate_and_wait()
sys.exit(1)
@@ -135,10 +139,18 @@
# pylint: enable=unused-argument
) -> bool:
print()
- current_build = subprocess.run(command, env=env, errors='replace')
+ proc = subprocess.Popen(command, env=env, errors='replace')
+ get_project_builder_context().register_process(recipe, proc)
+ returncode = None
+ while returncode is None:
+ if get_project_builder_context().should_abort():
+ proc.terminate()
+ returncode = proc.poll()
+ time.sleep(0.05)
print()
- recipe.status.return_code = current_build.returncode
- return current_build.returncode == 0
+ recipe.status.return_code = returncode
+
+ return proc.returncode == 0
def execute_command_with_logging(
@@ -159,6 +171,7 @@
stderr=subprocess.STDOUT,
errors='replace',
) as proc:
+ get_project_builder_context().register_process(recipe, proc)
# Empty line at the start.
logger.info('')
@@ -212,6 +225,9 @@
if line_processed_callback:
line_processed_callback(recipe)
+ if get_project_builder_context().should_abort():
+ proc.terminate()
+
recipe.status.return_code = returncode
# Empty line at the end.
logger.info('')
@@ -323,22 +339,17 @@
else:
self.extra_bazel_args.append('--color=no')
- if separate_build_file_logging and not root_logfile:
- raise MissingGlobalLogfile(
- '\n\nA logfile must be specified if using separate logs per '
- 'build directory.'
- )
-
self.separate_build_file_logging = separate_build_file_logging
self.default_log_level = log_level
self.default_logfile = root_logfile
- if root_logfile:
- timestamp_fmt = _COLOR.black_on_white('%(asctime)s') + ' '
- formatter = logging.Formatter(
- timestamp_fmt + '%(levelname)s %(message)s', '%Y%m%d %H:%M:%S'
- )
+ timestamp_fmt = _COLOR.black_on_white('%(asctime)s') + ' '
+ formatter = logging.Formatter(
+ timestamp_fmt + '%(levelname)s %(message)s', '%Y%m%d %H:%M:%S'
+ )
+ # Create a root logfile to save what is normally logged to stdout.
+ if root_logfile:
self.execute_command = execute_command_with_logging
build_log_filehandler = logging.FileHandler(
@@ -348,32 +359,45 @@
build_log_filehandler.setFormatter(formatter)
root_logger.addHandler(build_log_filehandler)
- if not separate_build_file_logging:
- for recipe in self.build_recipes:
- recipe.set_logger(root_logger)
- else:
- for recipe in self.build_recipes:
- new_logger = logging.getLogger(
- f'{root_logger.name}.{recipe.display_name}'
- )
- new_logger.setLevel(log_level)
- new_logger.propagate = False
- new_logfile = root_logfile.parent / (
- root_logfile.stem
- + '_'
- + recipe.display_name.replace(' ', '_')
- + root_logfile.suffix
+ if not separate_build_file_logging:
+ # Set each recipe to use the root logger.
+ for recipe in self.build_recipes:
+ recipe.set_logger(root_logger)
+ else:
+ # Create separate logfiles in out/log.txt
+ self.execute_command = execute_command_with_logging
+ for recipe in self.build_recipes:
+ new_logger = logging.getLogger(
+ f'{root_logger.name}.{recipe.display_name}'
+ )
+ new_logger.setLevel(log_level)
+ new_logger.propagate = False
+
+ new_logfile_dir = recipe.build_dir
+ new_logfile_name = Path('log.txt')
+ new_logfile_postfix = ''
+ if root_logfile:
+ new_logfile_dir = root_logfile.parent
+ new_logfile_name = root_logfile
+ new_logfile_postfix = '_' + recipe.display_name.replace(
+ ' ', '_'
)
- new_log_filehandler = logging.FileHandler(
- new_logfile, encoding='utf-8'
- )
- new_log_filehandler.setLevel(log_level)
- new_log_filehandler.setFormatter(formatter)
- new_logger.addHandler(new_log_filehandler)
+ new_logfile = new_logfile_dir / (
+ new_logfile_name.stem
+ + new_logfile_postfix
+ + new_logfile_name.suffix
+ )
- recipe.set_logger(new_logger)
- recipe.set_logfile(new_logfile)
+ new_log_filehandler = logging.FileHandler(
+ new_logfile, encoding='utf-8'
+ )
+ new_log_filehandler.setLevel(log_level)
+ new_log_filehandler.setFormatter(formatter)
+ new_logger.addHandler(new_log_filehandler)
+
+ recipe.set_logger(new_logger)
+ recipe.set_logfile(new_logfile)
def __len__(self) -> int:
return len(self.build_recipes)
@@ -441,6 +465,11 @@
if not build_succeded:
break
+ # If all steps were skipped the return code will not be set. Force
+ # status to passed in this case.
+ if build_succeded and not cfg.status.passed():
+ cfg.status.set_passed()
+
return build_succeded
def print_build_summary(
@@ -495,21 +524,18 @@
def run_recipe(
index: int, project_builder: ProjectBuilder, cfg: BuildRecipe, env
-) -> None:
+) -> bool:
num_builds = len(project_builder)
index_message = f'[{index}/{num_builds}]'
log_build_recipe_start(index_message, project_builder, cfg)
- try:
- project_builder.run_build(cfg, env, index_message=index_message)
- # Ctrl-C on Unix generates KeyboardInterrupt
- # Ctrl-Z on Windows generates EOFError
- except (KeyboardInterrupt, EOFError):
- _exit_due_to_interrupt()
+ result = project_builder.run_build(cfg, env, index_message=index_message)
log_build_recipe_finish(index_message, project_builder, cfg)
+ return result
+
def run_builds(project_builder: ProjectBuilder, workers: int = 1) -> None:
"""Execute build steps in the ProjectBuilder and print a summary."""
@@ -527,24 +553,48 @@
# Print status before starting
project_builder.print_build_summary()
- with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
- future_to_index = {}
- for i, cfg in enumerate(project_builder, 1):
- future_to_index[
- executor.submit(run_recipe, i, project_builder, cfg, env)
- ] = i
+ if workers > 1 and not project_builder.separate_build_file_logging:
+ _LOG.warning(
+ _COLOR.yellow(
+ 'Running in parallel without --separate-logfiles; All build '
+ 'output will be interleaved.'
+ )
+ )
+ get_project_builder_context().set_building()
+
+ if workers == 1:
try:
- for future in concurrent.futures.as_completed(future_to_index):
- future.result()
+ for i, cfg in enumerate(project_builder, 1):
+ run_recipe(i, project_builder, cfg, env)
# Ctrl-C on Unix generates KeyboardInterrupt
# Ctrl-Z on Windows generates EOFError
except (KeyboardInterrupt, EOFError):
_exit_due_to_interrupt()
+ else:
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=workers
+ ) as executor:
+ future_to_index = {}
+ for i, cfg in enumerate(project_builder, 1):
+ future_to_index[
+ executor.submit(run_recipe, i, project_builder, cfg, env)
+ ] = i
+
+ try:
+ for future in concurrent.futures.as_completed(future_to_index):
+ future.result()
+ # Ctrl-C on Unix generates KeyboardInterrupt
+ # Ctrl-Z on Windows generates EOFError
+ except (KeyboardInterrupt, EOFError):
+ _exit_due_to_interrupt()
+
# Print status when finished
project_builder.print_build_summary()
+ get_project_builder_context().set_idle()
+
def main() -> None:
"""Build a Pigweed Project."""
@@ -588,7 +638,8 @@
log_level=log_level,
)
- run_builds(project_builder)
+ workers = len(project_builder) if args.parallel else 1
+ run_builds(project_builder, workers)
if __name__ == '__main__':
diff --git a/pw_build/py/pw_build/project_builder_argparse.py b/pw_build/py/pw_build/project_builder_argparse.py
index 603d114..81c2fb5 100644
--- a/pw_build/py/pw_build/project_builder_argparse.py
+++ b/pw_build/py/pw_build/project_builder_argparse.py
@@ -79,10 +79,7 @@
logfile_group.add_argument(
'--separate-logfiles',
action='store_true',
- help=(
- 'Create separate log files per build directory. Requires setting '
- 'the --logfile option.'
- ),
+ help='Create separate log files per build directory.',
)
logfile_group.add_argument(
@@ -146,4 +143,10 @@
),
)
+ parser.add_argument(
+ '--parallel',
+ action='store_true',
+ help='Run all builds in parallel.',
+ )
+
return parser
diff --git a/pw_build/py/pw_build/project_builder_context.py b/pw_build/py/pw_build/project_builder_context.py
new file mode 100644
index 0000000..b0c2cac
--- /dev/null
+++ b/pw_build/py/pw_build/project_builder_context.py
@@ -0,0 +1,115 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Fetch active Project Builder Context."""
+
+import concurrent.futures
+from contextvars import ContextVar
+from dataclasses import dataclass, field
+from enum import Enum
+import logging
+import subprocess
+from typing import Dict, Optional
+
+from pw_build.build_recipe import BuildRecipe
+
+
+_LOG = logging.getLogger('pw_build.watch')
+
+
+def _wait_for_terminate_then_kill(
+ proc: subprocess.Popen, timeout: int = 3
+) -> int:
+ """Wait for a process to end, then kill it if the timeout expires."""
+ returncode = 1
+ try:
+ returncode = proc.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ _LOG.debug('Killing %s', proc)
+ proc.kill()
+ return returncode
+
+
+class ProjectBuilderState(Enum):
+ IDLE = 'IDLE'
+ BUILDING = 'BUILDING'
+ ABORT = 'ABORT'
+
+
+@dataclass
+class ProjectBuilderContext:
+ """Maintains the state of running builds and active subproccesses."""
+
+ current_state: Optional[ProjectBuilderState] = ProjectBuilderState.IDLE
+ desired_state: Optional[ProjectBuilderState] = ProjectBuilderState.BUILDING
+ procs: Dict[BuildRecipe, Optional[subprocess.Popen]] = field(
+ default_factory=dict
+ )
+
+ def register_process(
+ self, recipe: BuildRecipe, proc: subprocess.Popen
+ ) -> None:
+ self.procs[recipe] = proc
+
+ def terminate_and_wait(self) -> None:
+ """End a subproces either cleanly or with a kill signal."""
+ if self.is_idle() or self.should_abort():
+ return
+
+ self._signal_abort()
+
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=len(self.procs)
+ ) as executor:
+ futures = []
+ for _recipe, proc in self.procs.items():
+ if proc is None:
+ continue
+ _LOG.debug('Wait for %s', proc)
+ futures.append(
+ executor.submit(_wait_for_terminate_then_kill, proc)
+ )
+ for future in concurrent.futures.as_completed(futures):
+ future.result()
+
+ _LOG.debug('Wait for terminate DONE')
+ self.set_idle()
+
+ def _signal_abort(self) -> None:
+ self.desired_state = ProjectBuilderState.ABORT
+
+ def should_abort(self) -> bool:
+ return self.desired_state == ProjectBuilderState.ABORT
+
+ def is_building(self) -> bool:
+ return self.current_state == ProjectBuilderState.BUILDING
+
+ def is_idle(self) -> bool:
+ return self.current_state == ProjectBuilderState.IDLE
+
+ def set_idle(self) -> None:
+ self.current_state = ProjectBuilderState.IDLE
+ self.desired_state = ProjectBuilderState.IDLE
+
+ def set_building(self) -> None:
+ self.current_state = ProjectBuilderState.BUILDING
+ self.desired_state = ProjectBuilderState.BUILDING
+
+
+PROJECT_BUILDER_CONTEXTVAR = ContextVar(
+ 'pw_build_project_builder_state', default=ProjectBuilderContext()
+)
+
+
+def get_project_builder_context():
+ return PROJECT_BUILDER_CONTEXTVAR.get()
diff --git a/pw_watch/py/pw_watch/watch.py b/pw_watch/py/pw_watch/watch.py
index 610bb5f..4e13dc5 100755
--- a/pw_watch/py/pw_watch/watch.py
+++ b/pw_watch/py/pw_watch/watch.py
@@ -42,6 +42,7 @@
"""
import argparse
+import concurrent.futures
import errno
import http.server
import logging
@@ -78,12 +79,14 @@
from pw_build.build_recipe import BuildRecipe, create_build_recipes
from pw_build.project_builder import (
ProjectBuilder,
+ execute_command_no_logging,
execute_command_with_logging,
log_build_recipe_start,
log_build_recipe_finish,
ASCII_CHARSET,
EMOJI_CHARSET,
)
+from pw_build.project_builder_context import get_project_builder_context
import pw_cli.branding
import pw_cli.color
import pw_cli.env
@@ -159,6 +162,7 @@
banners: bool = True,
use_logfile: bool = False,
separate_logfiles: bool = False,
+ parallel: bool = False,
):
super().__init__()
@@ -171,6 +175,7 @@
self.patterns = patterns
self.ignore_patterns = ignore_patterns
self.project_builder = project_builder
+ self.parallel_workers = len(project_builder) if parallel else 1
self.restart_on_changes = restart
self.fullscreen_enabled = fullscreen
@@ -178,9 +183,8 @@
self.use_logfile = use_logfile
self.separate_logfiles = separate_logfiles
-
- # Initialize self._current_build to an empty subprocess.
- self._current_build = subprocess.Popen('', shell=True, errors='replace')
+ if self.parallel_workers > 1:
+ self.separate_logfiles = True
self.debouncer = Debouncer(self)
@@ -196,8 +200,6 @@
def rebuild(self):
"""Rebuild command triggered from watch app."""
- self._current_build.terminate()
- self._current_build.wait()
self.debouncer.press('Manual build requested')
def _wait_for_enter(self) -> NoReturn:
@@ -209,8 +211,6 @@
# Ctrl-Z on Windows generates EOFError
except (KeyboardInterrupt, EOFError):
# Force stop any running ninja builds.
- if self._current_build:
- self._current_build.terminate()
_exit_due_to_interrupt()
def _path_matches(self, path: Path) -> bool:
@@ -265,7 +265,7 @@
# than on the main thread that's watching file events. This enables the
# watcher to continue receiving file change events during a build.
def run(self) -> None:
- """Run all the builds in serial and capture pass/fail for each."""
+ """Run all the builds and capture pass/fail for each."""
# Clear the screen and show a banner indicating the build is starting.
self._clear_screen()
@@ -307,13 +307,24 @@
# Force Ninja to output ANSI colors
env['CLICOLOR_FORCE'] = '1'
- # Reset status messages
+ # Reset status
+ get_project_builder_context().set_building()
+
for cfg in self.project_builder:
cfg.reset_status()
self.create_result_message()
- for i, cfg in enumerate(self.project_builder, 1):
- self.run_recipe(i, cfg, env)
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=self.parallel_workers
+ ) as executor:
+ futures = []
+ for i, cfg in enumerate(self.project_builder, 1):
+ futures.append(executor.submit(self.run_recipe, i, cfg, env))
+
+ for future in concurrent.futures.as_completed(futures):
+ future.result()
+
+ get_project_builder_context().set_idle()
def run_recipe(self, index: int, cfg: BuildRecipe, env) -> None:
num_builds = len(self.project_builder)
@@ -361,13 +372,13 @@
'Failed'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH),
)
)
- # NOTE: This condition assumes each build dir is run in serial
elif first_building_target_found:
self.result_message.append(
('', ''.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH))
)
else:
- first_building_target_found = True
+ if self.parallel_workers == 1:
+ first_building_target_found = True
self.result_message.append(
(
'class:theme-fg-yellow',
@@ -401,19 +412,13 @@
return execute_command_with_logging(
command, env, recipe, logger=recipe.log
)
+
if self.use_logfile:
return execute_command_with_logging(
command, env, recipe, logger=_LOG
)
- print()
- self._current_build = subprocess.Popen(
- command, env=env, errors='replace'
- )
- returncode = self._current_build.wait()
- print()
- recipe.status.return_code = returncode
- return returncode == 0
+ return execute_command_no_logging(command, env, recipe)
def _execute_command_watch_app(
self,
@@ -450,8 +455,7 @@
# Implementation of DebouncedFunction.cancel()
def cancel(self) -> bool:
if self.restart_on_changes:
- self._current_build.terminate()
- self._current_build.wait()
+ get_project_builder_context().terminate_and_wait()
return True
return False
@@ -513,6 +517,7 @@
# a '^C' from the keyboard interrupt, add a newline before the log.
print('')
_LOG.info('Got Ctrl-C; exiting...')
+ get_project_builder_context().terminate_and_wait()
_exit(0)
@@ -767,6 +772,7 @@
banners: bool = True,
logfile: Optional[Path] = None,
separate_logfiles: bool = False,
+ parallel: bool = False,
# pylint: disable=unused-argument
default_build_targets: Optional[List[str]] = None,
build_directories: Optional[List[str]] = None,
@@ -823,6 +829,14 @@
else []
)
+ # Add project_builder logfiles to ignore_patterns
+ if project_builder.default_logfile:
+ ignore_patterns.append(str(project_builder.default_logfile))
+ if project_builder.separate_build_file_logging:
+ for recipe in project_builder:
+ if recipe.logfile:
+ ignore_patterns.append(str(recipe.logfile))
+
event_handler = PigweedBuildWatcher(
project_builder=project_builder,
patterns=patterns.split(WATCH_PATTERN_DELIMITER),
@@ -832,6 +846,7 @@
banners=banners,
use_logfile=bool(logfile),
separate_logfiles=bool(separate_logfiles),
+ parallel=parallel,
)
project_builder.execute_command = event_handler.execute_command
@@ -935,6 +950,11 @@
else:
charset = ASCII_CHARSET
+ # Force separate-logfiles for split window panes if running in parallel.
+ separate_logfiles = args.separate_logfiles
+ if args.parallel:
+ separate_logfiles = True
+
project_builder = ProjectBuilder(
build_recipes=build_recipes,
jobs=args.jobs,
@@ -942,7 +962,7 @@
keep_going=args.keep_going,
colors=args.colors,
charset=charset,
- separate_build_file_logging=args.separate_logfiles,
+ separate_build_file_logging=separate_logfiles,
root_logfile=args.logfile,
root_logger=_LOG,
log_level=logging.DEBUG if args.debug_logging else logging.INFO,
diff --git a/pw_watch/py/pw_watch/watch_app.py b/pw_watch/py/pw_watch/watch_app.py
index 3886f80..b8e53c7 100644
--- a/pw_watch/py/pw_watch/watch_app.py
+++ b/pw_watch/py/pw_watch/watch_app.py
@@ -62,6 +62,7 @@
import pw_console.widgets.border
from pw_build.project_builder_prefs import ProjectBuilderPrefs
+from pw_build.project_builder_context import get_project_builder_context
class WatchAppPrefs(ProjectBuilderPrefs):
@@ -536,6 +537,7 @@
print('Logs saved to: {}'.format(log_file.resolve()))
sys.exit(future.result())
+ get_project_builder_context().terminate_and_wait()
if self.application.future:
self.application.future.add_done_callback(_really_exit)
self.application.exit(result=exit_code)