blob: a71a8942c0ff019a992d56c06fdccc35276b0afc [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Calls to checkout code.
Usage:
api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed')
"""
from __future__ import annotations
import collections
import contextlib
import re
from typing import TYPE_CHECKING
import urllib
import xml.etree.ElementTree
import attrs
from PB.go.chromium.org.luci.buildbucket.proto import (
build as build_pb2,
common as common_pb2,
)
from PB.go.chromium.org.luci.scheduler.api.scheduler.v1 import (
triggers as triggers_pb2,
)
from PB.recipe_modules.pigweed.checkout.options import Options
from recipe_engine import recipe_api
if TYPE_CHECKING: # pragma: no cover
from typing import Any, Sequence
from recipe_engine import config_types
# HTTPS URL of the main Pigweed repository (the same remote shown in the
# usage example in the module docstring).
PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed'
def to_dict(obj) -> dict[str, Any]:
    """Return a shallow copy of an object's attributes as a dict.

    Works both for objects that carry a ``__dict__`` and for slotted objects
    (which only expose ``__slots__``).

    Args:
        obj: Object whose attributes should be captured.

    Returns:
        A new dict mapping attribute names to values. Mutating the returned
        dict never affects ``obj``.
    """
    # Look vars() up through the 'builtins' module. '__builtins__' is a
    # CPython implementation detail: it is a dict inside imported modules but
    # a module object inside '__main__', so subscripting it is not portable.
    import builtins

    try:
        # Modifications to the dict returned by the built-in vars() function
        # modify the original data structure. Always create a copy for this
        # function to return.
        return builtins.vars(obj).copy()
    except TypeError:
        # vars() raises TypeError for objects without a __dict__. Fall back to
        # the declared slots, skipping dunder entries such as '__weakref__'.
        return {
            name: getattr(obj, name)
            for name in obj.__slots__
            if not name.startswith('__')
        }
@attrs.define
class Manifest:
    """Remotes and projects collected from a repo manifest."""

    # Remote definitions, looked up by key.
    remotes: dict[str, 'Remote'] = attrs.Factory(dict)
    # Project entries, in the order they were added.
    projects: list['Project'] = attrs.Factory(list)

    def dict(self) -> dict[str, Any]:
        """Return a plain-dict form built from each entry's own dict()."""
        remotes = {}
        for key, remote in self.remotes.items():
            remotes[key] = remote.dict()
        projects = [project.dict() for project in self.projects]
        return {'remotes': remotes, 'projects': projects}
class Url:
    """A URL as originally written plus an optional HTTPS form of it."""

    def __init__(self, url: str, *args, **kwargs):
        """Store ``url``; the ``https`` form starts out unset (None)."""
        super().__init__(*args, **kwargs)
        # Keep this assignment order: it is the key order of vars(self).
        self.url: str = url
        self.https: str | None = None

    def dict(self) -> dict[str, Any]:
        """Return a copy of this object's attributes as a dict."""
        attributes = to_dict(self)
        return attributes
@attrs.define
class Remote:
    """Remote config from manifest."""

    name: str
    fetch: Url
    review: str | None = None
    revision: str | None = None
    alias: str | None = None

    def dict(self) -> dict[str, Any]:
        """Return the fields as a dict, with ``fetch`` expanded via dict()."""
        fields = to_dict(self)
        fetch = fields['fetch']
        fields['fetch'] = fetch.dict()
        return fields
@attrs.define
class Project:
    """Key variables describing a repository/project."""

    name: str
    path: str
    remote: str
    revision: str
    upstream: str
    url: str | None = None

    def path_object(self, root: config_types.Path) -> config_types.Path:
        """Return this project's path joined onto ``root``."""
        location = root / self.path
        return location

    def dict(self) -> dict[str, Any]:
        """Return a copy of the project's fields as a dict."""
        fields = to_dict(self)
        return fields
def _str_or_none(x: Any | None) -> str | None:
if x is None:
return x
return str(x)
def _int_or_none(x: Any | None) -> int | None:
if x is None:
return x
return int(x)
@attrs.define
class Change:
    """Data from buildbucket."""

    # Field order is part of the constructor signature; do not reorder.
    number: int = attrs.field(converter=int)
    remote: str | None = attrs.field(converter=_str_or_none)
    ref: str | None = attrs.field(converter=_str_or_none)
    rebase: bool | None = None
    project: str | None = None
    branch: str | None = attrs.field(converter=_str_or_none, default=None)
    gerrit_name: str | None = attrs.field(converter=_str_or_none, default=None)
    submitted: bool = False
    patchset: int | None = attrs.field(converter=_int_or_none, default=None)
    # Excluded from repr().
    applied: bool = attrs.field(default=False, repr=False)
    path: str | None = None
    base: str | None = attrs.field(converter=_str_or_none, default=None)
    base_type: str | None = attrs.field(converter=_str_or_none, default=None)
    is_merge: bool = attrs.field(default=False)
    commit_message: str = attrs.field(default='')
    topic: str | None = None
    current_revision: str | None = None

    @property
    def gerrit_host(self) -> str:
        """URL of the '-review' host built from ``gerrit_name``."""
        host = f'https://{self.gerrit_name}-review.googlesource.com'
        return host

    @property
    def gerrit_url(self) -> str:
        """Gerrit URL of the change; the gitiles URL if ``number`` is falsy."""
        if self.number:
            return f'{self.gerrit_host}/c/{self.number}'
        return self.gitiles_url

    @property
    def gitiles_url(self) -> str:
        """URL of ``ref`` on ``remote``."""
        url = f'{self.remote}/+/{self.ref}'
        return url

    @property
    def name(self) -> str:
        """Short identifier in the form '<gerrit_name>:<number>'."""
        label = f'{self.gerrit_name}:{self.number}'
        return label

    @property
    def name_with_path(self) -> str:
        """``name`` followed by ``path`` in parentheses."""
        label = f'{self.name} ({self.path})'
        return label
@attrs.define
class Submodule:
    """Submodule properties."""

    # Recipe API handle; excluded from repr().
    api: recipe_api.RecipeApi = attrs.field(repr=False)
    hash: str
    relative_path: str
    path: config_types.Path
    name: str
    # 'describe' was declared a second time after 'fetchRecurseSubmodules'.
    # Re-annotating an existing name keeps its original position, so dropping
    # the duplicate leaves the field order (and constructor) unchanged.
    describe: str
    remote: str
    initialized: bool
    modified: bool
    conflict: bool
    branch: str
    url: str
    update: str
    ignore: str
    shallow: bool
    fetchRecurseSubmodules: bool

    def __lt__(self, other: 'Submodule') -> bool:
        """Order submodules by (relative_path, url) so they can be sorted."""
        return (self.relative_path, self.url) < (other.relative_path, other.url)
@attrs.define
class StatusOfChanges:
    """Changes that were applied or not applied."""

    # Changes whose 'applied' flag was set.
    applied: tuple[Change, ...]
    # Changes that were never applied.
    not_applied: tuple[Change, ...]
@attrs.define(slots=False)
class CheckoutContext:
    """State describing one checkout, plus helpers that inspect it.

    Declared with slots=False because revision(), manifest_snapshot() and
    submodule_snapshot() cache their results as extra instance attributes.
    """

    _api: recipe_api.RecipeApi = attrs.field(repr=False)
    options: Options | None = None
    changes: list[Change] | None = None  # List of triggering changes.
    top: config_types.Path = None  # Actual checkout root.
    # Logical checkout root. Usually identical to 'top', but occasionally a
    # subdirectory instead.
    root: config_types.Path = None
    # Which triggering changes were applied or not applied.
    status: StatusOfChanges | None = None
    # Remotes that should be treated identically.
    equivalent_remotes: dict[str, list[str]] | None = attrs.field(factory=dict)
    manifest: Manifest | None = None  # Parsed repo manifest.
    # Path to a JSON file containing metadata about the triggering changes.
    changes_json: config_types.Path | None = None
    bazel_overrides: dict[str, config_types.Path] = attrs.field(factory=dict)

    def revision(self) -> str:
        """Return the current revision of ``root``, computing it only once."""
        if not hasattr(self, '_revision'):
            self._revision = self._api.checkout.get_revision(self.root)
        return self._revision

    @property
    def manifest_path(self) -> config_types.Path:
        """Path of the manifest file named in the options, under ``root``."""
        return self.root / self.options.manifest_file

    def applied_changes(self) -> list[Change]:
        """Return the triggering changes whose ``applied`` flag is set."""
        return [change for change in self.changes if change.applied]

    def manifest_snapshot(self):
        """Return the repo manifest with all projects pinned.

        Returns None when the checkout does not use repo. The snapshot is
        taken once (with ``top`` as cwd) and cached on the instance.
        """
        if not self.options.use_repo:
            return None

        if not hasattr(self, '_manifest_snapshot'):
            with self._api.context(cwd=self.top):
                self._manifest_snapshot = self._api.repo.manifest_snapshot()
        return self._manifest_snapshot

    def submodule_snapshot(self):
        """Equivalent of manifest_snapshot() but not as strictly formatted.

        Returns None when the checkout uses repo. Otherwise returns the
        stripped output of 'git submodule status --recursive' run in
        ``root``, cached on the instance after the first call.
        """
        if self.options.use_repo:
            return None

        if not hasattr(self, '_submodule_snapshot'):
            with self._api.context(cwd=self.root):
                # To get step_test_data line to pass pylint.
                raw_io_stream_output = (
                    self._api.raw_io.test_api.stream_output_text
                )
                status_step = self._api.git(
                    'submodule-status',
                    'submodule',
                    'status',
                    '--recursive',
                    stdout=self._api.raw_io.output_text(),
                    step_test_data=lambda: raw_io_stream_output(
                        'submodule status filler text',
                    ),
                    ok_ret='any',
                )
                self._submodule_snapshot = status_step.stdout.strip() or ''
        return self._submodule_snapshot

    def snapshot_to_dir(self, directory: config_types.Path) -> None:
        """Write the available snapshots and a short git log to ``directory``.

        Writes 'manifest.xml' and/or 'submodules.log' when the matching
        snapshot is non-empty, and always writes 'git.log'.
        """
        self._api.file.ensure_directory('mkdir', directory)

        # Each snapshot is fetched right before it is written so the steps
        # are emitted in the same order as before.
        manifest_xml = self.manifest_snapshot()
        if manifest_xml:
            self._api.file.write_text(
                'write manifest.xml',
                directory / 'manifest.xml',
                manifest_xml,
            )

        submodule_log = self.submodule_snapshot()
        if submodule_log:
            self._api.file.write_text(
                'write submodule snapshot',
                directory / 'submodules.log',
                submodule_log,
            )

        with self._api.context(cwd=self.root):
            log_step = self._api.git(
                'log',
                'log',
                '--oneline',
                '-n',
                '10',
                stdout=self._api.raw_io.output_text(),
                ok_ret='any',
            )

        self._api.file.write_text(
            'write git log',
            directory / 'git.log',
            log_step.stdout,
        )

    def submodules(self, recursive: bool = False) -> list[Submodule]:
        """Return data about all submodules."""
        cmd = [
            'python3',
            self._api.checkout.resource('submodule_status.py'),
            self.root,
            self._api.json.output(),
        ]
        if recursive:
            cmd += ['--recursive']

        status = self._api.step(
            'submodule status',
            cmd,
            step_test_data=lambda: self._api.json.test_api.output({}),
        ).json.output

        found = []
        for entry in status.values():
            # Normalize the remote: HTTPS form, without a trailing '.git'.
            https_remote = self._api.sso.sso_to_https(entry['remote'])
            entry['remote'] = https_remote.removesuffix('.git')
            # Keep the path as reported, then replace 'path' with the
            # location under the checkout root.
            entry['relative_path'] = entry['path']
            entry['path'] = self.root / entry['relative_path']
            found.append(Submodule(self._api, **entry))
        return found

    # Not annotated on purpose: an annotation would turn it into a field.
    _REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$')

    def gerrit_host(self) -> str | None:
        """Return the '-review' host for the remote in the options.

        Returns None if the remote does not look like 'https://host/project'.
        """
        match = self._REMOTE_REGEX.match(self.options.remote)
        if not match:
            return  # pragma: no cover

        host = match.group('host')
        if '-review' in host:
            return host
        # Insert '-review' in front of the first dot only.
        return host.replace('.', '-review.', 1)

    def gerrit_project(self) -> str | None:
        """Return the project part of the remote in the options, or None."""
        match = self._REMOTE_REGEX.match(self.options.remote)
        if not match:
            return  # pragma: no cover
        return match.group('project')

    def remotes_equivalent(self, remote1: str, remote2: str) -> bool:
        """Return whether two remotes should be treated as the same.

        Both remotes are normalized to HTTPS without a trailing '.git'. They
        match if they are then equal, or if ``remote1`` is listed under
        ``remote2`` in ``equivalent_remotes``.
        """
        # Sometimes remote1 or remote2 is None. In that case we shouldn't
        # convert sso to https.
        if remote1:
            remote1 = self._api.sso.sso_to_https(remote1).removesuffix('.git')
        if remote2:
            remote2 = self._api.sso.sso_to_https(remote2).removesuffix('.git')

        return remote1 == remote2 or (
            remote1 in self.equivalent_remotes.get(remote2, ())
        )
class CheckoutApi(recipe_api.RecipeApi):
"""Calls to checkout code."""
Change = Change
CheckoutContext = CheckoutContext
def fake_context(self):  # pragma: no cover
    """Return a CheckoutContext rooted at '<start_dir>/checkout'.

    The options only set the remote, taken from the test API.
    """
    checkout_dir = self.m.path.start_dir / 'checkout'
    ctx = CheckoutContext(api=self.m)
    ctx.root = checkout_dir
    ctx.top = checkout_dir
    ctx.options = Options(remote=self.test_api.pigweed_repo)
    return ctx
def _read_manifest(
    self, manifest_remote: str, manifest_file: str
) -> Manifest:
    """Reads manifest file to get git repo locations.

    Args:
        manifest_remote: URL of the manifest repository. Its scheme and host
            replace the leading '..' components of relative fetch values.
        manifest_file: Path of the manifest XML file to read.

    Returns:
        A Manifest holding every <remote> and <project> element found. The
        same data is also written to 'manifest.json' in the start dir.

    Raises:
        AssertionError: If a project has no resolvable remote or revision,
            or names a remote that is not defined in the manifest.
    """
    with self.m.step.nest('read manifest') as read_step:
        manifest_text: str = self.m.file.read_text(
            'read file', manifest_file
        )
        read_step.logs['raw'] = manifest_text

        xml_tree = xml.etree.ElementTree.fromstring(manifest_text)

        manifest = Manifest()

        for element in xml_tree.iter('remote'):
            with self.m.step.nest('log') as pres:
                pres.step_summary_text = repr(element.attrib)
            remote = Remote(**element.attrib)
            fetch = remote.fetch
            if fetch.startswith('..'):
                # Strip only the leading '..' path components. The dots must
                # be escaped: unescaped, '(/..)*' also swallows '/' plus any
                # two characters, turning '../foo' into 'o' rather than
                # '/foo'.
                rest = re.sub(r'^\.\.(/\.\.)*', '', fetch)
                parsed = urllib.parse.urlparse(manifest_remote)
                fetch = f'{parsed.scheme}://{parsed.netloc}' + rest
            remote.fetch = Url(fetch)
            remote.fetch.https = self.m.sso.sso_to_https(remote.fetch.url)
            manifest.remotes[remote.name] = remote

        # Later <default> elements override attributes of earlier ones.
        defaults = {}
        for default in xml_tree.iter('default'):
            defaults.update(default.attrib)

        for project in xml_tree.iter('project'):
            name = project.attrib['name']
            path = project.attrib.get('path', name)

            if 'remote' in project.attrib:
                remote_name = project.attrib['remote']
            elif 'remote' in defaults:
                remote_name = defaults['remote']
            else:  # pragma: no cover
                assert False, f'remote not specified for {name}'

            assert (
                remote_name in manifest.remotes
            ), f'Remote {remote_name} does not exist'

            # Precedence: project attribute, then the remote's revision,
            # then the manifest-wide default.
            if 'revision' in project.attrib:
                revision = project.attrib['revision']
            elif manifest.remotes[remote_name].revision:
                revision = manifest.remotes[remote_name].revision
            elif 'revision' in defaults:
                revision = defaults['revision']
            else:  # pragma: no cover
                assert False, f'revision not specified for {name}'

            if 'upstream' in project.attrib:
                upstream = project.attrib['upstream']
            elif 'upstream' in defaults:  # pragma: no cover
                # This is unlikely to be used and hard to test--it requires
                # a completely separate manifest definition, otherwise the
                # 'else' condition won't be covered. It's also simple.
                upstream = defaults['upstream']
            else:
                upstream = revision

            # urllib.urljoin does something different than what's desired
            # here.
            url = '/'.join(
                (
                    manifest.remotes[remote_name].fetch.https.rstrip('/'),
                    name.lstrip('/'),
                )
            )
            manifest.projects.append(
                Project(
                    name=name,
                    path=path,
                    remote=remote_name,
                    revision=revision,
                    upstream=upstream,
                    url=url,
                )
            )

        self.m.file.write_json(
            'manifest json',
            self.m.path.start_dir / 'manifest.json',
            manifest.dict(),
        )

        return manifest
def _process_gerrit_change(
    self,
    ctx: CheckoutContext,
    host: str,
    change_id: str | int,
    project: str = 'pigweed/pigweed',
    patchset: int | None = None,
) -> Change:
    """Process a LUCI GerritChange and return a Change object.

    Args:
        ctx: Checkout context; its options decide whether to rebase.
        host: Gerrit host of the change (normalized before use).
        change_id: Change number, as an int or a numeric string.
        project: Project name, used only in the test data for the query.
        patchset: Patchset number. If falsy, the number of the change's
            current revision is used.

    Returns:
        A Change populated from the Gerrit change details.
    """
    # change_id may be a string. Convert once so the ref below can use
    # integer arithmetic ('%' on a str would be string formatting and raise).
    number = int(change_id)

    host = self.m.gerrit.normalize_host(host)

    gitiles_host = host.replace(
        '-review.googlesource.com', '.googlesource.com'
    )
    gerrit_name = gitiles_host.split('.')[0]

    details = self.m.gerrit.change_details(
        'details',
        change_id=str(change_id),
        host=host,
        max_attempts=5,
        query_params=[
            'CURRENT_COMMIT',
            'CURRENT_REVISION',
        ],
        timeout=30,
        test_data=self.m.json.test_api.output(
            {
                'branch': 'main',
                'current_revision': 'f' * 40,
                'revisions': {
                    'f'
                    * 40: {
                        '_number': 3,
                        'commit': {
                            'parents': [{}],
                            'message': '',
                        },
                    }
                },
                'project': project,
            }
        ),
    ).json.output

    branch = details['branch']

    remote = f'https://{gitiles_host}/{details["project"]}'.strip('/')

    rebase = not ctx.options.force_no_rebase

    current_revision = details['revisions'][details['current_revision']]
    # A commit with more than one parent is a merge; never rebase those.
    is_merge = len(current_revision['commit']['parents']) > 1
    if is_merge:
        rebase = False

    if not patchset:
        patchset = current_revision['_number']

    # The first ref component is the last two digits of the change number.
    ref = f'refs/changes/{number % 100:02}/{number}/{patchset}'

    return Change(
        number=number,
        patchset=patchset,
        remote=remote,
        ref=ref,
        rebase=rebase,
        is_merge=is_merge,
        branch=branch,
        gerrit_name=gerrit_name,
        submitted=False,
        commit_message=current_revision['commit']['message'],
        project=details['project'],
        topic=details.get('topic', None) or None,
        current_revision=details['current_revision'],
    )
def _process_gerrit_changes(
    self,
    ctx: CheckoutContext,
    bb_input: build_pb2.Build.Input,
) -> None:
    """Yield a Change for each triggering Gerrit change and its CQ deps.

    NOTE(review): this is a generator even though it is annotated '-> None'.

    Yields, in order: one Change per entry of ``bb_input.gerrit_changes``,
    one per resolved dependency of the last of those, and a placeholder
    Change (no remote or ref) per unresolved dependency.
    """
    seen_names = set()
    for index, gerrit_change in enumerate(bb_input.gerrit_changes):
        with self.m.step.nest(str(index)):
            last_change = self._process_gerrit_change(
                ctx=ctx,
                host=gerrit_change.host,
                project=gerrit_change.project,
                change_id=gerrit_change.change,
                patchset=gerrit_change.patchset,
            )
            yield last_change
            seen_names.add(last_change.name)

    # Dependencies are resolved for the last processed change only.
    deps = self.m.cq_deps.resolve(
        last_change.gerrit_name,
        last_change.number,
        last_change.topic,
    )

    for dep in deps.resolved:
        # dep.name should only appear in seen_names if there are multiple
        # gerrit_changes from buildbucket and a later one depends on an
        # earlier one. If buildbucket has multiple gerrit_changes the
        # cq_deps module is not needed here, so this is just double-checking
        # something that shouldn't happen.
        if dep.name in seen_names:  # pragma: no cover
            continue
        seen_names.add(dep.name)
        yield self._process_gerrit_change(
            ctx=ctx,
            host=dep.host,
            project=dep.project,
            change_id=dep.change,
        )

    for unresolved in deps.unresolved:
        yield Change(
            number=unresolved.change,
            remote=None,
            ref=None,
            rebase=None,
            project=None,
            branch=None,
            gerrit_name=unresolved.gerrit_name,
            submitted=False,
        )
def _number_details(
    self,
    host: str,
    commit_hash: str,
    branch: str = 'main',
) -> dict[str, Any]:
    """Look up the Gerrit change for a commit hash.

    Args:
        host: Gerrit host to query.
        commit_hash: Commit to search for.
        branch: Branch name used only in the query's test data.

    Returns:
        The single query result, or None if the host is a GitHub host, the
        query fails, or it does not return exactly one change.
    """
    is_github = 'github.com' in host or 'github-review' in host
    if is_github:
        return None  # pragma: no cover

    test_data = self.m.json.test_api.output(
        [
            {
                '_number': '1234',
                'branch': branch,
                'project': 'pigweed',
            }
        ]
    )

    try:
        matches = self.m.gerrit.change_query(
            'number',
            f'commit:{commit_hash}',
            host=host,
            max_attempts=5,
            timeout=30,
            test_data=test_data,
        ).json.output
        # Skip this change if it didn't go through Gerrit.
        if matches and len(matches) == 1:
            return matches[0]

    except self.m.step.StepFailure:  # pragma: no cover
        pass

    return None
def _change_data(
    self,
    ctx: CheckoutContext,
    remote: str = None,
    branch: str = None,
) -> tuple[Change, ...]:
    """Collect the changes that triggered this build.

    Args:
        ctx: Checkout context object.
        remote: Fallback remote, used when the build input does not name one.
        branch: Fallback branch, used when no change data provides one.

    Returns:
        A tuple of Change objects. It comes from the Gerrit changes in the
        build input, else from its gitiles commit, else from the current
        head of ``branch`` on ``remote``.
    """
    bb_input: build_pb2.Build.Input = self.m.buildbucket.build.input
    collected: list[Change] = []

    # Index scheduler triggers as triggers_by_repo[repo][revision].
    triggers_by_repo: dict[str, dict[str, triggers_pb2.Trigger]] = (
        collections.defaultdict(dict)
    )
    for trigger in self.m.scheduler.triggers:
        gitiles_trigger: triggers_pb2.GitilesTrigger = trigger.gitiles
        if gitiles_trigger:
            by_revision = triggers_by_repo[gitiles_trigger.repo]
            by_revision[gitiles_trigger.revision] = trigger

    with self.m.step.nest('change data'):
        if bb_input.gerrit_changes:
            with self.m.step.nest('process gerrit changes'):
                for change in self._process_gerrit_changes(ctx, bb_input):
                    collected.append(change)

        elif bb_input.gitiles_commit.id:
            with self.m.step.nest('process gitiles commit'):
                commit: common_pb2.GitilesCommit = bb_input.gitiles_commit
                assert commit.host
                if commit.project:
                    remote = f'https://{commit.host}/{commit.project}'

                review_host = commit.host.replace(
                    '.googlesource.com', '-review.googlesource.com'
                )
                gerrit_name = commit.host.split('.')[0]

                details = self._number_details(review_host, commit.id)

                if details:
                    branch = details['branch']
                    # A ref from a matching trigger overrides the branch
                    # reported by Gerrit.
                    repo_triggers = triggers_by_repo[remote]
                    if commit.id in repo_triggers:
                        trigger_ref = repo_triggers[commit.id].gitiles.ref
                        branch = trigger_ref.removeprefix('refs/heads/')

                    collected.append(
                        Change(
                            number=details['_number'],
                            remote=remote,
                            ref=commit.id,
                            rebase=False,
                            branch=branch,
                            gerrit_name=gerrit_name,
                            submitted=True,
                            project=details['project'],
                            current_revision=commit.id,
                        )
                    )

        if not collected:
            # If not triggered by a gitiles_poller gitiles_commit may be
            # empty. In that case treat the most recent commit on the
            # remote as the triggering commit. This is a good assumption
            # except for Android Repo Tool projects, unless all projects
            # are pinned to commits instead of tracking branches. However,
            # even if this is wrong it's close enough to have utility.
            head = self.m.git.get_remote_branch_head(remote, branch)
            netloc = urllib.parse.urlparse(remote).netloc
            gerrit_name = netloc.split('.')[0]
            review_host = f'{gerrit_name}-review.googlesource.com'
            details = self._number_details(review_host, head)

            collected.append(
                Change(
                    number=details['_number'] if details else 0,
                    remote=remote,
                    ref=head,
                    rebase=False,
                    branch=details['branch'] if details else branch,
                    gerrit_name=gerrit_name,
                    project=None,
                    submitted=True,
                )
            )

        # Emit one step per change so each is visible in the build output.
        with self.m.step.nest('changes'):
            for change in collected:
                with self.m.step.nest(change.name) as change_data_pres:
                    change_data_pres.step_summary_text = repr(change)

        return tuple(collected)
def _matching_branches(
    self,
    repo: str,
    branches: Sequence[str],
    name: str = 'has branch',
    **kwargs,
):
    """Returns the subset of the given branches that exist on gitiles.

    Args:
        repo: Remote to query.
        branches: Candidate branch names.
        name: Name of the enclosing step.
        **kwargs: Passed through to ``git.get_remote_branch_head``.

    Returns:
        A sorted list of the branches for which a head was found.
    """
    found: set[str] = set()

    def empty_test_data():
        return self.m.raw_io.test_api.stream_output_text('')

    with self.m.step.nest(name), self.m.context(infra_steps=True):
        for candidate in branches:
            head: str = self.m.git.get_remote_branch_head(
                repo,
                candidate,
                step_name=f'git ls-remote {candidate}',
                step_test_data=empty_test_data,
                **kwargs,
            )

            with self.m.step.nest('head') as pres:
                pres.step_summary_text = repr(head)

            # A falsy head means the branch was not found on the remote.
            if not head:
                continue
            found.add(candidate)

    return sorted(found)
def _apply_change(
    self,
    ctx: CheckoutContext,
    change: Change,
    cwd: config_types.Path = None,
):
    """Applies the given change to the given directory.

    Enters and immediately leaves ``_apply_change_context``, for callers
    that have nothing to run inside the apply step.

    Args:
        ctx: Checkout context object.
        change: Change to apply.
        cwd: Working directory, defaults to current directory.
    """
    applying = self._apply_change_context(ctx=ctx, change=change, cwd=cwd)
    with applying:
        pass
@contextlib.contextmanager
def _apply_change_context(
    self,
    ctx: CheckoutContext,
    change: Change,
    cwd: config_types.Path = None,
):
    """Applies the given change to the given directory.

    Context manager: the change is fetched, checked out and (depending on
    its flags) rebased before control is handed to the ``with`` body, which
    runs nested inside the 'apply ...' step.

    Side effects on ``change``: sets ``applied``, ``path``, ``base`` and
    ``base_type``.

    Args:
        ctx: Checkout context object.
        change: Change to apply.
        cwd: Working directory, defaults to current directory.
    """
    context_kwargs: dict[str, Any] = {'cwd': cwd} if cwd else {}
    change.applied = True
    change.path = self.m.path.relpath(cwd or ctx.root, ctx.root)

    step_name: str = f'apply {change.name}'
    with self.m.context(**context_kwargs), self.m.step.nest(
        step_name
    ) as pres:
        pres.links['gerrit'] = change.gerrit_url
        pres.links['gitiles'] = change.gitiles_url

        if cwd:
            # Same value as the relative path computed above.
            pres.step_summary_text = str(change.path)

        with self.m.context(infra_steps=True):
            # 'git fetch' fails if a submodule pin in the patch isn't
            # present in the remote (for example, if the pin is only
            # present in the uploader's workspace). Use
            # '--no-recurse-submodules' here so 'git fetch' doesn't fail
            # but instead 'git rebase' or 'git submodule update' fails
            # later (important because those are not infra steps). Also
            # don't use '--recurse-submodules' in 'git checkout' for
            # similar reasons.
            with self.m.default_timeout():
                self.m.git.fetch(
                    change.remote,
                    change.ref,
                    recurse_submodules=False,
                    step_name='git fetch patch',
                )

            self.m.git(
                'git checkout patch',
                'checkout',
                '--force',
                '-b',
                'working',
                'FETCH_HEAD',
            )

        with self.m.context(infra_steps=True):
            # Change "https://foo.googlesource.com/bar"
            # to "https___foo_googlesource_com_bar".
            # In Android Repo Tool projects, the remote for the manifest
            # is often configured in a way that seems incorrect. Instead
            # of relying on it, create a whole new remote every time
            # that is always correct.
            remote_name: str = re.sub(r'[^\w]', '_', change.remote)
            remote_branch: str = '/'.join((remote_name, change.branch))
            self.m.git(
                'git remote add',
                'remote',
                'add',
                remote_name,
                change.remote,
            )

            with self.m.default_timeout():
                self.m.git.fetch(
                    remote_name,
                    f'refs/heads/{change.branch}',
                    prune=False,
                    step_name='git fetch branch',
                )

            self.m.git(
                'git set upstream',
                'branch',
                f'--set-upstream-to={remote_branch}',
            )

        if not change.submitted:
            with self.m.context(infra_steps=True):
                self.m.git(
                    'pre-rebase log', 'log', '--oneline', '-n', '10'
                )

        # Record what the change is based on. Only the middle case rebases.
        if change.submitted:
            change.base = self.m.git.rev_parse(
                'HEAD',
                step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                    'HEAD_' * 8,
                ),
            )
            change.base_type = 'submitted_commit_hash'

        elif change.rebase:
            self.m.git('git rebase', 'rebase', remote_branch)

            change.base = self.m.git.rev_parse(
                remote_branch,
                step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                    'REMOTE_BRANCH_' * 3,
                ),
            )
            change.base_type = 'remote_branch_tip'

        else:
            merge_base_step = self.m.git(
                'merge-base',
                'merge-base',
                'HEAD',
                remote_branch,
                stdout=self.m.raw_io.output_text(),
                step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                    'MERGEBASE_' * 4,
                ),
            )
            change.base = merge_base_step.stdout
            change.base_type = 'merge-base'

        with self.m.context(infra_steps=True):
            self.m.git(
                'post-rebase log', 'log', '--oneline', '-n', '10'
            )

        # In most cases this is redundant or unnecessary, but it
        # shouldn't cause problems. It's necessary when a superproject
        # CL is updating a submodule pin and we need to sync the
        # submodule to the new revision.
        with self.m.default_timeout():
            # See b/243673776 for why we detach before updating
            # submodules.
            self.m.git('detach', 'checkout', '--detach')
            self.m.git.submodule_update(
                recursive=True,
                timeout=ctx.options.submodule_timeout_sec,
            )
            self.m.git('reattach', 'checkout', '-')

        # Hand control to the caller while still inside the apply step.
        yield
