blob: a4fc66f74657d873edfe1efed5e769cce5bbc5be [file] [log] [blame] [edit]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Recipe module for managing and rolling CIPD package versions.
This module provides the `CipdRollApi` for automating the update of CIPD
package versions, typically defined in a JSON manifest file (e.g.,
`pw_env_setup/py/pw_env_setup/cipd_setup/pigweed.json`).
Key functionalities include:
- Reading CIPD package definitions from a JSON file.
- Determining the latest available version for a package based on a specified
ref (e.g., 'latest') and tag (e.g., 'git_revision').
- Handling platform-specific package variants and finding common versions
across all specified platforms.
- Optionally allowing for mismatched refs across platforms if a common tag
can still be identified.
- Verifying that new versions are not pre-releases and represent a forward roll.
- Updating the version tag in the JSON manifest file.
- Generating `Roll` objects for standardized commit messages.
"""
from __future__ import annotations
import collections
import dataclasses
import datetime
import json
import re
from collections.abc import Sequence
from typing import TYPE_CHECKING
from recipe_engine import recipe_api
from PB.recipe_modules.pigweed.cipd_roll.package import Package
from RECIPE_MODULES.fuchsia.roll_commit_message import (
api as roll_commit_message_api,
)
if TYPE_CHECKING: # pragma: no cover
from RECIPE_MODULES.pigweed.checkout import api as checkout_api
@dataclasses.dataclass(order=True)
class Roll(roll_commit_message_api.BaseRoll):
"""Represents a roll of a CIPD package version.
Attributes:
package_name: The human-readable name of the CIPD package.
package_spec: The CIPD package specification string (e.g.,
'foo/bar/${platform}').
old_version: The previous version tag (e.g., 'git_revision:abcdef').
new_version: The new version tag the package is being rolled to.
"""
package_name: str
package_spec: str
old_version: str
new_version: str
def short_name(self) -> str:
"""Returns a short name for the roll, typically the package name."""
return self.package_name
def message_header(self, force_summary_version: bool = False) -> str:
"""Generates the header for the commit message.
Args:
force_summary_version: If True, a more concise header is generated.
Returns:
The commit message header string.
"""
del force_summary_version # Unused.
return self.package_name
def message_body(
self,
*,
force_summary_version: bool = False,
escape_tags: Sequence[str] = (),
filter_tags: Sequence[str] = (),
) -> str:
"""Generates the body of the commit message.
Args:
force_summary_version: If True, a more concise body is generated.
escape_tags: A sequence of tags to escape in the message.
filter_tags: A sequence of tags to filter from the message.
Returns:
The commit message body string, indicating the old and new versions.
"""
del force_summary_version, escape_tags, filter_tags # Unused.
return f'From {self.old_version}\nTo {self.new_version}'
def message_footer(self, *, send_comment: bool) -> str:
"""Generates the footer for the commit message.
Args:
send_comment: If True, indicates a comment should be sent.
Returns:
An empty string, as CIPD rolls typically don't have footers.
"""
del send_comment # Unused.
return ''
def output_property(self) -> dict[str, str]:
"""Generates a dictionary of properties representing the roll.
Returns:
A dictionary containing details of the roll, such as package name,
spec, old version, and new version.
"""
return {
'name': self.package_name,
'spec': self.package_spec,
'old': self.old_version,
'new': self.new_version,
}
class CipdRollApi(recipe_api.RecipeApi):
"""API for managing and rolling CIPD package versions in a JSON file."""
Roll = Roll
def is_platform(self, part: str) -> bool:
"""Return true if a string segment looks like a CIPD platform specifier.
Helps distinguish parts of a package path that define the platform
(e.g., "linux-amd64", "${platform}") from actual package name parts.
Args:
part: A segment of a CIPD package path.
Returns:
True if the part appears to be a platform specifier, False
otherwise.
"""
if '{' in part:
return True
try:
os, arch = part.split('-')
return os in ('linux', 'mac', 'windows')
except ValueError:
return False
def package_name(self, path: str) -> str:
"""Extracts a likely package name from a CIPD package path.
For example, 'foo/bar/${platform}' or 'foo/bar/${os=mac}-${arch}'
would result in 'bar'.
Args:
path: The CIPD package path string.
Returns:
The extracted package name.
"""
parts = path.split('/')
if self.is_platform(parts[-1]):
parts.pop()
return parts[-1]
def find_shared_tags(
self,
package_tags: dict[str, set[str]],
tag: str,
) -> set[str]:
"""Attempts to find a version tag shared by all platform-specific packages.
This is used when `allow_mismatched_refs` is true and the initial
`cipd describe` calls (based on a common `ref`) yield packages with
no common version tag (e.g., `git_revision:xxx`).
The goal is to find a `tag_candidate` (e.g., `git_revision:yyy`) such
that:
1. For *some* platform, the package instance at the desired `ref` has
`tag_candidate`.
2. A package instance with `tag_candidate` exists for *all* platforms.
Args:
package_tags: A dictionary mapping platform-specific package paths
to a set of their version tags (e.g., `{'pkg/linux-amd64':
{'git_rev:1'}, 'pkg/mac-amd64': {'git_rev:2'}}`). These are the
tags from packages resolved via the `ref`.
tag: The base tag key (e.g., "git_revision").
Returns:
A set of shared version tags if successful, or an empty set if no
such common tag can be found.
"""
# Find the most common tags. We use the sorted dict keys for
# determinism.
package_paths = sorted(package_tags.keys())
counter = collections.Counter()
for path in package_paths:
counter.update(package_tags[path])
most_common_tags = counter.most_common()
with self.m.step.nest('find shared tag'):
for tag_candidate, _ in most_common_tags:
# There is at least one package for which the version with the
# specified 'ref' does not have this tag. See if there exists a
# version of this package that *does* have this tag. If so, use
# that version.
updated_tags = dict()
for package_path in package_paths:
if tag_candidate in package_tags[package_path]:
# For this package we already have a version with this
# tag, nothing to do.
continue
try:
package_data = self.m.cipd.describe(
package_path, tag_candidate
)
except self.m.step.StepFailure:
# No luck: there exists no version with this tag.
break
updated_tags[package_path] = set(
x.tag
for x in package_data.tags
if x.tag.startswith(tag + ':')
)
else:
# We found a version of each package with the tag_candidate.
merged_tags = dict()
merged_tags.update(package_tags)
merged_tags.update(updated_tags)
tags = set.intersection(*merged_tags.values())
# Should always succeed.
assert len(tags) > 0
# Update package_tags to be consistent with the returned
# tags.
package_tags.update(updated_tags)
return tags
# We failed to find any tag that meets our criteria.
return set()
def select_version(
self,
spec: str,
ref: str,
tag: str,
allow_mismatched_refs: bool,
platforms: Sequence[str],
old_version: str | None,
) -> str | None:
"""Selects the new version tag for a CIPD package roll.
This function determines the appropriate new version tag for a package
based on its specification, desired ref, tag key, and existing
platforms. It handles cases with platform-specific packages, checks for
common tags, and optionally allows for mismatched refs across
platforms. It also verifies that the new version is not a pre-release
and is newer than the old version.
Args:
spec: The CIPD package specification (e.g.,
'foo/bar/${platform}').
ref: The CIPD ref to query (e.g., 'latest', 'git_revision:abcdef').
tag: The tag key to use for versioning (e.g., 'git_revision',
'version').
allow_mismatched_refs: If True, allows finding a common tag even if
not all platforms share the exact same ref initially.
platforms: A list of platform strings (e.g., ['linux-amd64',
'mac-amd64']).
old_version: The current version tag of the package, if known.
Returns:
The selected new version tag as a string, or None if the package is
already up-to-date or if a roll would be backwards.
Raises:
StepFailure: If no common tags can be found across platforms, or if
a pre-release version is detected.
"""
base, name = spec.rstrip('/').rsplit('/', 1)
package_paths: list[str]
if self.is_platform(name):
package_paths = [f'{base}/{x}' for x in platforms]
else:
package_paths = [spec]
package_tags = {}
package_data = {}
tags = None
for package_path in package_paths:
try:
package_data[package_path] = self.m.cipd.describe(
package_path,
ref,
)
except self.m.step.StepFailure:
# If we got here this package doesn't have the correct ref.
# This is likely because it's a new platform for an existing
# package. In that case ignore this platform when checking
# that refs agree on package versions. We still need at
# least one platform to have the ref or the checks below
# will fail.
pass
else:
package_tags[package_path] = set(
x.tag
for x in package_data[package_path].tags
if x.tag.startswith(tag + ':')
)
if tags is None:
tags = set(package_tags[package_path])
else:
tags.intersection_update(package_tags[package_path])
if not tags and allow_mismatched_refs:
# The package with the requested ref has non-overlapping tag
# values for different platforms. Try relaxing the requirement
# that all packages come from the same ref, and see if this
# allows us to find a set with shared tag values.
tags = self.find_shared_tags(package_tags, tag)
with self.m.step.nest('common tags') as presentation:
presentation.step_summary_text = '\n'.join(sorted(tags))
if not tags:
err_lines = [f'no common tags across "{ref}" refs of packages']
for pkg_path, pkg_tags in sorted(package_tags.items()):
err_lines.append('')
err_lines.append(
f'[{pkg_path}](http://chrome-infra-packages.appspot.com'
f'/p/{pkg_path})'
)
for tag in pkg_tags:
err_lines.append(tag)
raise self.m.step.StepFailure('<br>'.join(err_lines))
# Deterministically pick one of the common tags.
new_version = sorted(tags)[0]
version_part = new_version.split(':', 1)[1]
match = re.search(
r'(?:\d|\b)(rc|pre|beta|alpha)(?:\d|\b)',
version_part,
)
if match:
raise self.m.step.StepFailure(
f'found pre-release indicator {match.group(1)!r} in '
f'{version_part!r}'
)
# Verify there's only one instance of each platform package with
# this tag.
with self.m.step.nest('check number of instances'):
for package_path in package_paths:
self.m.cipd.describe(package_path, new_version)
with self.m.step.nest('new version') as presentation:
presentation.step_summary_text = new_version
presentation.logs['old version'] = old_version
if old_version and ':' in old_version:
with self.m.step.nest('checking against old version'):
if old_version in tags:
pres = self.m.step.empty('already up-to-date').presentation
pres.step_summary_text = (
f'current version {old_version} in common tags'
)
return None
for package_path in package_paths:
with self.m.step.nest(package_path):
old_package_data = self.m.cipd.describe(
package_path,
old_version,
)
old_ts = datetime.datetime.fromtimestamp(
old_package_data.registered_ts,
)
new_ts = datetime.datetime.fromtimestamp(
package_data[package_path].registered_ts,
)
if old_ts > new_ts:
pres = self.m.step.empty(
'new package is older than current package',
).presentation
pres.step_summary_text = (
f'* current: {old_ts.isoformat()}\n'
f'* new: {new_ts.isoformat()}\n'
)
return None
# Verify that the specified packages at this version were uploaded by a
# service account, raising a StepFailure exception otherwise.
self.m.cipd_util.check_uploader(package_paths, new_version)
return new_version
def update_package(
self,
checkout: checkout_api.CheckoutContext,
pkg: Package,
) -> list[Roll]:
"""Updates a CIPD package definition in a JSON file to a new version.
This method reads a JSON file (typically `pigweed.json`) that defines
CIPD packages. It finds the specified package, determines the new
version using `select_version`, and if an update is needed, modifies
the JSON file and returns a `Roll` object.
Args:
checkout: The checkout context.
pkg: A protobuf message defining the package to update, including
its name, spec, JSON file path, ref, tag, etc.
Returns:
A list containing a `Roll` object if the package was updated,
or an empty list if no update was needed.
Raises:
StepFailure: If the specified package cannot be found in the JSON
file.
"""
if not pkg.name:
pkg.name = self.package_name(pkg.spec)
with self.m.step.nest(pkg.name):
json_path = checkout.root.joinpath(pkg.json_path)
basename = self.m.path.basename(json_path)
cipd_json = self.m.file.read_json(f'read {basename}', json_path)
packages = cipd_json
if isinstance(cipd_json, dict):
packages = cipd_json['packages']
old_version = None
package = None
for package in packages:
if package['path'] == pkg.spec:
old_version = package['tags'][0]
break
else:
raise self.m.step.StepFailure(
f"couldn't find package {pkg.spec} in {json_path}"
)
assert package.get('platforms'), 'platforms empty in json'
platforms = package.get('platforms')
base, name = pkg.spec.rstrip('/').rsplit('/', 1)
new_version = self.select_version(
spec=pkg.spec,
ref=pkg.ref,
tag=pkg.tag,
allow_mismatched_refs=pkg.allow_mismatched_refs,
platforms=platforms,
old_version=old_version,
)
if not new_version:
return []
package['tags'] = [new_version]
self.m.file.write_text(
f'write {basename}',
json_path,
json.dumps(
cipd_json,
indent=2,
separators=(',', ': '),
)
+ '\n',
)
return [
Roll(
package_name=pkg.name,
package_spec=pkg.spec,
old_version=old_version,
new_version=new_version,
),
]