| # Copyright 2025 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """This module checks for approvals on Gerrit changes.""" |
| |
| from __future__ import annotations |
| |
| import contextlib |
| from collections.abc import Sequence |
| |
| from recipe_engine import recipe_api |
| |
| from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb |
| from PB.recipe_engine import result as result_pb |
| |
| from RECIPE_MODULES.pigweed.checkout import api as checkout_api |
| |
| _TRUSTED_DOMAINS = ( |
| 'google.com', |
| 'pigweed-service-accounts.iam.gserviceaccount.com', |
| 'pw-internal-service-accounts.iam.gserviceaccount.com', |
| ) |
| |
| |
| class LimitedCqApi(recipe_api.RecipeApi): |
| def _change_approved(self, change: checkout_api.Change) -> bool: |
| with self.m.step.nest(change.name): |
| for req, status in change.submit_requirements.items(): |
| if req in ('Code-Owners', 'Code-Review', 'Review-Enforcement'): |
| if status in ('SATISFIED', 'OVERRIDDEN'): |
| with self.m.step.nest('continue'): |
| pres = self.m.step.empty(req).presentation |
| pres.step_summary_text = status |
| return True |
| |
| if not change.uploader: |
| return False # pragma: no cover |
| |
| _, domain = change.uploader.split('@') |
| if domain in _TRUSTED_DOMAINS: |
| with self.m.step.nest('continue'): |
| pres = self.m.step.empty('uploader').presentation |
| pres.step_summary_text = change.uploader |
| return True |
| |
| return False |
| |
| def exit_early_if_not_approved( |
| self, |
| changes: Sequence[checkout_api.Change], |
| *, |
| verbose: bool = False, |
| ) -> result_pb.RawResult | None: |
| """Exit the build early if the change isn't sufficiently approved. |
| |
| Continue the build (by returning None) if any of the following |
| conditions are met: |
| * The build is in the pigweed LUCI project |
| * The build is not a tryjob |
| * The change has one of the following submit requirements met: |
| * Code-Review (somebody voted CR+2, only Googlers can do this) |
| * Review-Enforcement (meaning two Googlers approved it) |
| * Code-Owners (one Googler approved it or it's a pure revert) |
| * The change owner is a Googler or a trusted service account |
| * The CQ triggerer is a Googler or a trusted service account |
| |
| Otherwise, return a RawResult. The caller will also return the |
| RawResult, exiting the build. In dry run mode, the result will be a |
| success. In full run mode, it will be a failure. |
| |
| If returning a RawResult, disallow reuse of this build in future dry |
| runs and full runs. |
| """ |
| with contextlib.ExitStack() as stack: |
| build_proto = self.m.buildbucket.build |
| |
| # If verbose, make sure everything is under this nested step. |
| if verbose: |
| stack.enter_context( |
| self.m.step.nest('checking change approvals'), |
| ) |
| |
| # If this builder is in a public project, do nothing and continue. |
| project = build_proto.builder.project |
| if project == 'pigweed': |
| if verbose: |
| pres = self.m.step.empty('project is public').presentation |
| pres.step_summary_text = project |
| return None |
| |
| bucket = build_proto.builder.bucket |
| if not self.m.buildbucket_util.is_tryjob: |
| if verbose: |
| pres = self.m.step.empty('not a tryjob').presentation |
| pres.step_summary_text = bucket |
| return None |
| |
| # If not verbose, only make this nested step appear if there's |
| # something to show under it. (If we got here, there will always be |
| # a step created.) |
| if not verbose: |
| stack.enter_context( # pragma: no cover |
| self.m.step.nest('checking change approvals'), |
| ) |
| |
| # If any submit requirements that indicate Googlers have approved |
| # the change, let the build continue. If there are multiple changes, |
| # make sure all of them are approved. |
| if all(self._change_approved(change) for change in changes): |
| return None |
| |
| # If CQ was triggered by a Googler then allow the build to continue. |
| for tag_pair in build_proto.tags: |
| tag, value = tag_pair.key, tag_pair.value |
| if tag == 'cq_triggerer': |
| _, domain = value.split('@') |
| if domain in _TRUSTED_DOMAINS: |
| with self.m.step.nest('continue'): |
| pres = self.m.step.empty('triggerer').presentation |
| pres.step_summary_text = value |
| return None |
| |
| # If the build was launched by a Googler outside of CV allow the |
| # build to continue. |
| created_by = build_proto.created_by |
| if '@' in created_by: |
| _, domain = created_by.removeprefix('user:').split('@') |
| if domain in _TRUSTED_DOMAINS: |
| with self.m.step.nest('continue'): |
| pres = self.m.step.empty('created by').presentation |
| pres.step_summary_text = created_by |
| return None |
| |
| # If we got here, the change is not approved and we cannot continue. |
| self.m.step.empty('exit early') |
| |
| # Only allow reuse for new patchset runs. Really, this is |
| # disallowing reuse for dry runs and full runs, so short-circuited |
| # builds won't be treated as passing builds in later CQ+1/+2 runs. |
| # (Pigweed makes limited use of new patchset runs.) |
| self.m.cv.allow_reuse_for(self.m.cv.NEW_PATCHSET_RUN) |
| |
| status: common_pb.Status = common_pb.SUCCESS |
| # If CV is not active, any builds that exit early should fail. |
| if not self.m.cv.active: |
| status = common_pb.FAILURE |
| |
| # If we got here, the build will not continue. It shouldn't be |
| # possible to get here via CQ+2 without submit requirements being |
| # met, but just in case we'll fail those builders. |
| elif self.m.cv.run_mode == self.m.cv.FULL_RUN: |
| status = common_pb.FAILURE |
| |
| return result_pb.RawResult( |
| summary_markdown=( |
| 'Exiting early since not all triggering changes have been ' |
| 'approved.' |
| ), |
| status=status, |
| ) |