def _check_unapplied_changes(self, changes: Sequence[Change]):
    """Report which triggering changes were applied to the checkout.

    Args:
        changes: Triggering changes, with ``applied`` already set.

    Returns:
        A StatusOfChanges, or None if ``changes`` is empty.

    Raises:
        InfraFailure: If none of the changes were applied.
    """
    if not changes:  # pragma: no cover
        return None

    applied: list[Change] = []
    failed_to_apply: list[Change] = []

    def handle_unapplied_change(change):
        # Emit a warning step linking to the change and record it.
        with self.m.step.nest(f'failed to apply {change.name}') as pres:
            pres.status = 'WARNING'
            pres.links['gerrit'] = change.gerrit_url
            pres.links['gitiles'] = change.gitiles_url
            failed_to_apply.append(change)

    with self.m.context(infra_steps=True):
        if not any(x.applied for x in changes):
            with self.m.step.nest('no changes were applied') as pres:
                pres.status = 'FAILURE'
                for change in changes:
                    handle_unapplied_change(change)
                pres.properties['changes'] = [x.name for x in changes]

            raise self.m.step.InfraFailure(
                'could not find triggering changes in checkout'
            )

        elif not all(x.applied for x in changes):
            with self.m.step.nest('some changes were not applied') as pres:
                pres.status = 'WARNING'
                for change in changes:
                    if not change.applied:
                        handle_unapplied_change(change)
                    else:
                        applied.append(change)

        else:
            applied.extend(changes)

    with self.m.step.nest('status') as pres:
        pres.step_summary_text = (
            f'applied {applied}\nnot applied {failed_to_apply}'
        )

    return StatusOfChanges(
        applied=tuple(applied),
        not_applied=tuple(failed_to_apply),
    )
