blob: 7c49f4491a29555fcfdd3dfcf48084621fd266ba [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utility functions for uploading a cipd package."""
from __future__ import annotations
import collections
import dataclasses
import json
import re
from typing import Sequence, TYPE_CHECKING
from PB.recipe_modules.fuchsia.cipd_util.upload_manifest import (
CIPDUploadManifest,
)
from recipe_engine import recipe_api
if TYPE_CHECKING: # pragma: no cover
from recipe_engine import config_types
from RECIPE_MODULES.pigweed.checkout import api as checkout_api
@dataclasses.dataclass(order=True)
class Roll:
package_name: str
old_version: str
new_version: str
def message(self):
return f'From {self.old_version}\nTo {self.new_version}'
@dataclasses.dataclass
class Commit:
rolls: List[Roll] = dataclasses.field(default_factory=list)
def message(self, name: str | None = None):
rolls = sorted(self.rolls)
if not name:
name = ", ".join(x.package_name for x in rolls)
result = []
result.append(f'roll: {name}')
if len(rolls) == 1:
result.append('')
result.append(rolls[0].message())
else:
for roll in rolls:
result.append('')
result.append(roll.package_name)
result.append(roll.message())
result.append('')
return '\n'.join(result)
def __bool__(self):
return bool(self.rolls)
class CipdRollApi(recipe_api.RecipeApi):
"""Utility functions for uploading a cipd package."""
Roll = Roll
Commit = Commit
def is_platform(self, part):
"""Return true for platform-style strings.
Example matches: "linux-amd64", "${platform}", "${os}-amd64", "cp38".
Example non-matches: "clang", "amd64", "linux".
"""
if '{' in part:
return True
# Match Python version indicators.
if re.match(r'cp\d+', part):
return True
try:
os, arch = part.split('-')
return os in ('linux', 'mac', 'windows')
except ValueError:
return False
def find_shared_tags(self, package_tags, tag):
"""Attempts to find a tag shared by all packages.
This function can be used if the intersection of the sets of tags
associated with different-platform packages with the same 'ref' is
empty. It finds a tag shared by all packages, with as many of them as
possible matching 'ref'.
"""
# Find the most common tags. We use the sorted dict keys for
# determinism.
package_paths = sorted(package_tags.keys())
counter = collections.Counter()
for path in package_paths:
counter.update(package_tags[path])
most_common_tags = counter.most_common()
with self.m.step.nest("find shared tag"):
for tag_candidate, _ in most_common_tags:
# There is at least one package for which the version with the
# specified 'ref' does not have this tag. See if there exists a
# version of this package that *does* have this tag. If so, use
# that version.
updated_tags = dict()
for package_path in package_paths:
if tag_candidate in package_tags[package_path]:
# For this package we already have a version with this
# tag, nothing to do.
continue
try:
package_data = self.m.cipd.describe(
package_path, tag_candidate
)
except self.m.step.StepFailure:
# No luck: there exists no version with this tag.
break
updated_tags[package_path] = set(
x.tag
for x in package_data.tags
if x.tag.startswith(tag + ':')
)
else:
# We found a version of each package with the tag_candidate.
merged_tags = dict()
merged_tags.update(package_tags)
merged_tags.update(updated_tags)
tags = set.intersection(*merged_tags.values())
# Should always succeed.
assert len(tags) > 0
# Update package_tags to be consistent with the returned
# tags.
package_tags.update(updated_tags)
return tags
# We failed to find any tag that meets our criteria.
return set()
def process_package(self, checkout_root, pkg):
json_path = checkout_root.joinpath(*re.split(r'[\\/]+', pkg.json_path))
if not pkg.name:
# Turn foo/bar/baz/${platform} and foo/bar/baz/${os=mac}-${arch}
# into 'baz'.
pkg.name = [
part
for part in pkg.spec.split('/')
if not self.is_platform(part)
][-1]
basename = self.m.path.basename(json_path)
cipd_json = self.m.file.read_json(f'read {basename}', json_path)
packages = cipd_json
if isinstance(cipd_json, dict):
packages = cipd_json['packages']
old_version = None
package = None
for package in packages:
if package['path'] == pkg.spec:
old_version = package['tags'][0]
break
else:
raise self.m.step.StepFailure(
f"couldn't find package {pkg.spec} in {json_path}"
)
assert package.get('platforms'), 'platforms empty in json'
platforms = package.get('platforms')
base, name = pkg.spec.rstrip('/').rsplit('/', 1)
if self.is_platform(name):
package_paths = [f'{base}/{x}' for x in platforms]
else:
package_paths = [pkg.spec]
package_tags = {}
tags = None
for package_path in package_paths:
try:
package_data = self.m.cipd.describe(package_path, pkg.ref)
except self.m.step.StepFailure:
# If we got here this package doesn't have the correct ref. This
# is likely because it's a new platform for an existing package.
# In that case ignore this platform when checking that refs
# agree on package versions. We still need at least one platform
# to have the ref or the checks below will fail.
pass
else:
package_tags[package_path] = set(
x.tag
for x in package_data.tags
if x.tag.startswith(pkg.tag + ':')
)
if tags is None:
tags = set(package_tags[package_path])
else:
tags.intersection_update(package_tags[package_path])
if not tags and pkg.allow_mismatched_refs:
# The package with the requested ref has non-overlapping tag values
# for different platforms. Try relaxing the requirement that all
# packages come from the same ref, and see if this allows us to find
# a set with shared tag values.
tags = self.find_shared_tags(package_tags, pkg.tag)
with self.m.step.nest('common tags') as presentation:
presentation.step_summary_text = '\n'.join(sorted(tags))
if not tags:
err_lines = [f'no common tags across "{pkg.ref}" refs of packages']
for package_path, package_tags in sorted(package_tags.items()):
err_lines.append('')
err_lines.append(package_path)
for tag in package_tags:
err_lines.append(tag)
raise self.m.step.StepFailure('<br>'.join(err_lines))
# Deterministically pick one of the common tags.
new_version = sorted(tags)[0]
package['tags'] = [new_version]
version_part = new_version.split(':', 1)[1]
match = re.search(
r'(?:\d|\b)(rc|pre|beta|alpha)(?:\d|\b)',
version_part,
)
if match:
raise self.m.step.StepFailure(
f'found pre-release indicator {match.group(1)!r} in '
f'{version_part!r}'
)
# Verify there's only one instance of each platform package with this
# tag.
with self.m.step.nest('check number of instances'):
for package_path in package_paths:
self.m.cipd.describe(package_path, new_version)
with self.m.step.nest('new_version') as presentation:
presentation.step_summary_text = new_version
if old_version in tags:
with self.m.step.nest('already up-to-date') as presentation:
presentation.step_summary_text = (
'current version {} in common tags'
).format(old_version)
return None
else:
self.m.file.write_text(
f'write {basename}',
json_path,
json.dumps(cipd_json, indent=2, separators=(',', ': ')) + '\n',
)
return Roll(
package_name=pkg.name,
old_version=old_version,
new_version=new_version,
)