| # Copyright 2021 The Pigweed Authors | 
 | # | 
 | # Licensed under the Apache License, Version 2.0 (the "License"); you may not | 
 | # use this file except in compliance with the License. You may obtain a copy of | 
 | # the License at | 
 | # | 
 | #     https://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
 | # License for the specific language governing permissions and limitations under | 
 | # the License. | 
 | """Do different things based on the status of builders in CI.""" | 
 |  | 
 | import re | 
 | from typing import Sequence | 
 |  | 
 | from recipe_engine import recipe_api | 
 | from PB.go.chromium.org.luci.buildbucket.proto import common | 
 | from PB.recipe_engine import result | 
 |  | 
 |  | 
 | class CiStatusApi(recipe_api.RecipeApi): | 
 |     """Calls to build code.""" | 
 |  | 
 |     def transform_bucket_name(self, bucket: str | None = None) -> str: | 
 |         if bucket is None: | 
 |             bucket = self.m.buildbucket.build.builder.bucket | 
 |         parts: Sequence[str] = bucket.split('.') | 
 |         parts = ['ci' if x == 'try' else x for x in parts] | 
 |         parts = [x for x in parts if x != 'shadow'] | 
 |         return '.'.join(parts) | 
 |  | 
 |     def exit_early_in_recipe_testing_if_failing( | 
 |         self, | 
 |     ) -> result.RawResult | None: | 
 |         if not self.m.recipe_testing.enabled: | 
 |             return None | 
 |  | 
 |         with self.m.step.nest('checking CI status'): | 
 |             bucket: str = self.transform_bucket_name() | 
 |  | 
 |             status: self.m.builder_status.BuilderStatus | 
 |             status = self.m.builder_status.retrieve( | 
 |                 bucket=bucket, | 
 |                 include_incomplete=False, | 
 |             ) | 
 |             if self.m.builder_status.has_recently_passed(status): | 
 |                 self.m.step('builder recently passed in CI', None) | 
 |                 return None | 
 |  | 
 |             full_name: str = '/'.join((status.project, bucket, status.builder)) | 
 |             return result.RawResult( | 
 |                 summary_markdown=( | 
 |                     f'Exiting early since [{full_name}]({status.link}) has not ' | 
 |                     'recently passed.' | 
 |                 ), | 
 |                 status=common.SUCCESS, | 
 |             ) |