def _cached_checkout(
    self,
    remote: str,
    path: config_types.Path,
    ref: str,
    submodules: bool,
    included_submodules: Sequence[str] | None = None,
    excluded_submodules: Sequence[str] | None = None,
    submodule_timeout_sec: int = 10 * 60,
    cache: bool = True,
    use_packfiles: bool = True,
    **kwargs,
):
    """Check out ``remote`` at ``ref`` into ``path``, optionally via a cache.

    With ``cache`` set, a per-remote directory under the 'git' cache is
    updated and copied to ``path`` first. ``git_checkout`` then always runs
    on ``path``.

    Args:
        remote: URL of the repository.
        path: Destination directory.
        ref: Revision passed to ``git_checkout``.
        submodules: Whether to initialize submodules.
        included_submodules: Only these submodule paths are updated.
        excluded_submodules: Submodule paths to leave out. Only examined
            when ``cache`` is set.
        submodule_timeout_sec: Timeout for submodule updates.
        cache: Whether to use the git cache.
        use_packfiles: Whether to set 'fetch.uriprotocols' in the cache and
            pass the flag on to ``git_checkout``.
        **kwargs: Passed through to the git calls made in the cache.

    Raises:
        InfraFailure: If both included and excluded submodules are given,
            or an excluded path is not a submodule (cached path only).
    """
    included_submodules = included_submodules or []
    submodule_paths = included_submodules

    if cache:
        with self.m.step.nest('cache') as pres, self.m.cache.guard('git'):
            parsed_remote = urllib.parse.urlparse(remote)
            # Escape '-' as '--' so that turning '/' into '-' stays
            # unambiguous.
            escaped_path = parsed_remote.path.replace('-', '--')
            cache_name = parsed_remote.hostname + escaped_path.replace(
                '/', '-'
            )
            cache_path = self.m.path.cache_dir / 'git' / cache_name
            self.m.file.ensure_directory('makedirs', cache_path)

            with self.m.context(cwd=cache_path):
                dotgit = cache_path / '.git'
                if self.m.path.exists(dotgit):  # pragma: no cover
                    self.m.git.config_remove_section(
                        'remote.origin', **kwargs
                    )
                    pres.step_summary_text = 'hit'
                else:
                    self.m.git.init(bare=False, **kwargs)
                    pres.step_summary_text = 'miss'

                self.m.git.config(
                    'remote.origin.url',
                    remote,
                    step_name='remote set-url',
                    **kwargs,
                )

                if use_packfiles:
                    self.m.git.config(
                        'fetch.uriprotocols',
                        'https',
                        step_name='set fetch.uriprotocols',
                        **kwargs,
                    )

                with self.m.default_timeout():
                    try:
                        self.m.git.fetch(
                            repository='origin',
                            prune=True,
                            tags=True,
                            recurse_submodules=submodules,
                            **kwargs,
                        )

                    # If the checkout failed save the git config. It might
                    # not be helpful, but it shouldn't hurt.
                    except self.m.step.StepFailure:
                        with self.m.step.nest('git config'):
                            self.m.git.config(
                                '--list', '--local', step_name='local'
                            )
                            self.m.git.config(
                                '--list', '--global', step_name='global'
                            )
                        raise

                self.m.git.merge(ref='FETCH_HEAD', **kwargs)

                if included_submodules and excluded_submodules:
                    raise self.m.step.InfraFailure(
                        'cannot specify both included_submodules and '
                        'excluded_submodules'
                    )

                if excluded_submodules:
                    status_lines = self.m.git(
                        'submodule status',
                        'submodule',
                        'status',
                        stdout=self.m.raw_io.output_text(),
                        step_test_data=lambda: self.test_api.m.raw_io.stream_output_text(
                            '-0000000000000000000000000000000000000000 pigweed (abc123)\n'
                            '-1111111111111111111111111111111111111111 nanopb (heads/branch)\n'
                        ),
                    ).stdout.splitlines()

                    # The second whitespace-separated field is the path.
                    submodule_paths = [
                        line.split()[1] for line in status_lines
                    ]

                    for excluded in excluded_submodules:
                        if excluded not in submodule_paths:
                            raise self.m.step.InfraFailure(
                                f'excluded submodule {excluded} is not a '
                                'submodule'
                            )
                        with self.m.step.nest(
                            f'excluding submodule {excluded}'
                        ):
                            pass
                        submodule_paths.remove(excluded)

                # Empty steps that make the selected submodules visible.
                for included in submodule_paths:
                    with self.m.step.nest(f'including submodule {included}'):
                        pass

                if submodules or submodule_paths:
                    self.m.git.submodule_sync(recursive=True, **kwargs)
                    with self.m.default_timeout():
                        self.m.git.submodule_update(
                            recursive=True,
                            force=True,
                            paths=submodule_paths,
                            timeout=submodule_timeout_sec,
                            **kwargs,
                        )

                if not submodules:
                    # Even though submodules weren't requested, if the cache
                    # had any active submodules we need to update them.
                    # Otherwise we'll get weird situations in rolls where an
                    # uninvolved submodule will be rolled back.
                    with self.m.default_timeout():
                        self.m.git.submodule_update(
                            recursive=True,
                            force=True,
                            init=False,
                            timeout=submodule_timeout_sec,
                            **kwargs,
                        )

            self.m.file.copytree(
                'copy from cache', cache_path, path, symlinks=True
            )

    # Deliberately not combining contexts into one line so it's obvious to
    # both devs and Python which one is "outer" and which is "inner".
    with self.m.step.nest('git checkout'):
        with self.m.default_timeout():
            self.m.git_checkout(
                repo=remote,
                path=path,
                cache=False,
                revision=ref,
                recursive=submodules,
                submodules=submodules,
                submodule_force=submodules,
                submodule_paths=submodule_paths,
                submodule_timeout=submodule_timeout_sec,
                step_name="",
                use_packfiles=use_packfiles,
            )
