| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Bazel-related functions.""" |
| |
| from __future__ import annotations |
| |
| import dataclasses |
| import re |
| from typing import Any, Sequence, TypeVar, TYPE_CHECKING |
| |
| from PB.recipe_modules.pigweed.bazel.options import Options |
| from recipe_engine import recipe_api |
| |
| if TYPE_CHECKING: # pragma: no cover |
| from recipe_engine import config_types |
| from RECIPE_MODULES.pigweed.checkout import api as checkout_api |
| |
| |
| @dataclasses.dataclass |
| class BazelRunner: |
| api: recipe_api.RecipeApi |
| checkout_root: config_types.Path |
| options: Options |
| _bazel: config_types.Path | None = None |
| |
| def _ensure_bazelisk(self) -> config_types.Path: |
| ensure_file = self.api.cipd.EnsureFile() |
| ensure_file.add_package( |
| 'fuchsia/third_party/bazelisk/${platform}', |
| self.options.bazelisk_version or 'latest', |
| ) |
| |
| root = self.api.path.mkdtemp() |
| self.api.cipd.ensure(root, ensure_file, name='ensure bazelisk') |
| return root / 'bazelisk' |
| |
| def ensure(self) -> config_types.Path: |
| if self._bazel: |
| return self._bazel |
| |
| self._bazel = self._ensure_bazelisk() |
| |
| self.api.step('bazel version', [self._bazel, 'version']) |
| return self._bazel |
| |
| def run(self, **kwargs) -> None: |
| name: str = ' '.join(['bazel'] + list(self.options.args)) |
| with self.api.context(cwd=self.checkout_root): |
| if self.options.args: |
| self.api.step( |
| name, [self.ensure(), *self.options.args], **kwargs |
| ) |
| |
| for invocation in self.options.invocations: |
| assert invocation.args |
| name: str = ' '.join(['bazel'] + list(invocation.args)) |
| self.api.step(name, [self.ensure(), *invocation.args], **kwargs) |
| |
| |
| def nwise(iterable, n): |
| # nwise('ABCDEFG', 3) → ABC BCD CDE DEF EFG |
| # See also |
| # https://docs.python.org/3/library/itertools.html#itertools.pairwise |
| iterator = iter(iterable) |
| initial_items = [None] |
| for i in range(1, n): |
| initial_items.append(next(iterator, None)) |
| items = tuple(initial_items) |
| for x in iterator: |
| items = (*items[1:], x) |
| yield items |
| |
| |
| T = TypeVar('T') |
| |
| |
| def proximity_sort_nearby_lines(lines: Sequence[T]) -> list[T]: |
| # Shift the order to be center-out instead of ascending. |
| lines = [(abs(len(lines) // 2 - i), x) for i, x in enumerate(lines)] |
| return [x[1] for x in sorted(lines)] |
| |
| |
| @dataclasses.dataclass |
| class UpdateCommitHashResult: |
| old_revision: str |
| project_name: str | None |
| |
| |
| class BazelApi(recipe_api.RecipeApi): |
| """Bazel utilities.""" |
| |
| BazelRunner = BazelRunner |
| UpdateCommitHashResult = UpdateCommitHashResult |
| |
| def new_runner( |
| self, |
| checkout: checkout_api.CheckoutContext, |
| options: Options | None, |
| ) -> BazelRunner: |
| return BazelRunner(self.m, checkout.root, options) |
| |
| def update_commit_hash( |
| self, |
| *, |
| checkout: checkout_api.CheckoutContext, |
| project_remote: str, |
| new_revision: str, |
| num_nearby_lines: int = 6, |
| path: config_types.Path | None, |
| ) -> UpdateCommitHashResult: |
| if not path: |
| path = checkout.root / 'WORKSPACE' |
| |
| lines = [''] * num_nearby_lines |
| lines.extend( |
| self.m.file.read_text( |
| f'read old {path.name}', |
| path, |
| test_data=self.m.bazel.test_api.TEST_WORKSPACE_FILE, |
| ) |
| .strip() |
| .splitlines() |
| ) |
| lines.extend([''] * num_nearby_lines) |
| |
| for nearby_lines in nwise(enumerate(lines), num_nearby_lines * 2 + 1): |
| i, curr = nearby_lines[len(nearby_lines) // 2] |
| match = re.search( |
| r'^\s*remote\s*=\s*"(?P<remote>[^"]+)",?\s*$', curr |
| ) |
| if not match: |
| continue |
| |
| match_remote = match.group('remote') |
| |
| step = self.m.step.empty(f'found remote {match_remote!r}') |
| if checkout.remotes_equivalent(match_remote, project_remote): |
| step.presentation.step_summary_text = 'equivalent' |
| break |
| step.presentation.step_summary_text = 'not equivalent' |
| |
| else: |
| self.m.step.empty( |
| f'could not find remote {project_remote} in {path}', |
| status='FAILURE', |
| ) |
| |
| nearby_lines = proximity_sort_nearby_lines(nearby_lines) |
| |
| commit_rx = re.compile( |
| r'^(?P<prefix>\s*commit\s*=\s*")' |
| r'(?P<commit>[0-9a-f]{40})' |
| r'(?P<suffix>",?\s*)$' |
| ) |
| |
| for i, line in nearby_lines: |
| if match := commit_rx.search(line): |
| idx = i |
| break |
| else: |
| self.m.step.empty( |
| f'could not find commit line adjacent to {curr!r} in {path}', |
| status='FAILURE', |
| ) |
| |
| old_revision = match.group('commit') |
| |
| prefix = match.group("prefix") |
| suffix = match.group("suffix") |
| lines[idx] = f'{prefix}{new_revision}{suffix}' |
| |
| # Remove all existing metadata lines in this git_repository() entry. |
| idx2 = idx - 1 |
| while lines[idx2].strip().startswith('# ROLL: '): |
| lines[idx2] = None |
| idx2 -= 1 |
| |
| ws_prefix = re.search(r'^\s*', prefix).group(0) |
| comment_prefix = f'{ws_prefix}# ROLL: ' |
| |
| now = self.m.time.utcnow().strftime('%Y-%m-%d') |
| comment_lines = ( |
| f'{comment_prefix}Warning: this entry is automatically updated.', |
| f'{comment_prefix}Last updated {now}.', |
| f'{comment_prefix}By {self.m.buildbucket.build_url()}.', |
| ) |
| |
| # We no longer need idx to be consistent so it's ok to insert lines. |
| lines.insert(idx, '\n'.join(comment_lines)) |
| |
| project_name: str | None = None |
| for _, line in nearby_lines: |
| if match := re.search( |
| r'^\s*name\s*=\s*"(?P<name>[^"]+)",?\s*$', line |
| ): |
| project_name = match.group('name') |
| break |
| |
| self.m.file.write_text( |
| f'write new {path.name}', |
| path, |
| ''.join( |
| f'{x}\n' |
| for x in lines[num_nearby_lines:-num_nearby_lines] |
| if x is not None |
| ), |
| ) |
| |
| return UpdateCommitHashResult( |
| old_revision=old_revision, |
| project_name=project_name, |
| ) |