pw_build: Parallel worker support

Adds --parallel to pw_build.project_builder and pw watch.
This runs the build for each out directory in parallel.

Change-Id: Ia3b9dc1a05701890bf32f360916d7c3cc8ef2c50
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126520
Reviewed-by: Chad Norvell <chadnorvell@google.com>
Commit-Queue: Anthony DiGirolamo <tonymd@google.com>
diff --git a/pw_build/py/BUILD.gn b/pw_build/py/BUILD.gn
index 6322c84..16d6711 100644
--- a/pw_build/py/BUILD.gn
+++ b/pw_build/py/BUILD.gn
@@ -45,6 +45,7 @@
     "pw_build/pip_install_python_deps.py",
     "pw_build/project_builder.py",
     "pw_build/project_builder_argparse.py",
+    "pw_build/project_builder_context.py",
     "pw_build/project_builder_prefs.py",
     "pw_build/python_package.py",
     "pw_build/python_runner.py",
diff --git a/pw_build/py/pw_build/build_recipe.py b/pw_build/py/pw_build/build_recipe.py
index 3387afb..117d488 100644
--- a/pw_build/py/pw_build/build_recipe.py
+++ b/pw_build/py/pw_build/build_recipe.py
@@ -207,6 +207,9 @@
             return self.return_code != 0
         return False
 
+    def set_passed(self) -> None:
+        self.return_code = 0
+
     def passed(self) -> bool:
         if self.return_code is not None:
             return self.return_code == 0
@@ -249,6 +252,9 @@
     steps: List[BuildCommand] = field(default_factory=list)
     title: Optional[str] = None
 
+    def __hash__(self):
+        return hash((self.build_dir, self.title, len(self.steps)))
+
     def __post_init__(self) -> None:
         # Update all included steps to use this recipe's build_dir.
         for step in self.steps:
diff --git a/pw_build/py/pw_build/project_builder.py b/pw_build/py/pw_build/project_builder.py
index 6b91756..9830553 100644
--- a/pw_build/py/pw_build/project_builder.py
+++ b/pw_build/py/pw_build/project_builder.py
@@ -46,6 +46,7 @@
 import shlex
 import sys
 import subprocess
+import time
 from typing import (
     Callable,
     Dict,
@@ -57,16 +58,18 @@
     NamedTuple,
 )
 
-
-import pw_cli.log
 import pw_cli.env
+import pw_cli.log
+
 from pw_build.build_recipe import BuildRecipe, create_build_recipes
-from pw_build.project_builder_prefs import ProjectBuilderPrefs
 from pw_build.project_builder_argparse import add_project_builder_arguments
+from pw_build.project_builder_context import get_project_builder_context
+from pw_build.project_builder_prefs import ProjectBuilderPrefs
 
 _COLOR = pw_cli.color.colors()
 _LOG = logging.getLogger('pw_build')
 