def _git(self, ctx: CheckoutContext):
    """Checkout code from git.

    Checks out the superproject, then (when triggers are in use) applies
    each triggering change to the top-level repository or to the one
    submodule it matches, and writes the applied changes to a JSON file
    recorded in ``ctx.changes_json``.

    Args:
        ctx: Checkout context object.

    Raises:
        StepFailure: If a change matches several submodules by remote and
            the branch does not single out exactly one of them.
    """
    options = ctx.options
    checkout_ref = self._matching_branch(ctx) or options.branch

    with self.m.context(infra_steps=True):
        self._cached_checkout(
            options.remote,
            path=ctx.root,
            ref=checkout_ref,
            cache=not options.do_not_cache,
            submodules=options.initialize_submodules,
            submodule_timeout_sec=options.submodule_timeout_sec,
            included_submodules=options.included_submodules,
            excluded_submodules=options.excluded_submodules,
            use_packfiles=not options.do_not_use_packfiles,
        )

    with self.m.context(cwd=ctx.root):
        got_revision = None
        got_revision_type = 'no_trigger'

        submodules = []

        if ctx.options.use_trigger:
            got_revision = self.m.git.rev_parse(
                'HEAD',
                step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                    'HEAD' * 10,
                ),
            )

            # Check for CLs for the top-level repository.
            for change in ctx.changes:
                if not ctx.remotes_equivalent(
                    ctx.options.remote, change.remote
                ):
                    continue
                self._apply_change(ctx, change)
                got_revision = change.base
                got_revision_type = change.base_type

            submodules = ctx.submodules(recursive=True)

            # Check for CLs for submodules.
            # There are three rough cases:
            # 1. Zero submodules have matching remotes. In this case we do
            #    nothing.
            # 2. Exactly one submodule has a matching remote. In this case,
            #    the change is applied to that submodule, even if the branch
            #    does not match.
            # 3. Multiple submodules have matching remotes.
            #    1. Exactly one of them matches the triggering change's
            #       branch. In this case, the change is applied to this
            #       submodule.
            #    2. Zero or multiple submodules match both the remote and
            #       the branch of the triggering change. In these cases we
            #       error out.
            for change in ctx.changes:
                with self.m.step.nest(f'matching {change.name}') as pres:
                    pres.links['gerrit'] = change.gerrit_url
                    pres.links['gitiles'] = change.gitiles_url

                    # Only initialized submodules are considered. Each one
                    # gets an empty step recording the outcome.
                    candidates = []
                    for submodule in submodules:
                        if not submodule.initialized:
                            continue
                        if ctx.remotes_equivalent(
                            submodule.remote, change.remote
                        ):
                            self.m.step.empty(
                                f'match: {submodule.path} '
                                f'({submodule.remote})'
                            )
                            candidates.append(submodule)
                        else:
                            self.m.step.empty(
                                f'no match: {submodule.path} '
                                f'({submodule.remote})'
                            )

                    if not candidates:
                        pres.step_summary_text = 'no matching submodules'
                        continue

                    if len(candidates) > 1:
                        submodule_info = ', '.join(
                            f'{self.m.path.relpath(sub.path, ctx.root)} '
                            f'(branch {sub.branch})'
                            for sub in candidates
                        )

                        # Use the branch to break the tie.
                        on_branch = [
                            sub
                            for sub in candidates
                            if sub.branch == change.branch
                        ]

                        if not on_branch:
                            pres.step_summary_text = (
                                'zero submodules match the branch'
                            )
                            raise self.m.step.StepFailure(
                                f'change {change.name} '
                                f'(branch {change.branch}) matches '
                                f'multiple submodules ({submodule_info}) '
                                'but no branches match'
                            )

                        if len(on_branch) > 1:
                            pres.step_summary_text = (
                                'too many submodules match the branch'
                            )
                            raise self.m.step.StepFailure(
                                f'change {change.name} (branch '
                                f'{change.branch}) matches multiple '
                                f'submodules ({submodule_info}), but too '
                                'many branches match'
                            )

                        pres.step_summary_text = 'one matching submodule'
                        candidates = on_branch

                    if len(candidates) == 1:
                        target = candidates[0]
                        if not ctx.options.initialize_submodules:
                            with self.m.default_timeout():
                                self.m.git.submodule_update(
                                    paths=(target.path,)
                                )
                        self._apply_change(ctx, change, cwd=target.path)

            # NOTE(review): the nesting of this section could not be
            # recovered from the flattened source. It is kept inside the
            # use_trigger branch, where ctx.changes is iterated above.
            primitive_types = (int, str, bool, type(None))

            def _vars_primitive_only(x):
                # Keep only values of simple types (int, str, bool, None).
                return {
                    key: value
                    for key, value in to_dict(x).items()
                    if isinstance(value, primitive_types)
                }

            applied_changes = [
                _vars_primitive_only(change)
                for change in ctx.changes
                if change.applied
            ]
            ctx.changes_json = self.m.path.mkstemp()
            self.m.file.write_json(
                'write changes.json',
                ctx.changes_json,
                applied_changes,
            )

        # Run git log for both the top-level checkout and every submodule.
        with self.m.step.nest('git log'):
            self.m.git(str(ctx.root), 'log', '--oneline', '-n', '10')
            for submodule in sorted(submodules):
                with self.m.context(cwd=submodule.path):
                    self.m.git(
                        str(submodule.path),
                        'log',
                        '--oneline',
                        '-n',
                        '10',
                    )

        if got_revision:
            with self.m.step.nest('base') as pres:
                pres.properties['got_revision'] = got_revision
                # got_revision_type isn't needed by anything but helps
                # explain why got_revision is the value it is.
                pres.properties['got_revision_type'] = got_revision_type
