blob: 710cee0f9cf5bf8d41eb9b9557fa105df3d912e5 [file] [log] [blame] [edit]
# Copyright 2025 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""This module provides an API for uploading build artifacts to GCS.

A `GcsUploadContext` pairs a destination bucket with a namespace (a prefix
within that bucket, defaulting to the Buildbucket build ID). This recipe
module provides functionality to:
- Create a root upload context from a bucket and an optional namespace.
- Create a child context for a subdirectory of an existing context.
- Copy a file or directory to a temporary location and upload that copy using
  spawned futures.
- Wait for all spawned futures to finish.
- Produce a link to the uploaded artifacts as a build result summary.
"""
from __future__ import annotations
import contextlib
import dataclasses
from recipe_engine import config_types, recipe_api, util
from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb
from PB.recipe_engine import result as result_pb
from RECIPE_MODULES.recipe_engine.futures import api as futures_api
@dataclasses.dataclass
class GcsUploadContext:
    """A bucket/namespace destination plus the uploads spawned against it.

    Contexts created with subcontext() share the parent's list of futures,
    so wait() on any of them waits for every upload spawned in the family.
    """

    # Recipe API object used to reach the step, path, file, futures, gsutil,
    # and buildbucket modules.
    _api: recipe_api.RecipeApi
    # Destination bucket.
    gcs_bucket: str
    # Prefix within the bucket. Replaced by the build ID when falsy.
    namespace: str | None
    # Futures returned by futures.spawn() for work started by upload().
    _futures: list[futures_api.Future] = dataclasses.field(default_factory=list)

    def __post_init__(self):
        """Fall back to the current build's ID when no namespace was given."""
        self.namespace = self.namespace or self._api.buildbucket.build.id

    def subcontext(self, subdirectory: str) -> GcsUploadContext:
        """Return a context whose namespace is nested under this one.

        Args:
            subdirectory: Path appended to this context's namespace. Leading
                and trailing slashes are stripped.

        Returns:
            A new context with the same API object, bucket, and (shared)
            futures list.
        """
        nested = f'{self.namespace}/{subdirectory.strip("/")}'
        # replace() re-runs __init__ with the current field values, so the
        # very same _futures list object is handed to the new context.
        return dataclasses.replace(self, namespace=nested)

    def upload(
        self,
        source: config_types.Path | util.InputPlaceholder,
        dest: str,
    ) -> None:
        """Start uploading source to dest under this context's namespace.

        The upload itself runs in a spawned future; call wait() to block
        until it has finished. Nothing is uploaded if source does not exist.

        Args:
            source: File or directory to upload.
            dest: Subpath passed to gsutil for the uploaded copy.
        """
        api = self._api
        with api.step.nest(f'upload {dest}'):
            if not api.path.exists(source):
                api.step.empty('does not exist')  # pragma: no cover
            elif api.path.isfile(source):
                self._upload_file(source, dest)
            elif api.path.isdir(source):
                self._upload_directory(source, dest)

    def _upload_file(self, source, dest: str) -> None:
        """Snapshot a single file and spawn its upload."""
        api = self._api
        api.step.empty('file')
        # Copy any files to another location immediately. Future steps
        # might reuse the same output directory.
        snapshot = api.path.mkdtemp() / source.name
        api.file.copy('copy', source, snapshot)
        self._spawn(
            api.gsutil.upload_namespaced_file,
            bucket=self.gcs_bucket,
            namespace=self.namespace,
            source=snapshot,
            subpath=dest,
        )

    def _upload_directory(self, source, dest: str) -> None:
        """Snapshot a directory, then spawn a listing and its upload."""
        api = self._api
        api.step.empty('directory')
        # Copy any files to another location immediately. Future steps
        # might reuse the same output directory.
        snapshot = api.path.mkdtemp() / source.name
        api.file.copytree('copytree', source, snapshot)
        self._spawn(api.file.listdir, 'ls', snapshot, recursive=True)
        self._spawn(
            api.gsutil.upload_namespaced_directory,
            bucket=self.gcs_bucket,
            namespace=self.namespace,
            source=snapshot,
            subpath=dest,
        )

    def _spawn(self, func, *args, **kwargs) -> None:
        """Spawn func as a future and remember it for wait()."""
        self._futures.append(self._api.futures.spawn(func, *args, **kwargs))

    def wait(self):
        """Block until every future recorded so far has finished."""
        # NOTE(review): results of the futures are not inspected here;
        # confirm whether upload failures are expected to surface elsewhere.
        self._api.futures.wait(self._futures)

    @property
    def link(self):
        """URL gsutil reports for the namespaced directory in this bucket."""
        # NOTE(review): only the bucket is passed, not self.namespace --
        # confirm the URL is right for custom namespaces and subcontexts.
        return self._api.gsutil.namespaced_directory_url(self.gcs_bucket)

    def raw_result(self) -> result_pb.RawResult:
        """Return a SUCCESS result whose summary links to the artifacts."""
        summary = f'[artifacts]({self.link})'
        return result_pb.RawResult(
            summary_markdown=summary,
            status=common_pb.SUCCESS,
        )
class GcsUploadApi(recipe_api.RecipeApi):
    """Recipe module API that creates GcsUploadContext objects."""

    @contextlib.contextmanager
    def __call__(
        self,
        *,
        gcs_bucket: str | None = None,
        namespace: str | None = None,
        parent: GcsUploadContext | None = None,
        subdirectory: str | None = None,
    ) -> GcsUploadContext:
        """Enter a GcsUploadContext, waiting on its uploads at exit.

        All arguments are forwarded to init(); see it for the allowed
        combinations. The context's wait() is called when the with-block
        exits, including when the block raises.

        Yields:
            The context object created by init().
        """
        # If init() raises, no context exists yet and there is nothing to
        # wait for, so it is safe to create the context outside the try.
        upload_ctx = self.init(
            gcs_bucket=gcs_bucket,
            namespace=namespace,
            parent=parent,
            subdirectory=subdirectory,
        )
        try:
            yield upload_ctx
        finally:
            if upload_ctx:
                upload_ctx.wait()

    def init(
        self,
        *,
        gcs_bucket: str | None = None,
        namespace: str | None = None,
        parent: GcsUploadContext | None = None,
        subdirectory: str | None = None,
    ) -> GcsUploadContext:
        """Create a GcsUploadContext object.

        Two call forms are accepted, and they may not be mixed:
        gcs_bucket with an optional namespace, or parent together with
        subdirectory. Unlike __call__(), nothing waits on the returned
        context automatically.

        Args:
            gcs_bucket: Destination bucket.
            namespace: Prefix in destination bucket.
            parent: Parent context object.
            subdirectory: Path relative to parent.

        Returns:
            Context object.

        Raises:
            AssertionError: If the arguments do not match either call form.
        """
        assert parent or gcs_bucket, f'parent={parent} gcs_bucket={gcs_bucket}'

        created: GcsUploadContext | None = None
        if parent:
            # Child form: the bucket and namespace come from the parent.
            assert subdirectory
            assert not gcs_bucket
            assert not namespace
            created = parent.subcontext(subdirectory)
        elif gcs_bucket:
            # Root form: a subdirectory only makes sense relative to a parent.
            assert not parent
            assert not subdirectory
            created = GcsUploadContext(
                _api=self.m,
                gcs_bucket=gcs_bucket,
                namespace=namespace,
            )
        return created