| # Copyright 2025 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """This module provides an API for running 'pw presubmit'. |
| |
| The `pw presubmit` tool is used to run a series of checks and tests on a |
| codebase to ensure that it meets certain quality standards before changes are |
| submitted. This recipe module wraps the execution of `pw presubmit`, providing |
| functionality to: |
| |
| - Initialize a presubmit context with various options. |
| - Discover available presubmit steps and programs. |
| - Run specific presubmit steps or entire programs. |
| - Handle logging, artifact uploading (e.g., to CIPD), and Gerrit commenting. |
| - Process metadata generated by presubmit steps. |
| """ |
| |
| from __future__ import annotations |
| |
| import contextlib |
| import dataclasses |
| |
| from recipe_engine import config_types, recipe_api, util |
| |
| from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb |
| from PB.recipe_engine import result as result_pb |
| |
| from RECIPE_MODULES.recipe_engine.futures import api as futures_api |
| |
| |
| @dataclasses.dataclass |
| class GcsUploadContext: |
| _api: recipe_api.RecipeApi |
| gcs_bucket: str |
| namespace: str | None |
| _futures: list[futures_api.Future] = dataclasses.field(default_factory=list) |
| |
| def __post_init__(self): |
| if not self.namespace: |
| self.namespace = self._api.buildbucket.build.id |
| |
| def subcontext(self, subdirectory: str) -> GcsUploadContext: |
| return GcsUploadContext( |
| _api=self._api, |
| gcs_bucket=self.gcs_bucket, |
| namespace=f'{self.namespace}/{subdirectory.strip("/")}', |
| _futures=self._futures, |
| ) |
| |
| def upload( |
| self, |
| source: config_types.Path | util.InputPlaceholder, |
| dest: str, |
| ) -> None: |
| with self._api.step.nest(f'upload {dest}'): |
| if not self._api.path.exists(source): |
| self._api.step.empty('does not exist') # pragma: no cover |
| |
| elif self._api.path.isfile(source): |
| self._api.step.empty('file') |
| |
| # Copy any files to another location immediately. Future steps |
| # might reuse the same output directory. |
| temp = self._api.path.mkdtemp() / source.name |
| self._api.file.copy('copy', source, temp) |
| |
| self._futures.append( |
| self._api.futures.spawn( |
| self._api.gsutil.upload_namespaced_file, |
| bucket=self.gcs_bucket, |
| namespace=self.namespace, |
| source=temp, |
| subpath=dest, |
| ), |
| ) |
| |
| elif self._api.path.isdir(source): |
| self._api.step.empty('directory') |
| |
| # Copy any files to another location immediately. Future steps |
| # might reuse the same output directory. |
| temp = self._api.path.mkdtemp() / source.name |
| self._api.file.copytree('copytree', source, temp) |
| |
| self._futures.append( |
| self._api.futures.spawn( |
| self._api.file.listdir, |
| 'ls', |
| temp, |
| recursive=True, |
| ) |
| ) |
| self._futures.append( |
| self._api.futures.spawn( |
| self._api.gsutil.upload_namespaced_directory, |
| bucket=self.gcs_bucket, |
| namespace=self.namespace, |
| source=temp, |
| subpath=dest, |
| ), |
| ) |
| |
| def wait(self): |
| self._api.futures.wait(self._futures) |
| |
| @property |
| def link(self): |
| return self._api.gsutil.namespaced_directory_url(self.gcs_bucket) |
| |
| def raw_result(self) -> result_pb.RawResult: |
| return result_pb.RawResult( |
| summary_markdown=f'[artifacts]({self.link})', |
| status=common_pb.SUCCESS, |
| ) |
| |
| |
| class GcsUploadApi(recipe_api.RecipeApi): |
| """Calls to checkout code.""" |
| |
| @contextlib.contextmanager |
| def __call__( |
| self, |
| *, |
| gcs_bucket: str | None = None, |
| namespace: str | None = None, |
| parent: GcsUploadContext | None = None, |
| subdirectory: str | None = None, |
| ) -> GcsUploadContext: |
| """Enter a GcsUploadContext. See also init().""" |
| ctx: GcsUploadContext | None = None |
| try: |
| ctx = self.init( |
| gcs_bucket=gcs_bucket, |
| namespace=namespace, |
| parent=parent, |
| subdirectory=subdirectory, |
| ) |
| yield ctx |
| finally: |
| if ctx: |
| ctx.wait() |
| |
| def init( |
| self, |
| *, |
| gcs_bucket: str | None = None, |
| namespace: str | None = None, |
| parent: GcsUploadContext | None = None, |
| subdirectory: str | None = None, |
| ) -> GcsUploadContext: |
| """Create a GcsUploadContext object. |
| |
| Callers can pass gcs_bucket (with an optional namespace), or they can |
| pass parent and subdirectory. Other combinations are not allowed. |
| |
| Args: |
| gcs_bucket: Destination bucket. |
| namespace: Prefix in destination bucket. |
| parent: Parent context object. |
| subdirectory: Path relative to parent. |
| |
| Returns: |
| Context object. |
| """ |
| assert parent or gcs_bucket, f'parent={parent} gcs_bucket={gcs_bucket}' |
| |
| if parent: |
| assert subdirectory |
| assert not gcs_bucket |
| assert not namespace |
| return parent.subcontext(subdirectory) |
| |
| if gcs_bucket: |
| assert not parent |
| assert not subdirectory |
| return GcsUploadContext( |
| _api=self.m, |
| gcs_bucket=gcs_bucket, |
| namespace=namespace, |
| ) |