def _matching_branch(self, ctx: CheckoutContext):
    """Find the manifest or superproject branch named after a triggering CL.

    If a triggering change is on a branch other than 'master'/'main' and
    the manifest or superproject remote has a branch with the same name,
    that branch is the one to use when checking out the project.

    Args:
        ctx (CheckoutContext): Context object.

    Raises:
        StepFailure if more than one branch name matches.

    Returns:
        The single matching branch name, or None when branch matching is
        disabled, no change is on a non-standard branch, or no branch on
        the remote matches.
    """
    options = ctx.options
    if not (options.match_branch and options.use_trigger):
        with self.m.step.nest('not matching branch names'):
            return

    kind = 'superproject'
    if options.use_repo:
        kind = 'manifest'

    # Unique non-standard branch names of the triggering changes, sorted
    # to get a deterministic order.
    standard_names = ('master', 'main', None)
    branch_names = sorted(
        {
            change.branch
            for change in ctx.changes
            if change.branch not in standard_names
        }
    )
    if not branch_names:
        with self.m.step.nest('no non-standard branch names'):
            return

    with self.m.step.nest('branch names') as pres:
        pres.step_summary_text = str(branch_names)

    matching_branches = self._matching_branches(
        options.remote, branch_names, name=f'{kind} has branch'
    )
    if not matching_branches:
        with self.m.step.nest('no branch names match'):
            return

    if len(matching_branches) > 1:
        summary_template = (
            "Can't figure out which {} branch to use. Remove some "
            '"Requires:" lines to simplify the checkout.'
        )
        with self.m.step.nest(
            f"too many matching branches ({', '.join(matching_branches)})"
        ) as pres:
            pres.step_summary_text = summary_template.format(kind)
            raise self.m.step.StepFailure('multiple matching branches')

    # Exactly one candidate is left at this point.
    manifest_branch = matching_branches.pop()
    self.m.step(f'changing {kind} branch to {manifest_branch}', None)
    return manifest_branch
