| # Copyright 2024 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| |
| from __future__ import annotations |
| |
| import configparser |
| import dataclasses |
| import io |
| import re |
| from typing import Callable, Generator, Sequence, TYPE_CHECKING |
| |
| from PB.recipe_modules.pigweed.bazel.options import Options as BazelOptions |
| from recipe_engine import recipe_api |
| |
| if TYPE_CHECKING: # pragma: no cover |
| from PB.recipe_modules.pigweed.bazel_roll.git_repository import ( |
| GitRepository, |
| ) |
| from recipe_engine import config_types |
| from RECIPE_MODULES.fuchsia.git_roll_util import api as git_roll_util_api |
| from RECIPE_MODULES.pigweed.checkout import api as checkout_api |
| |
| |
| def nwise(iterable, n): |
| # nwise('ABCDEFG', 3) → ABC BCD CDE DEF EFG |
| # See also |
| # https://docs.python.org/3/library/itertools.html#itertools.pairwise |
| iterator = iter(iterable) |
| initial_items = [None] |
| for i in range(1, n): |
| initial_items.append(next(iterator, None)) |
| items = tuple(initial_items) |
| for x in iterator: |
| items = (*items[1:], x) |
| yield items |
| |
| |
| def trim_nearby_lines(lines: Sequence[LineProxy]) -> list[LineProxy]: |
| # If there's a blank line in the second half, remove any following lines. |
| for i in range(len(lines) // 2, len(lines)): |
| text = lines[i].text or '' |
| if not text.strip(): |
| lines = lines[0:i] |
| break |
| |
| # If there's a blank line in the first half, remove any preceding lines. |
| for i in range(len(lines) // 2, 0, -1): |
| text = lines[i].text or '' |
| if not text.strip(): |
| lines = lines[i:] |
| break |
| |
| return lines |
| |
| |
| def proximity_sort_nearby_lines( |
| lines: Sequence[LineProxy], |
| center: int, |
| ) -> list[LineProxy]: |
| # Shift the order to be center-out instead of ascending. |
| lines = [(abs(center - x.idx), x.idx, x) for x in lines] |
| return [x[-1] for x in sorted(lines)] |
| |
| |
| @dataclasses.dataclass |
| class UpdateResult: |
| old_value: str |
| project_name: str | None |
| finalize: Callable[[], None] | None = None |
| |
| |
| GIT_HASH_PATTERN = '[0-9a-fA-F]{40}' |
| |
| |
| class LineProxy: |
| def __init__(self, lines, idx): |
| self._lines = lines |
| self._idx = idx |
| |
| @property |
| def idx(self): |
| return self._idx |
| |
| @property |
| def text(self): |
| return self._lines[self._idx] |
| |
| @text.setter |
| def text(self, value): |
| self._lines[self._idx] = value |
| |
| def __repr__(self): |
| return f'{self.idx} {self.text}' |
| |
| |
| def proxy(lines): |
| return [LineProxy(lines, i) for i in range(len(lines))] |
| |
| |
| class BazelRollApi(recipe_api.RecipeApi): |
| |
| UpdateResult = UpdateResult |
| GIT_HASH_PATTERN = GIT_HASH_PATTERN |
| |
| def workspace_path( |
| self, |
| root: config_types.Path, |
| value: str, |
| ) -> config_types.Path: |
| """Figure out the location of the WORKSPACE or MODULE.bazel file. |
| |
| If value is '', look for root / 'WORKSPACE' and root / 'MODULE.bazel'. |
| If exactly one exists, return it. If not, error out. |
| |
| If root / value is a file, return it. |
| |
| If root / value is a directory, set root = root / value and apply the |
| above logic. This enables applying this logic to subdirectories. |
| |
| Args: |
| api: Recipe API object. |
| root: Checkout root. |
| value: Relative path specified in properties. |
| |
| Returns: |
| Path to the WORKSPACE or MODULE.bazel file. |
| """ |
| if value: |
| value_path = root / value |
| self.m.path.mock_add_file(value_path) |
| if self.m.path.isfile(value_path): |
| return value_path |
| elif self.m.path.isdir(value_path): # pragma: no cover |
| root = value_path |
| else: |
| self.m.step.empty( # pragma: no cover |
| f'{value_path} does not exist', |
| status='FAILURE', |
| ) |
| |
| workspace = root / 'WORKSPACE' |
| module_bazel = root / 'MODULE.bazel' |
| |
| self.m.path.mock_add_file(workspace) |
| |
| if self.m.path.isfile(module_bazel) and self.m.path.isfile(workspace): |
| self.m.step.empty( # pragma: no cover |
| f'{module_bazel} and {workspace} both exist', |
| status='FAILURE', |
| ) |
| |
| if self.m.path.isfile(module_bazel): |
| return module_bazel # pragma: no cover |
| |
| if self.m.path.isfile(workspace): |
| return workspace |
| |
| self.m.step.empty( # pragma: no cover |
| 'no WORKSPACE or MODULE.bazel file found', |
| status='FAILURE', |
| ) |
| |
| def update_git_repository( |
| self, |
| checkout: checkout_self.m.CheckoutContext, |
| git_repository: GitRepository, |
| ) -> list[git_roll_util_api.Roll]: |
| project_name = git_repository.name |
| if not project_name: |
| project_name = git_repository.remote.split('/')[-1] |
| project_name = project_name.removesuffix('.git') |
| |
| with self.m.step.nest(project_name): |
| workspace_path = self.workspace_path( |
| checkout.root, |
| git_repository.workspace_path, |
| ) |
| git_repository.branch = git_repository.branch or 'main' |
| |
| new_revision = self.m.git_roll_util.resolve_new_revision( |
| git_repository.remote, |
| git_repository.branch, |
| checkout.remotes_equivalent, |
| ) |
| |
| update_result = self.update_value( |
| project_value=git_repository.remote, |
| new_value=new_revision, |
| path=workspace_path, |
| delay_write=True, |
| search_key='remote', |
| value_key='commit', |
| value_pattern=GIT_HASH_PATTERN, |
| matches=checkout.remotes_equivalent, |
| ) |
| if not update_result: |
| self.m.step.empty( |
| 'failed to update commit hash', |
| status='FAILURE', |
| ) |
| |
| try: |
| roll = self.m.git_roll_util.get_roll( |
| repo_url=git_repository.remote, |
| repo_short_name=project_name, |
| old_rev=update_result.old_value, |
| new_rev=new_revision, |
| ) |
| |
| except self.m.git_roll_util.BackwardsRollError: |
| return [] |
| |
| update_result.finalize() |
| |
| with self.m.step.nest('update MODULE.bazel.lock'): |
| runner = self.m.bazel.new_runner( |
| checkout=checkout, |
| options=BazelOptions(), |
| ) |
| |
| with self.m.context(cwd=checkout.root): |
| self.m.step( |
| 'bazelisk mod deps --lockfile_mode=update', |
| [ |
| runner.ensure(), |
| 'mod', |
| 'deps', |
| '--lockfile_mode=update', |
| ], |
| ) |
| |
| return [roll] |
| |
| def _read(self, path: config_types.Path, num_nearby_lines: int): |
| lines = [''] * num_nearby_lines |
| lines.extend( |
| self.m.file.read_text( |
| f'read old {path.name}', |
| path, |
| test_data=self.m.bazel_roll.test_api.TEST_WORKSPACE_FILE, |
| ) |
| .strip() |
| .splitlines() |
| ) |
| lines.extend([''] * num_nearby_lines) |
| return lines |
| |
| def _get_matching_groups( |
| self, |
| lines: Sequence[str], |
| num_nearby_lines: int, |
| project_value: str, |
| search_key: str, |
| replace: bool = False, |
| matches: Callable[[str, str], bool] | None = None, |
| ) -> list[tuple[LineProxy, list[LineProxy]]]: |
| matching_groups: list[tuple[LineProxy, list[LineProxy]]] = [] |
| |
| # This is more constraining than necessary, but ensures there aren't |
| # special characters that confuse the regex. |
| assert search_key.isidentifier(), search_key |
| |
| for nearby_lines in nwise(proxy(lines), num_nearby_lines * 2 + 1): |
| curr = nearby_lines[len(nearby_lines) // 2] |
| match = re.search( |
| rf'^\s*{search_key}\s*=\s*"(?P<search>[^"]+)",?\s*$', |
| curr.text, |
| ) |
| if not match: |
| continue |
| |
| match_value = match.group('search') |
| |
| if not matches: |
| # TODO: b/376915917 - Remove no cover pragma. |
| matches = lambda x, y: x == y # pragma: no cover |
| |
| if matches(match_value, project_value): |
| pres = self.m.step.empty( |
| f'found matching {search_key} {match_value!r}' |
| ).presentation |
| pres.logs['lines'] = [repr(x) for x in nearby_lines] |
| matching_groups.append((curr, nearby_lines)) |
| |
| if replace and match_value != project_value: |
| curr.text = curr.text.replace(match_value, project_value) |
| |
| else: |
| pres = self.m.step.empty( |
| f'found other {search_key} {match_value!r}' |
| ).presentation |
| pres.logs['lines'] = [repr(x) for x in nearby_lines] |
| |
| return matching_groups |
| |
| def _process_nearby_lines(self, nearby_lines): |
| pres = self.m.step.empty('lines').presentation |
| center = nearby_lines[len(nearby_lines) // 2].idx |
| pres.logs['0_center'] = [str(center)] |
| |
| pres.logs['1_orig'] = [repr(x) for x in nearby_lines] |
| |
| nearby_lines = trim_nearby_lines(nearby_lines) |
| pres.logs['2_trimmed'] = [repr(x) for x in nearby_lines] |
| |
| nearby_lines = proximity_sort_nearby_lines(nearby_lines, center) |
| pres.logs['3_sorted'] = [repr(x) for x in nearby_lines] |
| |
| return nearby_lines |
| |
| def retrieve_attributes( |
| self, |
| project_value: str, |
| path: config_types.Path, |
| num_nearby_lines: int = 10, |
| title_keys: str | Sequence[str] = ('name', 'module_name'), |
| matches: Callable[[str, str], bool] | None = None, |
| ) -> list[dict[str, str]]: |
| lines = self._read(path, num_nearby_lines) |
| |
| assert isinstance(title_keys, (list, tuple)), repr(title_keys) |
| |
| matching_groups = self._get_matching_groups( |
| lines=lines, |
| num_nearby_lines=num_nearby_lines, |
| project_value=project_value, |
| search_key='remote', |
| matches=matches, |
| ) |
| |
| results: list[dict[str, str]] = [] |
| for _, nearby_lines in matching_groups: |
| entry = {} |
| nearby_lines = self._process_nearby_lines(nearby_lines) |
| |
| for line in nearby_lines: |
| if match := re.search( |
| r'^\s*([\w_]+)\s*=\s*(\S.*),\s*$', |
| line.text, |
| ): |
| entry[match.group(1)] = match.group(2).strip('"') |
| if match.group(1) in title_keys: |
| entry['name'] = match.group(2).strip('"') |
| |
| if entry: |
| results.append(entry) |
| |
| return results |
| |
| def update_value( |
| self, |
| *, |
| project_value: str, |
| new_value: str, |
| path: config_types.Path, |
| search_key: str, |
| value_key: str, |
| num_nearby_lines: int = 10, |
| replace: bool = False, |
| delay_write: bool = False, |
| title_keys: Sequence[str] = ('name', 'module_name'), |
| value_pattern: str | None = None, |
| matches: Callable[[str, str], bool] | None = None, |
| ) -> UpdateResult | None: |
| lines = self._read(path, num_nearby_lines) |
| |
| matching_groups = self._get_matching_groups( |
| lines=lines, |
| num_nearby_lines=num_nearby_lines, |
| project_value=project_value, |
| replace=replace, |
| search_key=search_key, |
| matches=matches, |
| ) |
| |
| if not matching_groups: |
| self.m.step.empty( |
| f'could not find {search_key} {project_value} in {path}', |
| ) |
| return None |
| |
| project_names: list[str] = [] |
| if value_pattern is None: |
| # TODO: b/376915917 - Remove no cover pragma. |
| value_pattern = '[^"]+' # pragma: no cover |
| |
| for matching_line, nearby_lines in matching_groups: |
| nearby_lines = self._process_nearby_lines(nearby_lines) |
| |
| value_rx = re.compile( |
| rf'^(?P<prefix>\s*{value_key}\s*=\s*")' |
| rf'(?P<value>{value_pattern})' |
| r'(?P<suffix>",?\s*)$' |
| ) |
| |
| for line in nearby_lines: |
| if match := value_rx.search(line.text): |
| idx = line.idx |
| break |
| else: |
| pres = self.m.step.empty( |
| f'could not find {value_key} line adjacent to ' |
| f'{matching_line.text!r} in {path}', |
| ).presentation |
| pres.step_summary_text = repr(nearby_lines) |
| return None |
| |
| old_value = match.group('value') |
| |
| prefix = match.group("prefix") |
| suffix = match.group("suffix") |
| lines[idx] = f'{prefix}{new_value}{suffix}' |
| |
| # Remove all existing metadata lines in this git_repository() entry. |
| idx2 = idx - 1 |
| while lines[idx2].strip().startswith('# ROLL: '): |
| lines[idx2] = None |
| idx2 -= 1 |
| |
| ws_prefix = re.search(r'^\s*', prefix).group(0) |
| comment_prefix = f'{ws_prefix}# ROLL: ' |
| |
| now = self.m.time.utcnow().strftime('%Y-%m-%d') |
| comment_lines = ( |
| f'{comment_prefix}Warning: this entry is automatically ' |
| 'updated.', |
| f'{comment_prefix}Last updated {now}.', |
| f'{comment_prefix}By {self.m.buildbucket.build_url()}.', |
| ) |
| |
| lines[idx] = '\n'.join(comment_lines + (lines[idx],)) |
| |
| assert isinstance(title_keys, (list, tuple)), repr(title_keys) |
| |
| # This is more restrictive than necessary but helps keep the regex |
| # simple. |
| for key in title_keys: |
| assert isinstance(key, str), repr(key) |
| assert key.isidentifier(), repr(key) |
| |
| for line in nearby_lines: |
| keys = "|".join(title_keys) |
| if match := re.search( |
| rf'^\s*(?:{keys})\s*=\s*"(?P<value>[^"]+)",?\s*$', |
| line.text or '', |
| ): |
| project_names.append(match.group('value')) |
| break |
| |
| def write(): |
| self.m.file.write_text( |
| f'write new {path.name}', |
| path, |
| ''.join( |
| f'{x}\n' |
| for x in lines[num_nearby_lines:-num_nearby_lines] |
| if x is not None |
| ), |
| ) |
| |
| if delay_write: |
| return UpdateResult( |
| old_value=old_value, |
| project_name=', '.join(project_names), |
| finalize=write, |
| ) |
| |
| else: |
| write() |
| |
| return UpdateResult( |
| old_value=old_value, |
| project_name=', '.join(project_names), |
| ) |