+
 PASS_MESSAGE = """
   ██████╗  █████╗ ███████╗███████╗██╗
   ██╔══██╗██╔══██╗██╔════╝██╔════╝██║
@@ -115,6 +118,7 @@
     # a '^C' from the keyboard interrupt, add a newline before the log.
     print()
     _LOG.info('Got Ctrl-C; exiting...')
+    get_project_builder_context().terminate_and_wait()
     sys.exit(1)
 
 
@@ -135,10 +139,18 @@
     # pylint: enable=unused-argument
 ) -> bool:
     print()
-    current_build = subprocess.run(command, env=env, errors='replace')
+    proc = subprocess.Popen(command, env=env, errors='replace')
+    get_project_builder_context().register_process(recipe, proc)
+    returncode = None
+    while returncode is None:
+        if get_project_builder_context().should_abort():
+            proc.terminate()
+        returncode = proc.poll()
+        time.sleep(0.05)
     print()
-    recipe.status.return_code = current_build.returncode
-    return current_build.returncode == 0
+    recipe.status.return_code = returncode
+
+    return proc.returncode == 0
 
 
 def execute_command_with_logging(
@@ -159,6 +171,7 @@
         stderr=subprocess.STDOUT,
         errors='replace',
     ) as proc:
+        get_project_builder_context().register_process(recipe, proc)
         # Empty line at the start.
         logger.info('')
 
@@ -212,6 +225,9 @@
             if line_processed_callback:
                 line_processed_callback(recipe)
 
+            if get_project_builder_context().should_abort():
+                proc.terminate()
+
         recipe.status.return_code = returncode
         # Empty line at the end.
         logger.info('')
@@ -323,22 +339,17 @@
         else:
             self.extra_bazel_args.append('--color=no')
 
-        if separate_build_file_logging and not root_logfile:
-            raise MissingGlobalLogfile(
-                '\n\nA logfile must be specified if using separate logs per '
-                'build directory.'
-            )
-
         self.separate_build_file_logging = separate_build_file_logging
         self.default_log_level = log_level
         self.default_logfile = root_logfile
 
-        if root_logfile:
-            timestamp_fmt = _COLOR.black_on_white('%(asctime)s') + ' '
-            formatter = logging.Formatter(
-                timestamp_fmt + '%(levelname)s %(message)s', '%Y%m%d %H:%M:%S'
-            )
+        timestamp_fmt = _COLOR.black_on_white('%(asctime)s') + ' '
+        formatter = logging.Formatter(
+            timestamp_fmt + '%(levelname)s %(message)s', '%Y%m%d %H:%M:%S'
+        )
 
+        # Create a root logfile to save what is normally logged to stdout.
+        if root_logfile:
             self.execute_command = execute_command_with_logging
 
             build_log_filehandler = logging.FileHandler(
@@ -348,32 +359,45 @@
             build_log_filehandler.setFormatter(formatter)
             root_logger.addHandler(build_log_filehandler)
 
-            if not separate_build_file_logging:
-                for recipe in self.build_recipes:
-                    recipe.set_logger(root_logger)
-            else:
-                for recipe in self.build_recipes:
-                    new_logger = logging.getLogger(
-                        f'{root_logger.name}.{recipe.display_name}'
-                    )
-                    new_logger.setLevel(log_level)
-                    new_logger.propagate = False
-                    new_logfile = root_logfile.parent / (
-                        root_logfile.stem
-                        + '_'
-                        + recipe.display_name.replace(' ', '_')
-                        + root_logfile.suffix
+        if not separate_build_file_logging:
+            # Set each recipe to use the root logger.
+            for recipe in self.build_recipes:
+                recipe.set_logger(root_logger)
+        else:
+            # Create a logfile per recipe; default is <build_dir>/log.txt
+            self.execute_command = execute_command_with_logging
+            for recipe in self.build_recipes:
+                new_logger = logging.getLogger(
+                    f'{root_logger.name}.{recipe.display_name}'
+                )
+                new_logger.setLevel(log_level)
+                new_logger.propagate = False
+
+                new_logfile_dir = recipe.build_dir
+                new_logfile_name = Path('log.txt')
+                new_logfile_postfix = ''
+                if root_logfile:
+                    new_logfile_dir = root_logfile.parent
+                    new_logfile_name = root_logfile
+                    new_logfile_postfix = '_' + recipe.display_name.replace(
+                        ' ', '_'
                     )
 
-                    new_log_filehandler = logging.FileHandler(
-                        new_logfile, encoding='utf-8'
-                    )
-                    new_log_filehandler.setLevel(log_level)
-                    new_log_filehandler.setFormatter(formatter)
-                    new_logger.addHandler(new_log_filehandler)
+                new_logfile = new_logfile_dir / (
+                    new_logfile_name.stem
+                    + new_logfile_postfix
+                    + new_logfile_name.suffix
+                )
 
-                    recipe.set_logger(new_logger)
-                    recipe.set_logfile(new_logfile)
+                new_log_filehandler = logging.FileHandler(
+                    new_logfile, encoding='utf-8'
+                )
+                new_log_filehandler.setLevel(log_level)
+                new_log_filehandler.setFormatter(formatter)
+                new_logger.addHandler(new_log_filehandler)
+
+                recipe.set_logger(new_logger)
+                recipe.set_logfile(new_logfile)
 
     def __len__(self) -> int:
         return len(self.build_recipes)
@@ -441,6 +465,11 @@
             if not build_succeded:
                 break
 
+        # If all steps were skipped the return code will not be set. Force
+        # status to passed in this case.
+        if build_succeded and not cfg.status.passed():
+            cfg.status.set_passed()
+
         return build_succeded
 
     def print_build_summary(
@@ -495,21 +524,18 @@
 
 def run_recipe(
     index: int, project_builder: ProjectBuilder, cfg: BuildRecipe, env
-) -> None:
+) -> bool:
     num_builds = len(project_builder)
     index_message = f'[{index}/{num_builds}]'
 
     log_build_recipe_start(index_message, project_builder, cfg)
 
-    try:
-        project_builder.run_build(cfg, env, index_message=index_message)
-    # Ctrl-C on Unix generates KeyboardInterrupt
-    # Ctrl-Z on Windows generates EOFError
-    except (KeyboardInterrupt, EOFError):
-        _exit_due_to_interrupt()
+    result = project_builder.run_build(cfg, env, index_message=index_message)
 
     log_build_recipe_finish(index_message, project_builder, cfg)
 
+    return result
+
 
 def run_builds(project_builder: ProjectBuilder, workers: int = 1) -> None:
     """Execute build steps in the ProjectBuilder and print a summary."""
@@ -527,24 +553,48 @@
     # Print status before starting
     project_builder.print_build_summary()
 
-    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
-        future_to_index = {}
-        for i, cfg in enumerate(project_builder, 1):
-            future_to_index[
-                executor.submit(run_recipe, i, project_builder, cfg, env)
-            ] = i
+    if workers > 1 and not project_builder.separate_build_file_logging:
+        _LOG.warning(
+            _COLOR.yellow(
+                'Running in parallel without --separate-logfiles; All build '
+                'output will be interleaved.'
+            )
+        )
 
+    get_project_builder_context().set_building()
+
+    if workers == 1:
         try:
-            for future in concurrent.futures.as_completed(future_to_index):
-                future.result()
+            for i, cfg in enumerate(project_builder, 1):
+                run_recipe(i, project_builder, cfg, env)
         # Ctrl-C on Unix generates KeyboardInterrupt
         # Ctrl-Z on Windows generates EOFError
         except (KeyboardInterrupt, EOFError):
             _exit_due_to_interrupt()
 
+    else:
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=workers
+        ) as executor:
+            future_to_index = {}
+            for i, cfg in enumerate(project_builder, 1):
+                future_to_index[
+                    executor.submit(run_recipe, i, project_builder, cfg, env)
+                ] = i
+
+            try:
+                for future in concurrent.futures.as_completed(future_to_index):
+                    future.result()
+            # Ctrl-C on Unix generates KeyboardInterrupt
+            # Ctrl-Z on Windows generates EOFError
+            except (KeyboardInterrupt, EOFError):
+                _exit_due_to_interrupt()
+
     # Print status when finished
     project_builder.print_build_summary()
 
+    get_project_builder_context().set_idle()
+
 
 def main() -> None:
     """Build a Pigweed Project."""
@@ -588,7 +638,8 @@
         log_level=log_level,
     )
 
-    run_builds(project_builder)
+    workers = len(project_builder) if args.parallel else 1
+    run_builds(project_builder, workers)
 
 
 if __name__ == '__main__':
diff --git a/pw_build/py/pw_build/project_builder_argparse.py b/pw_build/py/pw_build/project_builder_argparse.py
index 603d114..81c2fb5 100644
--- a/pw_build/py/pw_build/project_builder_argparse.py
+++ b/pw_build/py/pw_build/project_builder_argparse.py
@@ -79,10 +79,7 @@
     logfile_group.add_argument(
         '--separate-logfiles',
         action='store_true',
-        help=(
-            'Create separate log files per build directory. Requires setting '
-            'the --logfile option.'
-        ),
+        help='Create separate log files per build directory.',
     )
 
     logfile_group.add_argument(
@@ -146,4 +143,10 @@
         ),
     )
 
+    parser.add_argument(
+        '--parallel',
+        action='store_true',
+        help='Run all builds in parallel.',
+    )
+
     return parser
diff --git a/pw_build/py/pw_build/project_builder_context.py b/pw_build/py/pw_build/project_builder_context.py
new file mode 100644
index 0000000..b0c2cac
--- /dev/null
+++ b/pw_build/py/pw_build/project_builder_context.py
@@ -0,0 +1,115 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Fetch active Project Builder Context."""
+
+import concurrent.futures
+from contextvars import ContextVar
+from dataclasses import dataclass, field
+from enum import Enum
+import logging
+import subprocess
+from typing import Dict, Optional
+
+from pw_build.build_recipe import BuildRecipe
+
+
+_LOG = logging.getLogger('pw_build.watch')
+
+
+def _wait_for_terminate_then_kill(
+    proc: subprocess.Popen, timeout: int = 3
+) -> int:
+    """Wait for a process to end, then kill it if the timeout expires."""
+    returncode = 1
+    try:
+        returncode = proc.wait(timeout=timeout)
+    except subprocess.TimeoutExpired:
+        _LOG.debug('Killing %s', proc)
+        proc.kill()
+    return returncode
+
+
+class ProjectBuilderState(Enum):
+    IDLE = 'IDLE'
+    BUILDING = 'BUILDING'
+    ABORT = 'ABORT'
+
+
+@dataclass
+class ProjectBuilderContext:
+    """Maintains the state of running builds and active subprocesses."""
+
+    current_state: Optional[ProjectBuilderState] = ProjectBuilderState.IDLE
+    desired_state: Optional[ProjectBuilderState] = ProjectBuilderState.BUILDING
+    procs: Dict[BuildRecipe, Optional[subprocess.Popen]] = field(
+        default_factory=dict
+    )
+
+    def register_process(
+        self, recipe: BuildRecipe, proc: subprocess.Popen
+    ) -> None:
+        self.procs[recipe] = proc
+
+    def terminate_and_wait(self) -> None:
+        """End all subprocesses either cleanly or with a kill signal."""
+        if self.is_idle() or self.should_abort():
+            return
+
+        self._signal_abort()
+
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=len(self.procs)
+        ) as executor:
+            futures = []
+            for _recipe, proc in self.procs.items():
+                if proc is None:
+                    continue
+                _LOG.debug('Wait for %s', proc)
+                futures.append(
+                    executor.submit(_wait_for_terminate_then_kill, proc)
+                )
+            for future in concurrent.futures.as_completed(futures):
+                future.result()
+
+        _LOG.debug('Wait for terminate DONE')
+        self.set_idle()
+
+    def _signal_abort(self) -> None:
+        self.desired_state = ProjectBuilderState.ABORT
+
+    def should_abort(self) -> bool:
+        return self.desired_state == ProjectBuilderState.ABORT
+
+    def is_building(self) -> bool:
+        return self.current_state == ProjectBuilderState.BUILDING
+
+    def is_idle(self) -> bool:
+        return self.current_state == ProjectBuilderState.IDLE
+
+    def set_idle(self) -> None:
+        self.current_state = ProjectBuilderState.IDLE
+        self.desired_state = ProjectBuilderState.IDLE
+
+    def set_building(self) -> None:
+        self.current_state = ProjectBuilderState.BUILDING
+        self.desired_state = ProjectBuilderState.BUILDING
+
+
+PROJECT_BUILDER_CONTEXTVAR = ContextVar(
+    'pw_build_project_builder_state', default=ProjectBuilderContext()
+)
+
+
+def get_project_builder_context():
+    return PROJECT_BUILDER_CONTEXTVAR.get()
diff --git a/pw_watch/py/pw_watch/watch.py b/pw_watch/py/pw_watch/watch.py
index 610bb5f..4e13dc5 100755
--- a/pw_watch/py/pw_watch/watch.py
+++ b/pw_watch/py/pw_watch/watch.py
@@ -42,6 +42,7 @@
 """
 
 import argparse
+import concurrent.futures
 import errno
 import http.server
 import logging
@@ -78,12 +79,14 @@
 from pw_build.build_recipe import BuildRecipe, create_build_recipes
 from pw_build.project_builder import (
     ProjectBuilder,
+    execute_command_no_logging,
     execute_command_with_logging,
     log_build_recipe_start,
     log_build_recipe_finish,
     ASCII_CHARSET,
     EMOJI_CHARSET,
 )
+from pw_build.project_builder_context import get_project_builder_context
 import pw_cli.branding
 import pw_cli.color
 import pw_cli.env
@@ -159,6 +162,7 @@
         banners: bool = True,
         use_logfile: bool = False,
         separate_logfiles: bool = False,
+        parallel: bool = False,
     ):
         super().__init__()
 
@@ -171,6 +175,7 @@
         self.patterns = patterns
         self.ignore_patterns = ignore_patterns
         self.project_builder = project_builder
+        self.parallel_workers = len(project_builder) if parallel else 1
 
         self.restart_on_changes = restart
         self.fullscreen_enabled = fullscreen
@@ -178,9 +183,8 @@
 
         self.use_logfile = use_logfile
         self.separate_logfiles = separate_logfiles
-
-        # Initialize self._current_build to an empty subprocess.
-        self._current_build = subprocess.Popen('', shell=True, errors='replace')
+        if self.parallel_workers > 1:
+            self.separate_logfiles = True
 
         self.debouncer = Debouncer(self)
 
@@ -196,8 +200,6 @@
 
     def rebuild(self):
         """Rebuild command triggered from watch app."""
-        self._current_build.terminate()
-        self._current_build.wait()
         self.debouncer.press('Manual build requested')
 
     def _wait_for_enter(self) -> NoReturn:
@@ -209,8 +211,6 @@
         # Ctrl-Z on Windows generates EOFError
         except (KeyboardInterrupt, EOFError):
             # Force stop any running ninja builds.
-            if self._current_build:
-                self._current_build.terminate()
             _exit_due_to_interrupt()
 
     def _path_matches(self, path: Path) -> bool:
@@ -265,7 +265,7 @@
     # than on the main thread that's watching file events. This enables the
     # watcher to continue receiving file change events during a build.
     def run(self) -> None:
-        """Run all the builds in serial and capture pass/fail for each."""
+        """Run all the builds and capture pass/fail for each."""
 
         # Clear the screen and show a banner indicating the build is starting.
         self._clear_screen()
@@ -307,13 +307,24 @@
         # Force Ninja to output ANSI colors
         env['CLICOLOR_FORCE'] = '1'
 
-        # Reset status messages
+        # Reset status
+        get_project_builder_context().set_building()
+
         for cfg in self.project_builder:
             cfg.reset_status()
         self.create_result_message()
 
-        for i, cfg in enumerate(self.project_builder, 1):
-            self.run_recipe(i, cfg, env)
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=self.parallel_workers
+        ) as executor:
+            futures = []
+            for i, cfg in enumerate(self.project_builder, 1):
+                futures.append(executor.submit(self.run_recipe, i, cfg, env))
+
+            for future in concurrent.futures.as_completed(futures):
+                future.result()
+
+        get_project_builder_context().set_idle()
 
     def run_recipe(self, index: int, cfg: BuildRecipe, env) -> None:
         num_builds = len(self.project_builder)
@@ -361,13 +372,13 @@
                         'Failed'.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH),
                     )
                 )
-            # NOTE: This condition assumes each build dir is run in serial
             elif first_building_target_found:
                 self.result_message.append(
                     ('', ''.rjust(_FULLSCREEN_STATUS_COLUMN_WIDTH))
                 )
             else:
-                first_building_target_found = True
+                if self.parallel_workers == 1:
+                    first_building_target_found = True
                 self.result_message.append(
                     (
                         'class:theme-fg-yellow',
@@ -401,19 +412,13 @@
             return execute_command_with_logging(
                 command, env, recipe, logger=recipe.log
             )
+
         if self.use_logfile:
             return execute_command_with_logging(
                 command, env, recipe, logger=_LOG
             )
 
-        print()
-        self._current_build = subprocess.Popen(
-            command, env=env, errors='replace'
-        )
-        returncode = self._current_build.wait()
-        print()
-        recipe.status.return_code = returncode
-        return returncode == 0
+        return execute_command_no_logging(command, env, recipe)
 
     def _execute_command_watch_app(
         self,
@@ -450,8 +455,7 @@
     # Implementation of DebouncedFunction.cancel()
     def cancel(self) -> bool:
         if self.restart_on_changes:
-            self._current_build.terminate()
-            self._current_build.wait()
+            get_project_builder_context().terminate_and_wait()
             return True
 
         return False
@@ -513,6 +517,7 @@
     # a '^C' from the keyboard interrupt, add a newline before the log.
     print('')
     _LOG.info('Got Ctrl-C; exiting...')
+    get_project_builder_context().terminate_and_wait()
     _exit(0)
 
 
@@ -767,6 +772,7 @@
     banners: bool = True,
     logfile: Optional[Path] = None,
     separate_logfiles: bool = False,
+    parallel: bool = False,
     # pylint: disable=unused-argument
     default_build_targets: Optional[List[str]] = None,
     build_directories: Optional[List[str]] = None,
@@ -823,6 +829,14 @@
         else []
     )
 
+    # Add project_builder logfiles to ignore_patterns
+    if project_builder.default_logfile:
+        ignore_patterns.append(str(project_builder.default_logfile))
+    if project_builder.separate_build_file_logging:
+        for recipe in project_builder:
+            if recipe.logfile:
+                ignore_patterns.append(str(recipe.logfile))
+
     event_handler = PigweedBuildWatcher(
         project_builder=project_builder,
         patterns=patterns.split(WATCH_PATTERN_DELIMITER),
@@ -832,6 +846,7 @@
         banners=banners,
         use_logfile=bool(logfile),
         separate_logfiles=bool(separate_logfiles),
+        parallel=parallel,
     )
 
     project_builder.execute_command = event_handler.execute_command
@@ -935,6 +950,11 @@
     else:
         charset = ASCII_CHARSET
 
+    # Force separate-logfiles for split window panes if running in parallel.
+    separate_logfiles = args.separate_logfiles
+    if args.parallel:
+        separate_logfiles = True
+
     project_builder = ProjectBuilder(
         build_recipes=build_recipes,
         jobs=args.jobs,
@@ -942,7 +962,7 @@
         keep_going=args.keep_going,
         colors=args.colors,
         charset=charset,
-        separate_build_file_logging=args.separate_logfiles,
+        separate_build_file_logging=separate_logfiles,
         root_logfile=args.logfile,
         root_logger=_LOG,
         log_level=logging.DEBUG if args.debug_logging else logging.INFO,
diff --git a/pw_watch/py/pw_watch/watch_app.py b/pw_watch/py/pw_watch/watch_app.py
index 3886f80..b8e53c7 100644
--- a/pw_watch/py/pw_watch/watch_app.py
+++ b/pw_watch/py/pw_watch/watch_app.py
@@ -62,6 +62,7 @@
 import pw_console.widgets.border
 
 from pw_build.project_builder_prefs import ProjectBuilderPrefs
+from pw_build.project_builder_context import get_project_builder_context
 
 
 class WatchAppPrefs(ProjectBuilderPrefs):
@@ -536,6 +537,7 @@
                 print('Logs saved to: {}'.format(log_file.resolve()))
             sys.exit(future.result())
 
+        get_project_builder_context().terminate_and_wait()
         if self.application.future:
             self.application.future.add_done_callback(_really_exit)
         self.application.exit(result=exit_code)