def _repo(self, ctx: CheckoutContext):
    """Checkout code from an Android Repo Tool manifest.

    Runs `repo init`, reads the manifest into `ctx.manifest`, runs
    `repo sync` and `repo start base`. When `use_trigger` is set,
    `_apply_change_context` is entered for changes whose remote matches the
    manifest remote (before the sync) and for changes whose remote matches
    a manifest project (after the sync).

    Args:
        ctx (CheckoutContext): Context object; `manifest`, `top` and
            possibly `root` are updated.
    """
    options = ctx.options

    # Git makes the top-level folder, Repo requires caller to make it.
    self.m.file.ensure_directory('mkdir checkout', ctx.root)

    with self.m.context(cwd=ctx.root):
        manifest_branch = self._matching_branch(ctx) or options.branch

        with self.m.context(infra_steps=True):
            init_kwargs = {'attempts': options.number_of_attempts}
            if options.repo_init_timeout_sec:
                init_kwargs['timeout'] = options.repo_init_timeout_sec
            if options.manifest_groups:
                init_kwargs['groups'] = options.manifest_groups
            self.m.repo.init(
                manifest_url=options.remote,
                manifest_branch=manifest_branch,
                manifest_name=options.manifest_file,
                clone_bundle=not options.repo_no_clone_bundle,
                **init_kwargs,
            )

        manifests_dir = ctx.root / '.repo' / 'manifests'

        # If the triggering CL is a manifest change, apply it before running
        # sync.
        if options.use_trigger:
            for change in ctx.changes:
                if not change.remote:
                    continue
                if not ctx.remotes_equivalent(options.remote, change.remote):
                    continue
                with self._apply_change_context(
                    ctx, change, cwd=manifests_dir
                ):
                    # Right now the upstream of 'working' is the local
                    # 'default' branch. 'repo sync' complains if the
                    # upstream isn't remote, so it's changed to the
                    # remote branch that's identical to 'default'.
                    self.m.git(
                        'git branch',
                        'branch',
                        f'--set-upstream-to=origin/{manifest_branch}',
                    )

        ctx.manifest = self._read_manifest(
            options.remote,
            manifests_dir / options.manifest_file,
        )

        # Walk the manifest remotes in sorted order and set up an insteadof
        # rule for each sso:// fetch URL.
        for _, remote_host in sorted(ctx.manifest.remotes.items()):
            fetch_url = remote_host.fetch.url
            if fetch_url.startswith('sso://'):
                self.m.sso.configure_insteadof(fetch_url)

        with self.m.context(infra_steps=True):
            sync_kwargs = {'attempts': options.number_of_attempts}
            if options.repo_sync_timeout_sec:
                sync_kwargs['timeout'] = options.repo_sync_timeout_sec
            self.m.repo.sync(
                force_sync=True, current_branch=True, jobs=2, **sync_kwargs
            )
            self.m.repo.start('base')

        if options.use_trigger:
            for change in ctx.changes:
                for entry in ctx.manifest.projects:
                    if not ctx.remotes_equivalent(entry.url, change.remote):
                        continue
                    with self._apply_change_context(
                        ctx,
                        change,
                        cwd=entry.path_object(ctx.root),
                    ):
                        with self.m.step.nest('compare branch name') as pres:
                            pres.step_summary_text = (
                                'CL branch: {}\nupstream branch: {}'
                            ).format(change.branch, entry.upstream)

    # Some dependent projects have everything inside one top-level folder
    # in their repo workspace. For those projects pretend that top-level
    # folder is actually the checkout root. The top member will always
    # point to the actual repo workspace root.
    ctx.top = ctx.root
    entries = set(self.m.file.listdir('ls', ctx.root))
    entries.discard(ctx.root / '.repo')
    if len(entries) == 1:
        ctx.root = entries.pop()
