| # Copyright 2020 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Recipe module for managing and rolling CIPD package versions. |
| |
| This module provides the `CipdRollApi` for automating the update of CIPD |
| package versions, typically defined in a JSON manifest file (e.g., |
| `pw_env_setup/py/pw_env_setup/cipd_setup/pigweed.json`). |
| Key functionalities include: |
| - Reading CIPD package definitions from a JSON file. |
| - Determining the latest available version for a package based on a specified |
| ref (e.g., 'latest') and tag (e.g., 'git_revision'). |
| - Handling platform-specific package variants and finding common versions |
| across all specified platforms. |
| - Optionally allowing for mismatched refs across platforms if a common tag |
| can still be identified. |
| - Verifying that new versions are not pre-releases and represent a forward roll. |
| - Updating the version tag in the JSON manifest file. |
| - Generating `Roll` objects for standardized commit messages. |
| """ |
| |
| from __future__ import annotations |
| |
| import collections |
| import dataclasses |
| import datetime |
| import json |
| import re |
| from collections.abc import Sequence |
| from typing import TYPE_CHECKING |
| |
| from recipe_engine import recipe_api |
| |
| from PB.recipe_modules.pigweed.cipd_roll.package import Package |
| |
| from RECIPE_MODULES.fuchsia.roll_commit_message import ( |
| api as roll_commit_message_api, |
| ) |
| |
| if TYPE_CHECKING: # pragma: no cover |
| from RECIPE_MODULES.pigweed.checkout import api as checkout_api |
| |
| |
| @dataclasses.dataclass(order=True) |
| class Roll(roll_commit_message_api.BaseRoll): |
| """Represents a roll of a CIPD package version. |
| |
| Attributes: |
| package_name: The human-readable name of the CIPD package. |
| package_spec: The CIPD package specification string (e.g., |
| 'foo/bar/${platform}'). |
| old_version: The previous version tag (e.g., 'git_revision:abcdef'). |
| new_version: The new version tag the package is being rolled to. |
| """ |
| |
| package_name: str |
| package_spec: str |
| old_version: str |
| new_version: str |
| |
| def short_name(self) -> str: |
| """Returns a short name for the roll, typically the package name.""" |
| return self.package_name |
| |
| def message_header(self, force_summary_version: bool = False) -> str: |
| """Generates the header for the commit message. |
| |
| Args: |
| force_summary_version: If True, a more concise header is generated. |
| |
| Returns: |
| The commit message header string. |
| """ |
| del force_summary_version # Unused. |
| return self.package_name |
| |
| def message_body( |
| self, |
| *, |
| force_summary_version: bool = False, |
| escape_tags: Sequence[str] = (), |
| filter_tags: Sequence[str] = (), |
| ) -> str: |
| """Generates the body of the commit message. |
| |
| Args: |
| force_summary_version: If True, a more concise body is generated. |
| escape_tags: A sequence of tags to escape in the message. |
| filter_tags: A sequence of tags to filter from the message. |
| |
| Returns: |
| The commit message body string, indicating the old and new versions. |
| """ |
| del force_summary_version, escape_tags, filter_tags # Unused. |
| return f'From {self.old_version}\nTo {self.new_version}' |
| |
| def message_footer(self, *, send_comment: bool) -> str: |
| """Generates the footer for the commit message. |
| |
| Args: |
| send_comment: If True, indicates a comment should be sent. |
| |
| Returns: |
| An empty string, as CIPD rolls typically don't have footers. |
| """ |
| del send_comment # Unused. |
| return '' |
| |
| def output_property(self) -> dict[str, str]: |
| """Generates a dictionary of properties representing the roll. |
| |
| Returns: |
| A dictionary containing details of the roll, such as package name, |
| spec, old version, and new version. |
| """ |
| return { |
| 'name': self.package_name, |
| 'spec': self.package_spec, |
| 'old': self.old_version, |
| 'new': self.new_version, |
| } |
| |
| |
| class CipdRollApi(recipe_api.RecipeApi): |
| """API for managing and rolling CIPD package versions in a JSON file.""" |
| |
| Roll = Roll |
| |
| def is_platform(self, part: str) -> bool: |
| """Return true if a string segment looks like a CIPD platform specifier. |
| |
| Helps distinguish parts of a package path that define the platform |
| (e.g., "linux-amd64", "${platform}") from actual package name parts. |
| |
| Args: |
| part: A segment of a CIPD package path. |
| |
| Returns: |
| True if the part appears to be a platform specifier, False |
| otherwise. |
| """ |
| if '{' in part: |
| return True |
| |
| try: |
| os, arch = part.split('-') |
| return os in ('linux', 'mac', 'windows') |
| except ValueError: |
| return False |
| |
| def package_name(self, path: str) -> str: |
| """Extracts a likely package name from a CIPD package path. |
| |
| For example, 'foo/bar/${platform}' or 'foo/bar/${os=mac}-${arch}' |
| would result in 'bar'. |
| |
| Args: |
| path: The CIPD package path string. |
| |
| Returns: |
| The extracted package name. |
| """ |
| parts = path.split('/') |
| if self.is_platform(parts[-1]): |
| parts.pop() |
| return parts[-1] |
| |
| def find_shared_tags( |
| self, |
| package_tags: dict[str, set[str]], |
| tag: str, |
| ) -> set[str]: |
| """Attempts to find a version tag shared by all platform-specific packages. |
| |
| This is used when `allow_mismatched_refs` is true and the initial |
| `cipd describe` calls (based on a common `ref`) yield packages with |
| no common version tag (e.g., `git_revision:xxx`). |
| |
| The goal is to find a `tag_candidate` (e.g., `git_revision:yyy`) such |
| that: |
| 1. For *some* platform, the package instance at the desired `ref` has |
| `tag_candidate`. |
| 2. A package instance with `tag_candidate` exists for *all* platforms. |
| |
| Args: |
| package_tags: A dictionary mapping platform-specific package paths |
| to a set of their version tags (e.g., `{'pkg/linux-amd64': |
| {'git_rev:1'}, 'pkg/mac-amd64': {'git_rev:2'}}`). These are the |
| tags from packages resolved via the `ref`. |
| tag: The base tag key (e.g., "git_revision"). |
| |
| Returns: |
| A set of shared version tags if successful, or an empty set if no |
| such common tag can be found. |
| """ |
| # Find the most common tags. We use the sorted dict keys for |
| # determinism. |
| package_paths = sorted(package_tags.keys()) |
| counter = collections.Counter() |
| for path in package_paths: |
| counter.update(package_tags[path]) |
| most_common_tags = counter.most_common() |
| |
| with self.m.step.nest('find shared tag'): |
| for tag_candidate, _ in most_common_tags: |
| # There is at least one package for which the version with the |
| # specified 'ref' does not have this tag. See if there exists a |
| # version of this package that *does* have this tag. If so, use |
| # that version. |
| updated_tags = dict() |
| for package_path in package_paths: |
| if tag_candidate in package_tags[package_path]: |
| # For this package we already have a version with this |
| # tag, nothing to do. |
| continue |
| try: |
| package_data = self.m.cipd.describe( |
| package_path, tag_candidate |
| ) |
| except self.m.step.StepFailure: |
| # No luck: there exists no version with this tag. |
| break |
| updated_tags[package_path] = set( |
| x.tag |
| for x in package_data.tags |
| if x.tag.startswith(tag + ':') |
| ) |
| |
| else: |
| # We found a version of each package with the tag_candidate. |
| merged_tags = dict() |
| merged_tags.update(package_tags) |
| merged_tags.update(updated_tags) |
| tags = set.intersection(*merged_tags.values()) |
| # Should always succeed. |
| assert len(tags) > 0 |
| # Update package_tags to be consistent with the returned |
| # tags. |
| package_tags.update(updated_tags) |
| return tags |
| |
| # We failed to find any tag that meets our criteria. |
| return set() |
| |
| def select_version( |
| self, |
| spec: str, |
| ref: str, |
| tag: str, |
| allow_mismatched_refs: bool, |
| platforms: Sequence[str], |
| old_version: str | None, |
| ) -> str | None: |
| """Selects the new version tag for a CIPD package roll. |
| |
| This function determines the appropriate new version tag for a package |
| based on its specification, desired ref, tag key, and existing |
| platforms. It handles cases with platform-specific packages, checks for |
| common tags, and optionally allows for mismatched refs across |
| platforms. It also verifies that the new version is not a pre-release |
| and is newer than the old version. |
| |
| Args: |
| spec: The CIPD package specification (e.g., |
| 'foo/bar/${platform}'). |
| ref: The CIPD ref to query (e.g., 'latest', 'git_revision:abcdef'). |
| tag: The tag key to use for versioning (e.g., 'git_revision', |
| 'version'). |
| allow_mismatched_refs: If True, allows finding a common tag even if |
| not all platforms share the exact same ref initially. |
| platforms: A list of platform strings (e.g., ['linux-amd64', |
| 'mac-amd64']). |
| old_version: The current version tag of the package, if known. |
| |
| Returns: |
| The selected new version tag as a string, or None if the package is |
| already up-to-date or if a roll would be backwards. |
| |
| Raises: |
| StepFailure: If no common tags can be found across platforms, or if |
| a pre-release version is detected. |
| """ |
| base, name = spec.rstrip('/').rsplit('/', 1) |
| package_paths: list[str] |
| if self.is_platform(name): |
| package_paths = [f'{base}/{x}' for x in platforms] |
| else: |
| package_paths = [spec] |
| |
| package_tags = {} |
| package_data = {} |
| tags = None |
| for package_path in package_paths: |
| try: |
| package_data[package_path] = self.m.cipd.describe( |
| package_path, |
| ref, |
| ) |
| |
| except self.m.step.StepFailure: |
| # If we got here this package doesn't have the correct ref. |
| # This is likely because it's a new platform for an existing |
| # package. In that case ignore this platform when checking |
| # that refs agree on package versions. We still need at |
| # least one platform to have the ref or the checks below |
| # will fail. |
| pass |
| |
| else: |
| package_tags[package_path] = set( |
| x.tag |
| for x in package_data[package_path].tags |
| if x.tag.startswith(tag + ':') |
| ) |
| if tags is None: |
| tags = set(package_tags[package_path]) |
| else: |
| tags.intersection_update(package_tags[package_path]) |
| |
| if not tags and allow_mismatched_refs: |
| # The package with the requested ref has non-overlapping tag |
| # values for different platforms. Try relaxing the requirement |
| # that all packages come from the same ref, and see if this |
| # allows us to find a set with shared tag values. |
| tags = self.find_shared_tags(package_tags, tag) |
| |
| with self.m.step.nest('common tags') as presentation: |
| presentation.step_summary_text = '\n'.join(sorted(tags)) |
| |
| if not tags: |
| err_lines = [f'no common tags across "{ref}" refs of packages'] |
| for pkg_path, pkg_tags in sorted(package_tags.items()): |
| err_lines.append('') |
| err_lines.append( |
| f'[{pkg_path}](http://chrome-infra-packages.appspot.com' |
| f'/p/{pkg_path})' |
| ) |
| for tag in pkg_tags: |
| err_lines.append(tag) |
| |
| raise self.m.step.StepFailure('<br>'.join(err_lines)) |
| |
| # Deterministically pick one of the common tags. |
| new_version = sorted(tags)[0] |
| |
| version_part = new_version.split(':', 1)[1] |
| match = re.search( |
| r'(?:\d|\b)(rc|pre|beta|alpha)(?:\d|\b)', |
| version_part, |
| ) |
| if match: |
| raise self.m.step.StepFailure( |
| f'found pre-release indicator {match.group(1)!r} in ' |
| f'{version_part!r}' |
| ) |
| |
| # Verify there's only one instance of each platform package with |
| # this tag. |
| with self.m.step.nest('check number of instances'): |
| for package_path in package_paths: |
| self.m.cipd.describe(package_path, new_version) |
| |
| with self.m.step.nest('new version') as presentation: |
| presentation.step_summary_text = new_version |
| presentation.logs['old version'] = old_version |
| |
| if old_version and ':' in old_version: |
| with self.m.step.nest('checking against old version'): |
| if old_version in tags: |
| pres = self.m.step.empty('already up-to-date').presentation |
| pres.step_summary_text = ( |
| f'current version {old_version} in common tags' |
| ) |
| return None |
| |
| for package_path in package_paths: |
| with self.m.step.nest(package_path): |
| old_package_data = self.m.cipd.describe( |
| package_path, |
| old_version, |
| ) |
| |
| old_ts = datetime.datetime.fromtimestamp( |
| old_package_data.registered_ts, |
| ) |
| new_ts = datetime.datetime.fromtimestamp( |
| package_data[package_path].registered_ts, |
| ) |
| |
| if old_ts > new_ts: |
| pres = self.m.step.empty( |
| 'new package is older than current package', |
| ).presentation |
| pres.step_summary_text = ( |
| f'* current: {old_ts.isoformat()}\n' |
| f'* new: {new_ts.isoformat()}\n' |
| ) |
| return None |
| |
| # Verify that the specified packages at this version were uploaded by a |
| # service account, raising a StepFailure exception otherwise. |
| self.m.cipd_util.check_uploader(package_paths, new_version) |
| |
| return new_version |
| |
| def update_package( |
| self, |
| checkout: checkout_api.CheckoutContext, |
| pkg: Package, |
| ) -> list[Roll]: |
| """Updates a CIPD package definition in a JSON file to a new version. |
| |
| This method reads a JSON file (typically `pigweed.json`) that defines |
| CIPD packages. It finds the specified package, determines the new |
| version using `select_version`, and if an update is needed, modifies |
| the JSON file and returns a `Roll` object. |
| |
| Args: |
| checkout: The checkout context. |
| pkg: A protobuf message defining the package to update, including |
| its name, spec, JSON file path, ref, tag, etc. |
| |
| Returns: |
| A list containing a `Roll` object if the package was updated, |
| or an empty list if no update was needed. |
| |
| Raises: |
| StepFailure: If the specified package cannot be found in the JSON |
| file. |
| """ |
| if not pkg.name: |
| pkg.name = self.package_name(pkg.spec) |
| |
| with self.m.step.nest(pkg.name): |
| json_path = checkout.root.joinpath(pkg.json_path) |
| |
| basename = self.m.path.basename(json_path) |
| cipd_json = self.m.file.read_json(f'read {basename}', json_path) |
| packages = cipd_json |
| if isinstance(cipd_json, dict): |
| packages = cipd_json['packages'] |
| old_version = None |
| package = None |
| for package in packages: |
| if package['path'] == pkg.spec: |
| old_version = package['tags'][0] |
| break |
| else: |
| raise self.m.step.StepFailure( |
| f"couldn't find package {pkg.spec} in {json_path}" |
| ) |
| |
| assert package.get('platforms'), 'platforms empty in json' |
| platforms = package.get('platforms') |
| base, name = pkg.spec.rstrip('/').rsplit('/', 1) |
| |
| new_version = self.select_version( |
| spec=pkg.spec, |
| ref=pkg.ref, |
| tag=pkg.tag, |
| allow_mismatched_refs=pkg.allow_mismatched_refs, |
| platforms=platforms, |
| old_version=old_version, |
| ) |
| if not new_version: |
| return [] |
| package['tags'] = [new_version] |
| |
| self.m.file.write_text( |
| f'write {basename}', |
| json_path, |
| json.dumps( |
| cipd_json, |
| indent=2, |
| separators=(',', ': '), |
| ) |
| + '\n', |
| ) |
| return [ |
| Roll( |
| package_name=pkg.name, |
| package_spec=pkg.spec, |
| old_version=old_version, |
| new_version=new_version, |
| ), |
| ] |