def _workspace(self, ctx: CheckoutContext):
    """Check out Bazel workspace repositories for still-unapplied changes.

    For every change in `ctx.changes` that has not been applied, each
    eligible workspace file is queried (through
    `bazel.retrieve_git_repository_attributes`) for git repository entries
    matching the change's remote. A matching remote is checked out under
    `start_dir / 'bazel_repos'`, the change is applied there, and an entry
    is recorded in `ctx.bazel_overrides` for every matching repository.

    Does nothing when there are no eligible workspace paths or when every
    change has already been applied.

    Args:
        ctx (CheckoutContext): Context object; `bazel_overrides` is
            updated in place.

    Raises:
        AssertionError if the matching entries of one workspace file do
            not all share the same remote URL.
    """
    if not ctx.options.eligible_workspace_paths or all(
        x.applied for x in ctx.changes
    ):
        return

    repos_dir = self.m.path.start_dir / 'bazel_repos'
    self.m.file.ensure_directory(f'mkdir {repos_dir}', repos_dir)

    with self.m.step.nest('workspace'):
        for change in ctx.changes:
            if change.applied:
                continue  # pragma: no cover

            with self.m.step.nest(change.name) as pres:
                pres.links['gerrit'] = change.gerrit_url
                pres.links['gitiles'] = change.gitiles_url

                for workspace in ctx.options.eligible_workspace_paths:
                    workspace_path = ctx.root / workspace
                    # NOTE(review): mock_add_file looks like a test-only
                    # helper that is presumably a no-op outside tests --
                    # confirm.
                    self.m.path.mock_add_file(workspace_path)
                    if not self.m.path.isfile(workspace_path):
                        continue  # pragma: no cover

                    with self.m.step.nest(workspace):
                        repos = (
                            self.m.bazel.retrieve_git_repository_attributes(
                                checkout=ctx,
                                project_remote=change.remote,
                                path=workspace_path,
                            )
                        )
                        if not repos:
                            continue

                        # Things will be much simpler if we assume all
                        # entries with a url matching the change have
                        # identical remote URLs.
                        remotes = list({x['remote'] for x in repos})
                        assert len(remotes) == 1
                        remote = remotes[0]

                        # Derive a flat directory name from the remote URL
                        # by dropping the scheme and host suffixes.
                        name = remote
                        for fragment in (
                            'http://',
                            'https://',
                            'sso://',
                            '.git.corp.google.com',
                            '.googlesource.com',
                        ):
                            name = name.replace(fragment, '')
                        name = name.replace('/', '_')

                        checkout_path = repos_dir / name
                        self.m.git_checkout(
                            remote,
                            path=checkout_path,
                            step_name=f'checkout {name}',
                            ignore_build_input=True,
                            cache=False,
                        )
                        self._apply_change(ctx, change, checkout_path)

                        # Each override is computed from the unmodified
                        # checkout path so one repository's strip_prefix
                        # never leaks into the path of the next one.
                        for repo in repos:
                            override_path = checkout_path
                            if 'strip_prefix' in repo:
                                override_path = (
                                    checkout_path / repo['strip_prefix']
                                )
                            ctx.bazel_overrides[repo['name']] = override_path
def _configure_insteadof(self, ctx: CheckoutContext):
    """Add a global git `url.<final>.insteadof` entry per configured rewrite.

    Returns immediately when the options contain no rewrites. Otherwise,
    inside an 'insteadof' step, one `git config --global --add` step runs
    per rewrite, followed by a step listing every config entry matching
    `^url.*`.

    Args:
        ctx (CheckoutContext): Context object.
    """
    rewrites = ctx.options.rewrites
    if not rewrites:
        return

    with self.m.step.nest('insteadof'):
        for rewrite in rewrites:
            step_name = f"{rewrite.original} to {rewrite.final}"
            config_key = f"url.{rewrite.final}.insteadof"
            self.m.git(
                step_name,
                "config",
                "--global",
                "--add",
                config_key,
                rewrite.original,
            )

        # Show the resulting url.* entries.
        self.m.git("rewrites", "config", "--get-regexp", "^url.*")
def _name(self, options: Options):
    """Turn "https://foo/bar/baz.git" into "checkout baz".

    Trailing slashes and a trailing ".git" are removed from the remote
    before its last path component is taken. For repo-tool checkouts a
    final "manifest" component is skipped in favor of the one before it.

    Args:
        options: Checkout options; `remote` and `use_repo` are read.

    Returns:
        The step name, "checkout " followed by the chosen component.
    """
    name = options.remote.rstrip('/').removesuffix('.git')
    parts = name.split('/')
    # Only drop "manifest" when something precedes it; otherwise a remote
    # that is just "manifest" would leave no component to name the step.
    if options.use_repo and len(parts) > 1 and parts[-1] == 'manifest':
        parts.pop()
    return f'checkout {parts[-1]}'
def __call__(
    self,
    options: Options,
    root: config_types.Path | None = None,
    name: str = None,
):
    """Checkout code.

    Args:
        options: Checkout options. Falsy `manifest_file`, timeout and
            `number_of_attempts` fields are replaced with defaults on this
            object, and a trailing ".git" is removed from `remote`.
        root: Directory to check out into. Defaults to
            `start_dir / 'co'`.
        name: Name of the enclosing step. Defaults to the value derived
            from the remote by `_name`.

    Returns:
        The CheckoutContext describing the finished checkout.
    """
    checkout_name = name or self._name(options)
    assert options.remote

    # Record the options both as passed in and after defaults are applied.
    initial_options = repr(options)
    defaults = (
        ('manifest_file', 'default.xml'),
        ('repo_init_timeout_sec', 20),
        ('repo_sync_timeout_sec', 2 * 60),
        ('number_of_attempts', 3),
        ('submodule_timeout_sec', 10 * 60),
    )
    for field, fallback in defaults:
        setattr(options, field, getattr(options, field) or fallback)
    final_options = repr(options)

    ctx = CheckoutContext(api=self.m)
    ctx.options = options
    ctx.changes = []
    ctx.root = root or self.m.path.start_dir / 'co'

    # Every remote of a group maps to the same list holding the https form
    # of all remotes in that group.
    for group in options.equivalent_remotes:
        https_remotes = [self.m.sso.sso_to_https(x) for x in group.remotes]
        for remote in https_remotes:
            assert remote not in ctx.equivalent_remotes
            ctx.equivalent_remotes[remote] = https_remotes

    with self.m.step.nest(checkout_name) as pres:
        with self.m.step.nest('options') as options_pres:
            options_pres.step_summary_text = initial_options

        with self.m.step.nest('options with defaults') as options_pres:
            options_pres.step_summary_text = final_options

        if options.remote.endswith('.git'):
            options.remote = options.remote.removesuffix('.git')

        if options.use_trigger:
            ctx.changes = self._change_data(
                ctx, options.remote, options.branch
            )

        self._configure_insteadof(ctx)

        checkout = self._repo if options.use_repo else self._git
        checkout(ctx)

        self._workspace(ctx)

        if not options.use_repo:
            with self.m.context(cwd=ctx.root):
                self.m.git.clean(force=2, recursive=True)
                # The status step is best-effort: a failure is ignored.
                with contextlib.suppress(self.m.step.StepFailure):
                    self.m.git.status()

        ctx.status = self._check_unapplied_changes(ctx.changes)
        if ctx.status:
            for change in ctx.status.applied:
                pres.links[f'applied {change.name_with_path}'] = (
                    change.gerrit_url
                )
            for change in ctx.status.not_applied:
                pres.links[f'failed to apply {change.name}'] = (
                    change.gerrit_url
                )

        snapshot_dir = self.m.path.start_dir / 'snapshot'
        ctx.snapshot_to_dir(snapshot_dir)

    # NOTE(review): this overwrites the `top` value that `_repo` assigns
    # before it narrows `root` to a single top-level folder -- confirm
    # that is intended.
    ctx.top = ctx.root
    if ctx.options.root_subdirectory:
        ctx.root = ctx.root / ctx.options.root_subdirectory

    return ctx
def get_revision(
    self,
    root: config_types.Path,
    name: str = 'git log',
    test_data: str = 'HASH',
):
    """Like self.revision, but works for secondary checkouts.

    Runs `git log --max-count=1 --pretty=format:%H` with `root` as the
    working directory.

    Args:
        root: Directory of the checkout to inspect.
        name: Name of the git step.
        test_data: Text used as the step's stdout in tests.

    Returns:
        The first line of the step's stripped stdout; it is also set as
        the step's summary text.
    """

    def _test_stdout():
        return self.test_api.m.raw_io.stream_output_text(test_data)

    with self.m.context(cwd=root):
        step = self.m.git(
            name,
            'log',
            '--max-count=1',
            '--pretty=format:%H',
            stdout=self.m.raw_io.output_text(),
            step_test_data=_test_stdout,
        )

        # Sometimes test data has additional information, following a
        # newline. Only keep the part before the first newline.
        revision, _, _ = step.stdout.strip().partition('\n')
        step.presentation.step_summary_text = revision
        return revision