# blob: f30fa70417aa740db11822aaabb763a815249dda [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Calls to checkout code.

Usage:
    options = Options(remote='https://pigweed.googlesource.com/pigweed/pigweed')
    checkout_ctx = api.checkout(options)
"""
import collections
import re
import urllib
import xml.etree.ElementTree
import attr
from PB.recipe_modules.pigweed.checkout.options import Options
from recipe_engine import config_types, recipe_api
# URL of the Pigweed repository, exposed as a module-level constant.
PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed'
@attr.s
class Manifest:
    """Parsed contents of a repo manifest: its remotes and its projects."""

    # Maps remote name to its Remote entry.
    remotes = attr.ib(default=attr.Factory(dict))
    # Project entries, in the order they were added.
    projects = attr.ib(default=attr.Factory(list))

    def dict(self):
        """Return a plain-dict form built from each member's own dict()."""
        serialized_remotes = {}
        for remote_name, remote in self.remotes.items():
            serialized_remotes[remote_name] = remote.dict()
        serialized_projects = [project.dict() for project in self.projects]
        return {
            'remotes': serialized_remotes,
            'projects': serialized_projects,
        }
class Url:
    """A URL as originally written, plus an optional https form of it."""

    def __init__(self, url, *args, **kwargs):
        # Pass any extra arguments up the MRO before setting our own fields.
        super().__init__(*args, **kwargs)
        self.url, self.https = url, None

    def dict(self):
        """Return a shallow copy of the instance attributes."""
        return dict(vars(self))
@attr.s
class Remote:
    """A remote entry read from a repo manifest."""

    name = attr.ib(type=str)
    fetch = attr.ib(type=Url)
    review = attr.ib(type=str, default=None)
    revision = attr.ib(type=str, default=None)
    alias = attr.ib(type=str, default=None)

    def dict(self):
        """Return the attributes as a dict, with 'fetch' expanded via dict()."""
        fields = dict(vars(self))
        fields['fetch'] = self.fetch.dict()
        return fields
@attr.s
class Project:
    """One repository/project entry and the values needed to check it out."""

    name = attr.ib(type=str)
    path = attr.ib(type=str)
    remote = attr.ib(type=str)
    revision = attr.ib(type=str)
    upstream = attr.ib(type=str)
    url = attr.ib(type=str, default=None)

    def path_object(self, root):
        """Return this project's path joined onto the given root path."""
        relative = self.path
        return root.join(relative)

    def dict(self):
        """Return a shallow copy of the instance attributes."""
        return dict(vars(self))
def _str_or_none(x):
    """Convert x to str, passing None through unchanged."""
    return None if x is None else str(x)
def _int_or_none(x):
    """Convert x to int, passing None through unchanged."""
    return None if x is None else int(x)
@attr.s
class Change:
    """Data about one triggering change, derived from buildbucket input.

    Fields declared with the _str_or_none/_int_or_none converters hold either
    the converted value or None.
    """

    number = attr.ib(converter=int)
    bb_input = attr.ib(repr=False)
    remote = attr.ib(converter=_str_or_none)
    ref = attr.ib(converter=_str_or_none)
    rebase = attr.ib(type=bool)
    # _str_or_none is a conversion function, not a type, so it must be passed
    # as 'converter=' (as for the other string fields) to take effect.
    project = attr.ib(converter=_str_or_none)
    branch = attr.ib(converter=_str_or_none)
    gerrit_name = attr.ib(converter=_str_or_none)
    submitted = attr.ib(type=bool)
    patchset = attr.ib(converter=_int_or_none, default=None)
    applied = attr.ib(type=bool, default=False, repr=False)
    base = attr.ib(converter=_str_or_none, default=None)
    base_type = attr.ib(converter=_str_or_none, default=None)
    is_merge = attr.ib(type=bool, default=False)
    commit_message = attr.ib(type=str, default='')

    @property
    def gerrit_host(self):
        """URL of the Gerrit review host built from gerrit_name."""
        return f'https://{self.gerrit_name}-review.googlesource.com'

    @property
    def gerrit_url(self):
        """Gerrit URL of the change; the gitiles URL if number is falsy."""
        if not self.number:
            return self.gitiles_url
        return f'{self.gerrit_host}/c/{self.number}'

    @property
    def gitiles_url(self):
        """URL formed from the remote and the ref."""
        return f'{self.remote}/+/{self.ref}'

    @property
    def name(self):
        """Short identifier of the form '<gerrit_name>:<number>'."""
        return f'{self.gerrit_name}:{self.number}'
@attr.s
class Submodule:
    """Submodule properties.

    Instances are built from keyword arguments (see
    CheckoutContext.submodules), one per field below.
    """

    api = attr.ib(type=recipe_api.RecipeApi, repr=False)
    hash = attr.ib(type=str)
    relative_path = attr.ib(type=str)
    path = attr.ib(type=config_types.Path)
    name = attr.ib(type=str)
    remote = attr.ib(type=str)
    initialized = attr.ib(type=bool)
    modified = attr.ib(type=bool)
    conflict = attr.ib(type=bool)
    branch = attr.ib(type=str)
    url = attr.ib(type=str)
    update = attr.ib(type=str)
    ignore = attr.ib(type=str)
    shallow = attr.ib(type=bool)
    fetchRecurseSubmodules = attr.ib(type=bool)
    # 'describe' used to be declared twice in this class. Only the later
    # declaration took effect, so that one is kept (in its original, last
    # position) and the earlier duplicate is dropped.
    describe = attr.ib(type=str)
@attr.s
class StatusOfChanges:
    """Outcome of trying to apply the triggering changes.

    Both fields hold a tuple of Change objects.
    """

    applied = attr.ib()
    not_applied = attr.ib()
# State of one checkout. CheckoutApi.__call__ creates an instance, fills it in
# while checking out, and returns it to the caller.
@attr.s
class CheckoutContext:
# Recipe module collection (CheckoutApi.__call__ passes its self.m). Left out
# of repr().
_api = attr.ib(repr=False)
# Options protobuf passed in to checkout module.
options = attr.ib(default=None)
# Triggering changes (Change objects); empty when the trigger is not used.
changes = attr.ib(default=None)
# Actual checkout root.
top = attr.ib(default=None)
# Logical checkout root. Usually identical to 'top', but occasionally a
# subdirectory instead.
root = attr.ib(default=None)
# Which triggering changes were applied or not applied (StatusOfChanges).
status = attr.ib(default=None)
# Remotes that should be treated identically: maps an https remote URL to the
# list of remotes in its equivalence group.
equivalent_remotes = attr.ib(default=attr.Factory(dict))
# Parsed repo manifest (Manifest); assigned by the repo checkout path.
manifest = attr.ib(default=None)
# Path to a JSON file containing metadata about the triggering changes;
# assigned by the git checkout path.
changes_json = attr.ib(default=None)
def revision(self):
    """Return the current revision of the checkout root.

    The value is obtained from the checkout module on the first call and
    stored on the instance for later calls.
    """
    if not hasattr(self, '_revision'):
        self._revision = self._api.checkout.get_revision(self.root)
    return self._revision
def applied_changes(self):
    """Return a list of the triggering changes whose 'applied' flag is set."""
    return list(filter(lambda change: change.applied, self.changes))
def manifest_snapshot(self):
    """Return the repo manifest with all projects pinned.

    Returns None when the checkout does not use repo. Otherwise the snapshot
    is taken once, from the 'top' directory, and stored on the instance.
    """
    if not self.options.use_repo:
        return None
    if not hasattr(self, '_manifest_snapshot'):
        with self._api.context(cwd=self.top):
            self._manifest_snapshot = self._api.repo.manifest_snapshot()
    return self._manifest_snapshot
def submodule_snapshot(self):
    """Return stripped 'git submodule status --recursive' output.

    Equivalent of manifest_snapshot() but not as strictly formatted. Returns
    None when the checkout uses repo. Otherwise the command is run once, in
    the checkout root, and the result is stored on the instance.
    """
    if self.options.use_repo:
        return None
    if not hasattr(self, '_submodule_snapshot'):
        with self._api.context(cwd=self.root):
            # Bound to a local so the step_test_data line passes pylint.
            stream_output_text = self._api.raw_io.test_api.stream_output_text
            step = self._api.git(
                'submodule-status',
                'submodule',
                'status',
                '--recursive',
                stdout=self._api.raw_io.output_text(),
                step_test_data=lambda: stream_output_text(
                    'submodule status filler text',
                ),
                ok_ret='any',
            )
            self._submodule_snapshot = step.stdout.strip() or ''
    return self._submodule_snapshot
def snapshot_to_dir(self, directory):
# Write files describing the state of the checkout into 'directory' (a path
# object supporting the '/' operator). The directory is created first.
self._api.file.ensure_directory('mkdir', directory)
# manifest.xml is only written when manifest_snapshot() returns something
# truthy (it returns None for non-repo checkouts).
if self.manifest_snapshot():
self._api.file.write_text(
'write manifest.xml',
directory / 'manifest.xml',
self.manifest_snapshot(),
)
# Likewise submodules.log is only written when submodule_snapshot() is
# truthy (it returns None for repo checkouts).
if self.submodule_snapshot():
self._api.file.write_text(
'write submodule snapshot',
directory / 'submodules.log',
self.submodule_snapshot(),
)
# git.log always gets the last 10 commits of the checkout root, one line
# each. ok_ret='any' means a failing 'git log' does not fail this step.
with self._api.context(cwd=self.root):
log = self._api.git(
'log',
'log',
'--oneline',
'-n',
'10',
stdout=self._api.raw_io.output_text(),
ok_ret='any',
).stdout
self._api.file.write_text(
'write git log', directory / 'git.log', log,
)
def submodules(self, recursive=False):
    """Return a list of Submodule objects, one per submodule of the root.

    Runs the module's submodule_status.py resource on the checkout root and
    reads its JSON output. Each entry's remote is converted with
    sso_to_https and loses a trailing '.git'; its original 'path' is kept as
    'relative_path' and 'path' becomes root / path.

    Args:
        recursive (bool): Pass '--recursive' to the status script.
    """
    command = [
        'python3',
        self._api.checkout.resource('submodule_status.py'),
        self.root,
        self._api.json.output(),
    ]
    if recursive:
        command.append('--recursive')

    status = self._api.step(
        'submodule status',
        command,
        step_test_data=lambda: self._api.json.test_api.output({}),
    ).json.output

    result = []
    for entry in status.values():
        remote = self._api.sso.sso_to_https(entry['remote'])
        if remote.endswith('.git'):
            remote = remote[:-4]
        entry['remote'] = remote
        entry['relative_path'] = entry['path']
        entry['path'] = self.root / entry['path']
        result.append(Submodule(self._api, **entry))
    return result
_REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$')


def gerrit_host(self):
    """Return the Gerrit review host for options.remote.

    Returns None when the remote is not of the form 'https://<host>/<path>'.
    A host that does not already contain '-review' gets '-review' inserted
    before its first dot.
    """
    parsed = self._REMOTE_REGEX.match(self.options.remote)
    if parsed is None:
        return None  # pragma: no cover
    host = parsed.group('host')
    if '-review' in host:
        return host
    # Turn "foo.googlesource.com" into "foo-review.googlesource.com".
    return host.replace('.', '-review.', 1)
def gerrit_project(self):
    """Return the path part of options.remote after 'https://<host>/'.

    Returns None when the remote does not have that form.
    """
    parsed = self._REMOTE_REGEX.match(self.options.remote)
    return parsed.group('project') if parsed else None
def remotes_equivalent(self, remote1, remote2):
    """Return whether two remotes refer to the same repository.

    Each truthy remote is first converted with sso_to_https. The remotes are
    equivalent when they are then equal, or when remote1 is listed in
    equivalent_remotes under remote2.
    """
    to_https = self._api.sso.sso_to_https
    # A remote is sometimes None; in that case it is left unconverted.
    first = to_https(remote1) if remote1 else remote1
    second = to_https(remote2) if remote2 else remote2
    if first == second:
        return True
    group = self.equivalent_remotes.get(second, ())
    return first in group
# Recipe module API. Calling the module (see __call__) checks out code with
# either git or repo and returns a CheckoutContext describing the result.
class CheckoutApi(recipe_api.RecipeApi):
"""Calls to checkout code."""
def _read_manifest(self, manifest_remote, manifest_file):
    """Read a repo manifest file and return it as a Manifest.

    Args:
        manifest_remote (str): URL of the manifest repository. Its scheme and
            host are used to resolve remotes whose fetch starts with '..'.
        manifest_file (Path): Manifest XML file to read.

    Returns:
        Manifest holding one Remote per 'remote' element and one Project per
        'project' element. A JSON form of it is also written to
        start_dir/manifest.json.

    Raises:
        AssertionError: A project has no remote or revision (neither its own
            nor a default), or names a remote that is not defined.
    """
    # 'import urllib' by itself does not load the 'parse' submodule, so
    # import it explicitly rather than relying on another module having
    # done so.
    import urllib.parse

    with self.m.step.nest('read manifest') as read_step:
        manifest_text = self.m.file.read_text('read file', manifest_file)
        read_step.logs['raw'] = manifest_text

        xml_tree = xml.etree.ElementTree.fromstring(manifest_text)

        manifest = Manifest()

        for remote in xml_tree.iter('remote'):
            remote = Remote(**remote.attrib)
            if remote.fetch.startswith('..'):
                # The fetch URL is relative to the manifest remote: keep the
                # manifest remote's scheme and host and append what follows
                # the leading '..'.
                rest = remote.fetch[2:]
                parsed = urllib.parse.urlparse(manifest_remote)
                remote.fetch = f'{parsed.scheme}://{parsed.netloc}' + rest
            remote.fetch = Url(remote.fetch)
            remote.fetch.https = self.m.sso.sso_to_https(remote.fetch.url)
            manifest.remotes[remote.name] = remote

        # Later 'default' elements override attributes of earlier ones.
        defaults = {}
        for default in xml_tree.iter('default'):
            defaults.update(default.attrib)

        for project in xml_tree.iter('project'):
            name = project.attrib['name']
            path = project.attrib.get('path', name)

            if 'remote' in project.attrib:
                remote = project.attrib['remote']
            elif 'remote' in defaults:
                remote = defaults['remote']
            else:  # pragma: no cover
                # Raised explicitly (rather than 'assert False') so the check
                # still fires when assertions are disabled.
                raise AssertionError(f'remote not specified for {name}')

            assert (
                remote in manifest.remotes
            ), f'Remote {remote} does not exist'

            # Precedence: the project itself, then its remote, then the
            # manifest-wide default.
            if 'revision' in project.attrib:
                revision = project.attrib['revision']
            elif manifest.remotes[remote].revision:
                revision = manifest.remotes[remote].revision
            elif 'revision' in defaults:
                revision = defaults['revision']
            else:  # pragma: no cover
                raise AssertionError(f'revision not specified for {name}')

            if 'upstream' in project.attrib:
                upstream = project.attrib['upstream']
            elif 'upstream' in defaults:  # pragma: no cover
                # This is unlikely to be used and hard to test--it requires
                # a completely separate manifest definition, otherwise the
                # 'else' condition won't be covered. It's also simple.
                upstream = defaults['upstream']
            else:
                upstream = revision

            # urllib.urljoin does something different than what's desired
            # here.
            url = '/'.join(
                (
                    manifest.remotes[remote].fetch.https.rstrip('/'),
                    name.lstrip('/'),
                )
            )
            manifest.projects.append(
                Project(
                    name=name,
                    path=path,
                    remote=remote,
                    revision=revision,
                    upstream=upstream,
                    url=url,
                )
            )

        self.m.file.write_json(
            'manifest json',
            self.m.path['start_dir'] / 'manifest.json',
            manifest.dict(),
        )

    return manifest
def _process_gerrit_change(self, ctx, bb_input, change):
    """Turn one LUCI GerritChange into a Change object.

    Looks up the change's details in Gerrit to obtain its branch, project,
    commit message and whether its current revision is a merge commit.
    """
    assert change.host

    gitiles_host = change.host.replace(
        '-review.googlesource.com', '.googlesource.com'
    )
    ref = f'refs/changes/{change.change % 100:02}/{change.change}/{change.patchset}'
    remote = f'https://{gitiles_host}/{change.project}'.strip('/')
    gerrit_name = gitiles_host.split('.')[0]

    placeholder_hash = 'f' * 40
    placeholder_details = {
        'branch': 'main',
        'current_revision': placeholder_hash,
        'revisions': {
            placeholder_hash: {'commit': {'parents': [{}], 'message': ''}},
        },
        'project': 'pigweed',
    }
    details = self.m.gerrit.change_details(
        'details',
        change_id=str(change.change),
        host=change.host,
        max_attempts=5,
        query_params=['CURRENT_COMMIT', 'CURRENT_REVISION'],
        timeout=30,
        test_data=self.m.json.test_api.output(placeholder_details),
    ).json.output

    branch = details['branch']
    commit = details['revisions'][details['current_revision']]['commit']
    is_merge = len(commit['parents']) > 1
    # A merge commit is never rebased, whatever the option says.
    rebase = not is_merge and not ctx.options.force_no_rebase

    return Change(
        number=change.change,
        patchset=change.patchset,
        bb_input=bb_input,
        remote=remote,
        ref=ref,
        rebase=rebase,
        is_merge=is_merge,
        branch=branch,
        gerrit_name=gerrit_name,
        submitted=False,
        commit_message=commit['message'],
        project=details['project'],
    )
def _process_gerrit_changes(self, ctx, bb_input):
# Generator. Yields a Change for each entry of bb_input.gerrit_changes, then
# a Change for each dependency returned by cq_deps.resolve() that has not
# been yielded already, then a placeholder Change (no remote, ref, branch or
# project) for each dependency cq_deps could not resolve.
# 'seen' holds the names of the changes yielded so far.
seen = set()
for i, change in enumerate(bb_input.gerrit_changes):
with self.m.step.nest(str(i)):
result = self._process_gerrit_change(ctx, bb_input, change)
yield result
seen.add(result.name)
# NOTE(review): indentation is not visible in this copy of the file, so it
# is unclear whether resolve() runs once per change or once after the loop
# using the last 'result' -- confirm against the original. If it runs after
# the loop, 'result' is unbound when gerrit_changes is empty.
deps, unresolved = self.m.cq_deps.resolve(
result.gerrit_name, result.number,
)
for dep in deps:
# dep.name should only appear in seen if there are multiple
# gerrit_changes from buildbucket and a later one depends on an
# earlier one. If buildbucket has multiple gerrit_changes the
# cq_deps module is not needed here, so this is just double-checking
# something that shouldn't happen.
if dep.name in seen: # pragma: no cover
continue
seen.add(dep.name)
yield self._process_gerrit_change(ctx, bb_input, dep)
for cl in unresolved:
yield Change(
number=cl.change,
bb_input=None,
remote=None,
ref=None,
rebase=None,
project=None,
branch=None,
gerrit_name=cl.gerrit_name,
submitted=False,
)
def _number_details(self, host, commit_hash, branch='main'):
    """Query Gerrit for the change that produced the given commit.

    Args:
        host (str): Gerrit host to query. Hosts containing 'github.com' or
            'github-review' are not queried.
        commit_hash (str): Commit to look up.
        branch (str): Branch name used only in the step's test data.

    Returns:
        The query result entry when the query returns exactly one result,
        otherwise None (including when the query step fails).
    """
    if any(marker in host for marker in ('github.com', 'github-review')):
        return None  # pragma: no cover

    try:
        matches = self.m.gerrit.change_query(
            'number',
            f'commit:{commit_hash}',
            host=host,
            max_attempts=5,
            timeout=30,
            test_data=self.m.json.test_api.output(
                [{'_number': '1234', 'branch': branch, 'project': 'pigweed'}]
            ),
        ).json.output
    except self.m.step.StepFailure:  # pragma: no cover
        return None

    # Skip this change if it didn't go through Gerrit.
    if matches and len(matches) == 1:
        return matches[0]
    return None
def _change_data(self, ctx, remote=None, branch=None):
# Build and return a tuple of Change objects describing what triggered this
# build. Sources, in order of preference: the buildbucket input's
# gerrit_changes, else its gitiles_commit, else (when those gave nothing)
# the current head of 'branch' on 'remote'.
#
# remote/branch: the caller (__call__) passes options.remote and
# options.branch; 'remote' is replaced when the gitiles commit names a
# project, and 'branch' when Gerrit or a scheduler trigger supplies one.
#
# NOTE(review): this function uses urllib.parse, but the file only has a
# bare 'import urllib', which does not load the 'parse' submodule by itself.
bb_input = self.m.buildbucket.build.input
results = []
# Index the scheduler's gitiles triggers by repo, then by revision, so the
# ref that triggered a given commit can be looked up below.
triggers = collections.defaultdict(dict)
for trigger in self.m.scheduler.triggers:
gitiles = trigger.gitiles
if gitiles:
triggers[gitiles.repo][gitiles.revision] = trigger
with self.m.step.nest('change data'):
if bb_input.gerrit_changes:
with self.m.step.nest('process gerrit changes'):
results.extend(self._process_gerrit_changes(ctx, bb_input))
elif bb_input.gitiles_commit.id:
with self.m.step.nest('process gitiles commit'):
commit = bb_input.gitiles_commit
assert commit.host
if commit.project:
remote = f'https://{commit.host}/{commit.project}'
# The Gerrit host is the gitiles host with '-review' added.
host = commit.host.replace(
'.googlesource.com', '-review.googlesource.com'
)
gerrit_name = commit.host.split('.')[0]
result = self._number_details(host, commit.id)
if result:
branch = result['branch']
# A ref recorded by the scheduler trigger for this exact commit
# overrides the branch reported by Gerrit.
if commit.id in triggers[remote]:
branch = triggers[remote][commit.id].gitiles.ref
if branch.startswith('refs/heads/'):
branch = branch[len('refs/heads/') :]
results.append(
Change(
number=result['_number'],
bb_input=bb_input,
remote=remote,
ref=commit.id,
rebase=False,
branch=branch,
gerrit_name=gerrit_name,
submitted=True,
project=result['project'],
)
)
if not results:
# If not triggered by a gitiles_poller gitiles_commit may be
# empty. In that case treat the most recent commit on the
# remote as the triggering commit. This is a good assumption
# except for Android Repo Tool projects, unless all projects
# are pinned to commits instead of tracking branches. However,
# even if this is wrong it's close enough to have utility.
head = self.m.git.get_remote_branch_head(remote, branch)
gerrit_name = urllib.parse.urlparse(remote).netloc.split('.')[0]
host = f'{gerrit_name}-review.googlesource.com'
result = self._number_details(host, head)
# With no Gerrit match the change number is 0 and the requested branch
# is kept.
results.append(
Change(
number=result['_number'] if result else 0,
bb_input=bb_input,
remote=remote,
ref=head,
rebase=False,
branch=result['branch'] if result else branch,
gerrit_name=gerrit_name,
project=None,
submitted=True,
)
)
# Emit one (otherwise empty) step per change so its repr shows in the build.
with self.m.step.nest('changes'):
for result in results:
with self.m.step.nest(result.name) as change_data_pres:
change_data_pres.step_summary_text = repr(result)
return tuple(results)
def _matching_branches(self, repo, branches, name='has branch', **kwargs):
    """Return a sorted list of the given branches that exist on the remote.

    A branch counts as existing when get_remote_branch_head returns a truthy
    head for it.

    Args:
        repo (str): Remote repository to query.
        branches (iterable[str]): Candidate branch names.
        name (str): Name of the enclosing step.
        **kwargs: Passed through to get_remote_branch_head.
    """
    found = set()
    with self.m.step.nest(name), self.m.context(infra_steps=True):
        for candidate in branches:
            head = self.m.git.get_remote_branch_head(
                repo,
                candidate,
                step_name=f'git ls-remote {candidate}',
                step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                    ''
                ),
                **kwargs,
            )
            # Record what ls-remote reported, whether or not it matched.
            with self.m.step.nest('head') as pres:
                pres.step_summary_text = repr(head)
            if head:
                found.add(candidate)
    return sorted(found)
def _apply_change(self, ctx, change, cwd=None, extra_calls=None):
"""Applies the given change to the given directory.
Fetches the change's ref, checks it out on a new local branch named
'working', records the base commit on the change (change.base and
change.base_type) and updates submodules. change.applied is set to True
before any step runs.
Args:
ctx (CheckoutContext): Context object; ctx.root is used for the step
summary and ctx.options.submodule_timeout_sec for the submodule update.
change (Change): Change to apply.
cwd (Path): Working directory, defaults to current directory.
extra_calls (callable): Additional steps to run within the nested
'apply ...' step and, if specified, within directory cwd.
"""
kwargs = {'cwd': cwd} if cwd else {}
change.applied = True
apply_step = f'apply {change.name}'
with self.m.context(**kwargs), self.m.step.nest(apply_step) as pres:
pres.links['gerrit'] = change.gerrit_url
pres.links['gitiles'] = change.gitiles_url
if cwd:
pres.step_summary_text = str(self.m.path.relpath(cwd, ctx.root))
with self.m.context(infra_steps=True):
# 'git fetch' fails if a submodule pin in the patch isn't
# present in the remote (for example, if the pin is only
# present in the uploader's workspace). Use
# '--no-recurse-submodules' here so 'git fetch' doesn't fail
# but instead 'git rebase' or 'git submodule update' fails
# later (important because those are not infra steps). Also
# don't use '--recurse-submodules' in 'git checkout' for
# similar reasons.
with self.m.default_timeout():
self.m.git.fetch(
change.remote,
change.ref,
recurse_submodules=False,
step_name='git fetch patch',
)
self.m.git(
'git checkout patch',
'checkout',
'--force',
'-b',
'working',
'FETCH_HEAD',
)
# NOTE(review): the comment below does not obviously match the visible code,
# which assigns both names right afterwards -- confirm against the original
# indentation, which is not visible in this copy.
# These remain unused if change.submitted is False.
remote = remote_branch = None
with self.m.context(infra_steps=True):
# Change "https://foo.googlesource.com/bar"
# to "https___foo_googlesource_com_bar".
remote = re.sub(r'[^\w]', '_', change.remote)
remote_branch = '/'.join((remote, change.branch))
self.m.git(
'git remote add', 'remote', 'add', remote, change.remote,
)
with self.m.default_timeout():
self.m.git.fetch(
remote,
f'refs/heads/{change.branch}',
prune=False,
step_name='git fetch branch',
)
self.m.git(
'git set upstream',
'branch',
f'--set-upstream-to={remote_branch}',
)
if not change.submitted:
with self.m.context(infra_steps=True):
self.m.git('pre-rebase log', 'log', '--oneline', '-n', '10')
# Record the base commit. Three cases: an already-submitted change uses
# HEAD, a change that is rebased uses the tip of the remote branch, and
# anything else uses the merge-base of HEAD and the remote branch.
if change.submitted:
change.base = self.m.git.rev_parse(
'HEAD',
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
'HEAD_' * 8,
),
)
change.base_type = 'submitted_commit_hash'
elif change.rebase:
self.m.git('git rebase', 'rebase', remote_branch)
change.base = self.m.git.rev_parse(
remote_branch,
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
'REMOTE_BRANCH_' * 3,
),
)
change.base_type = 'remote_branch_tip'
else:
change.base = self.m.git(
'merge-base',
'merge-base',
'HEAD',
remote_branch,
stdout=self.m.raw_io.output_text(),
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
'MERGEBASE_' * 4,
),
).stdout
change.base_type = 'merge-base'
# In most cases this is redundant or unnecessary, but it shouldn't
# cause problems. It's necessary when a superproject CL is updating
# a submodule pin and we need to sync the submodule to the new
# revision.
with self.m.default_timeout():
# See b/243673776 for why we detach before updating submodules.
self.m.git('detach', 'checkout', '--detach')
self.m.git.update_submodule(
recursive=True, timeout=ctx.options.submodule_timeout_sec,
)
# 'checkout -' switches back to the previously checked-out branch.
self.m.git('reattach', 'checkout', '-')
# TODO: b/237660477 - Make this function a context manager so
# callers can do the following:
# with self._apply_change(...):
# extra_calls()
if extra_calls:
extra_calls()
def _check_unapplied_changes(self, changes):
# Sort 'changes' into applied and not applied (by their 'applied' flag),
# report each unapplied one as a WARNING step, and return a StatusOfChanges.
# Returns None when 'changes' is empty. Raises InfraFailure when there are
# changes but none of them was applied.
applied = []
failed_to_apply = []
if not changes: # pragma: no cover
return None
# Emits a warning step with links for one unapplied change and records it.
def handle_unapplied_change(change):
with self.m.step.nest(f'failed to apply {change.name}') as pres:
pres.status = 'WARNING'
pres.links['gerrit'] = change.gerrit_url
pres.links['gitiles'] = change.gitiles_url
failed_to_apply.append(change)
with self.m.context(infra_steps=True):
if all(not x.applied for x in changes):
with self.m.step.nest('no changes were applied') as pres:
pres.status = 'FAILURE'
for change in changes:
handle_unapplied_change(change)
pres.properties['changes'] = [x.name for x in changes]
raise self.m.step.InfraFailure(
'could not find triggering changes in checkout'
)
elif any(not x.applied for x in changes):
with self.m.step.nest('some changes were not applied') as pres:
pres.status = 'WARNING'
for change in changes:
if change.applied:
applied.append(change)
else:
handle_unapplied_change(change)
else:
applied.extend(changes)
# Summarize both lists in a step of their own.
with self.m.step.nest('status') as pres:
pres.step_summary_text = (
f'applied {applied}\nnot applied {failed_to_apply}'
)
return StatusOfChanges(
applied=tuple(applied), not_applied=tuple(failed_to_apply),
)
def _cached_checkout(
self,
remote,
path,
ref,
submodules,
included_submodules=None,
excluded_submodules=None,
submodule_timeout_sec=10 * 60,
cache=True,
use_packfiles=True,
**kwargs,
):
# Check out 'ref' of 'remote' into 'path'. With cache=True a checkout kept
# under the 'git' cache directory is refreshed first and copied to 'path';
# the git_checkout module then performs the checkout in 'path'.
#
# submodules: whether submodules are fetched and updated.
# included_submodules / excluded_submodules: submodule paths to restrict
#   the update to, or to leave out of it. Giving both raises InfraFailure,
#   as does excluding a path that is not a submodule.
# submodule_timeout_sec: timeout passed to the submodule update steps.
# use_packfiles: when true, 'fetch.uriprotocols' is set to 'https' in the
#   cache, and the flag is forwarded to git_checkout.
# kwargs: forwarded to the git module calls made inside the cache.
#
# NOTE(review): this function uses urllib.parse, but the file only has a
# bare 'import urllib', which does not load the 'parse' submodule by itself.
submodule_paths = included_submodules = included_submodules or []
if cache:
with self.m.step.nest('cache'), self.m.cache.guard('git'):
parsed_remote = urllib.parse.urlparse(remote)
# Cache directory name: host plus path, with '-' doubled and then '/'
# turned into '-'. For example "https://foo.googlesource.com/bar-baz"
# becomes "foo.googlesource.com-bar--baz".
cache_name = parsed_remote.hostname + parsed_remote.path.replace(
'-', '--'
).replace(
'/', '-'
)
cache_path = self.m.path['cache'] / 'git' / cache_name
self.m.file.ensure_directory('makedirs', cache_path)
with self.m.context(cwd=cache_path):
dotgit = cache_path / '.git'
# An existing cache has its 'origin' remote section removed before the
# URL is set again below; a new cache is initialized.
if self.m.path.exists(dotgit): # pragma: no cover
self.m.git.config_remove_section(
'remote.origin', **kwargs
)
else:
self.m.git.init(bare=False, **kwargs)
self.m.git.config(
'remote.origin.url',
remote,
step_name='remote set-url',
**kwargs,
)
if use_packfiles:
self.m.git.config(
'fetch.uriprotocols',
'https',
step_name='set fetch.uriprotocols',
**kwargs,
)
with self.m.default_timeout():
self.m.git.fetch(
repository='origin',
prune=True,
tags=True,
recurse_submodules=submodules,
**kwargs,
)
self.m.git.raw_checkout(
ref='FETCH_HEAD', force=True, **kwargs
)
if included_submodules and excluded_submodules:
raise self.m.step.InfraFailure(
'cannot specify both included_submodules and '
'excluded_submodules'
)
submodule_paths = included_submodules
# With exclusions, start from every path listed by 'git submodule status'
# (the second whitespace-separated field of each line) and remove the
# excluded ones.
if excluded_submodules:
submodule_status = self.m.git(
'submodule status',
'submodule',
'status',
stdout=self.m.raw_io.output_text(),
step_test_data=lambda: self.test_api.m.raw_io.stream_output_text(
'-0000000000000000000000000000000000000000 pigweed\n'
'-1111111111111111111111111111111111111111 nanopb\n'
),
).stdout.splitlines()
submodule_paths = [
x.split(None, 1)[1] for x in submodule_status
]
for sub in excluded_submodules:
if sub not in submodule_paths:
raise self.m.step.InfraFailure(
f'excluded submodule {sub} is not a submodule'
)
# Empty step: it only records the decision in the build's step list.
with self.m.step.nest(f'excluding submodule {sub}'):
pass
submodule_paths.remove(sub)
for sub in submodule_paths:
with self.m.step.nest(f'including submodule {sub}'):
pass
if submodules or submodule_paths:
self.m.git.sync_submodule(recursive=True, **kwargs)
with self.m.default_timeout():
self.m.git.update_submodule(
recursive=True,
force=True,
paths=submodule_paths,
timeout=submodule_timeout_sec,
**kwargs,
)
if not submodules:
# Even though submodules weren't requested, if the cache
# had any active submodules we need to update them.
# Otherwise we'll get weird situations in rolls where an
# uninvolved submodule will be rolled back.
with self.m.default_timeout():
self.m.git.update_submodule(
recursive=True,
force=True,
init=False,
timeout=submodule_timeout_sec,
**kwargs,
)
self.m.file.copytree(
'copy from cache', cache_path, path, symlinks=True
)
# Deliberately not combining contexts into one line so it's obvious to
# both devs and Python which one is "outer" and which is "inner".
with self.m.step.nest('git checkout'):
with self.m.default_timeout():
# cache=False: the caching, if any, was already done above.
self.m.git_checkout(
repo=remote,
path=path,
cache=False,
revision=ref,
recursive=submodules,
submodules=submodules,
submodule_force=submodules,
submodule_paths=submodule_paths,
step_name="",
use_packfiles=use_packfiles,
)
def _git(self, ctx):
"""Checkout code from git.
Checks out ctx.options.remote into ctx.root, applies the triggering
changes to the top-level repository and to matching submodules when
ctx.options.use_trigger is set, and stores the outcome on ctx (ctx.status
and ctx.changes_json).
Args:
ctx (CheckoutContext): Context object.
Raises:
StepFailure if a change matches several submodules and its branch does not
single one of them out.
"""
# Use the branch of the triggering change when it also exists on the
# superproject remote, otherwise the configured branch.
super_branch = self._matching_branch(ctx) or ctx.options.branch
with self.m.context(infra_steps=True):
self._cached_checkout(
ctx.options.remote,
path=ctx.root,
ref=super_branch,
cache=not ctx.options.do_not_cache,
submodules=ctx.options.initialize_submodules,
submodule_timeout_sec=ctx.options.submodule_timeout_sec,
included_submodules=ctx.options.included_submodules,
excluded_submodules=ctx.options.excluded_submodules,
use_packfiles=not ctx.options.do_not_use_packfiles,
)
with self.m.context(cwd=ctx.root):
# got_revision/got_revision_type are reported as step properties at the
# end of this function when got_revision is set.
got_revision = None
got_revision_type = 'no_trigger'
submodules = []
if ctx.options.use_trigger:
got_revision = self.m.git.rev_parse(
'HEAD',
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
'HEAD' * 10,
),
)
# Check for CLs for the top-level repository.
for change in ctx.changes:
if ctx.remotes_equivalent(
ctx.options.remote, change.remote
):
self._apply_change(ctx, change)
got_revision = change.base
got_revision_type = change.base_type
submodules = ctx.submodules(recursive=True)
# Check for CLs for submodules.
# There are three rough cases:
# 1. Zero submodules have matching remotes. In this case we do
# nothing.
# 2. Exactly one submodule has a matching remote. In this case,
# the change is applied to that submodule, even if the branch
# does not match.
# 3. Multiple submodules have matching remotes.
# 1. Exactly one of them matches the triggering change's
# branch. In this case, the change is applied to this
# submodule.
# 2. Zero or multiple submodules match both the remote and
# the branch of the triggering change. In these cases we
# error out.
for change in ctx.changes:
matching_submodules = []
for submodule in submodules:
if ctx.remotes_equivalent(
submodule.remote, change.remote
):
matching_submodules.append(submodule)
if not matching_submodules:
continue
if len(matching_submodules) > 1:
# Relative path and branch of every candidate, for the error messages.
submodule_info = ', '.join(
f'{self.m.path.relpath(sub.path, ctx.root)} '
f'(branch {sub.branch})'
for sub in matching_submodules
)
matching_with_branch = []
for submodule in matching_submodules:
if submodule.branch == change.branch:
matching_with_branch.append(submodule)
if len(matching_with_branch) == 1:
matching_submodules = matching_with_branch
elif len(matching_with_branch) > 1:
raise self.m.step.StepFailure(
f'change {change.name} (branch '
f'{change.branch}) matches multiple submodules '
f'({submodule_info}), but too many branches '
'match'
)
else:
raise self.m.step.StepFailure(
f'change {change.name} '
f'(branch {change.branch}) matches multiple '
f'submodules ({submodule_info}) but no '
'branches match'
)
if len(matching_submodules) == 1:
submodule = matching_submodules[0]
# When submodules were not initialized by the checkout, update just this
# one before applying the change to it.
if not ctx.options.initialize_submodules:
with self.m.default_timeout():
self.m.git.update_submodule(
paths=(submodule.path,)
)
self._apply_change(ctx, change, cwd=submodule.path)
ctx.status = self._check_unapplied_changes(ctx.changes)
# Keeps only the attributes whose values are int, str, bool or None, so the
# result can be written as JSON.
def _vars_primitive_only(x):
return {
k: v
for k, v in vars(x).items()
if isinstance(v, (int, str, bool, type(None)))
}
applied_changes = [
_vars_primitive_only(x) for x in ctx.changes if x.applied
]
ctx.changes_json = self.m.path.mkstemp()
self.m.file.write_json(
'write changes.json', ctx.changes_json, applied_changes,
)
# Run git log for both the top-level checkout and every submodule.
with self.m.step.nest('git log'):
self.m.git(str(ctx.root), 'log', '--oneline', '-n', '10')
for submodule in sorted(submodules):
with self.m.context(cwd=submodule.path):
self.m.git(
str(submodule.path), 'log', '--oneline', '-n', '10',
)
if got_revision:
with self.m.step.nest('base') as pres:
pres.properties['got_revision'] = got_revision
# got_revision_type isn't needed by anything but helps
# explain why got_revision is the value it is.
pres.properties['got_revision_type'] = got_revision_type
def _matching_branch(self, ctx):
"""Return if there are manifest branches that match the triggering CLs.
If the triggering change is on a branch name that is also present in the
manifest or superproject remote, use that branch when checking out the
project.
Args:
ctx (CheckoutContext): Context object.
Raises:
StepFailure if there are multiple matching branches.
Returns:
One matching branch name, or None.
"""
# Each early exit below happens inside an otherwise empty nested step, so
# the reason for returning None shows up in the build's step list.
if not ctx.options.match_branch or not ctx.options.use_trigger:
with self.m.step.nest('not matching branch names'):
return
# 'kind' is only used in step names and messages.
kind = 'manifest' if ctx.options.use_repo else 'superproject'
manifest_branch = None
# Distinct branches of the triggering changes, ignoring 'master', 'main'
# and changes with no branch.
branch_names = sorted(
set(
x.branch
for x in ctx.changes
if x.branch not in ('master', 'main', None)
)
)
if not branch_names:
with self.m.step.nest('no non-standard branch names'):
return
with self.m.step.nest('branch names') as pres:
pres.step_summary_text = str(branch_names)
# Keep only the branch names that exist on the checkout's remote.
matching_branches = self._matching_branches(
ctx.options.remote, branch_names, name=f'{kind} has branch'
)
if not matching_branches:
with self.m.step.nest('no branch names match'):
return
if len(matching_branches) > 1:
with self.m.step.nest(
f"too many matching branches ({', '.join(matching_branches)})"
) as pres:
pres.step_summary_text = (
"Can't figure out which {} branch to use. Remove some "
'"Requires:" lines to simplify the checkout.'.format(kind)
)
raise self.m.step.StepFailure('multiple matching branches')
# Exactly one match remains at this point.
manifest_branch = matching_branches.pop()
# A step with no command: it only announces the branch change.
self.m.step(
f'changing {kind} branch to {manifest_branch}', None,
)
return manifest_branch
def _repo(self, ctx):
"""Checkout code from an Android Repo Tool manifest.
Runs 'repo init' and 'repo sync' in ctx.root and, when
ctx.options.use_trigger is set, applies the triggering changes to the
manifest repository and to the matching projects.
Args:
ctx (CheckoutContext): Context object. The remote, branch, manifest file,
timeouts, attempts and manifest groups are read from ctx.options, and
the triggering changes from ctx.changes.
Results are stored on ctx rather than returned: ctx.manifest (parsed
manifest), ctx.status (StatusOfChanges with applied and not applied CLs),
ctx.top, and ctx.root when it is moved into a single top-level folder.
"""
# Git makes the top-level folder, Repo requires caller to make it.
self.m.file.ensure_directory('mkdir checkout', ctx.root)
with self.m.context(cwd=ctx.root):
manifest_branch = self._matching_branch(ctx) or ctx.options.branch
with self.m.context(infra_steps=True):
# Optional 'repo init' arguments are only passed when set in the options.
kwargs = {}
if ctx.options.repo_init_timeout_sec:
kwargs['timeout'] = ctx.options.repo_init_timeout_sec
kwargs['attempts'] = ctx.options.number_of_attempts
if ctx.options.manifest_groups:
kwargs['groups'] = ctx.options.manifest_groups
self.m.repo.init(
manifest_url=ctx.options.remote,
manifest_branch=manifest_branch,
manifest_name=ctx.options.manifest_file,
**kwargs,
)
manifests_dir = ctx.root / '.repo' / 'manifests'
# If the triggering CL is a manifest change, apply it before running
# sync.
if ctx.options.use_trigger:
for change in ctx.changes:
if change.remote and ctx.remotes_equivalent(
ctx.options.remote, change.remote
):
def update_upstream():
# Right now the upstream of 'working' is the local
# 'default' branch. 'repo sync' complains if the
# upstream isn't remote, so it's changed to the
# remote branch that's identical to 'default'.
self.m.git(
'git branch',
'branch',
f'--set-upstream-to=origin/{manifest_branch}',
)
self._apply_change(
ctx,
change,
cwd=manifests_dir,
extra_calls=update_upstream,
)
ctx.manifest = self._read_manifest(
ctx.options.remote, manifests_dir / ctx.options.manifest_file,
)
# Remotes fetched over sso:// are handed to the sso module before syncing.
for _, remote_host in sorted(ctx.manifest.remotes.items()):
if remote_host.fetch.url.startswith('sso://'):
self.m.sso.configure_insteadof(remote_host.fetch.url)
with self.m.context(infra_steps=True):
kwargs = {}
if ctx.options.repo_sync_timeout_sec:
kwargs['timeout'] = ctx.options.repo_sync_timeout_sec
kwargs['attempts'] = ctx.options.number_of_attempts
self.m.repo.sync(
force_sync=True, current_branch=True, jobs=2, **kwargs
)
self.m.repo.start('base')
# Apply each triggering change to every project whose URL matches the
# change's remote.
if ctx.options.use_trigger:
for change in ctx.changes:
for entry in ctx.manifest.projects:
if ctx.remotes_equivalent(entry.url, change.remote):
# Shows the CL's branch next to the project's upstream branch in a step;
# nothing is enforced here.
def compare_branch_name():
with self.m.step.nest(
'compare branch name'
) as pres:
pres.step_summary_text = (
'CL branch: {}\nupstream branch: {}'
).format(change.branch, entry.upstream)
self._apply_change(
ctx,
change,
cwd=entry.path_object(ctx.root),
extra_calls=compare_branch_name,
)
ctx.status = self._check_unapplied_changes(ctx.changes)
# Some dependent projects have everything inside one top-level folder
# in their repo workspace. For those projects pretend that top-level
# folder is actually the checkout root. The top member will always
# point to the actual repo workspace root.
ctx.top = ctx.root
files = set(self.m.file.listdir('ls', ctx.root))
dotrepo = ctx.root / '.repo'
if dotrepo in files:
files.remove(dotrepo)
# NOTE(review): orig_root is assigned but not read anywhere in the visible
# code.
orig_root = ctx.root
if len(files) == 1:
ctx.root = files.pop()
def _configure_insteadof(self, ctx):
"""Configure git to use some urls in place of others."""
# Does nothing when ctx.options.rewrites is empty. Otherwise each rewrite
# adds a global git config entry 'url.<final>.insteadof = <original>', and
# the resulting 'url.*' config entries are listed with --get-regexp.
if not ctx.options.rewrites:
return
with self.m.step.nest('insteadof'):
for rewrite in ctx.options.rewrites:
# '--add' appends a value instead of replacing an existing one.
self.m.git(
f"{rewrite.original} to {rewrite.final}",
"config",
"--global",
"--add",
f"url.{rewrite.final}.insteadof",
rewrite.original,
)
self.m.git("rewrites", "config", "--get-regexp", "^url.*")
def _name(self, options):
    """Turn "https://foo/bar/baz.git" into "checkout baz".

    For a repo checkout whose remote ends in "manifest", that last component
    is dropped and the one before it is used instead.
    """
    remote = options.remote.rstrip('/')
    if remote.endswith('.git'):
        remote = remote[: -len('.git')]
    components = remote.split('/')
    if options.use_repo and components[-1] == 'manifest':
        del components[-1]
    return f'checkout {components[-1]}'
def __call__(self, options, root=None, name=None):
"""Checkout code.
Args:
options: Options protobuf describing the checkout; options.remote is
required. It is modified in place: unset fields receive the defaults
below and a trailing '.git' is removed from options.remote.
root (Path): Directory to check out into; defaults to start_dir/co.
name (str): Name of the enclosing step; defaults to a name derived from
options.remote.
Returns:
CheckoutContext describing the checkout.
"""
checkout_name = name or self._name(options)
assert options.remote
# Defaults for unset options: manifest 'default.xml', 20 s for repo init,
# 2 minutes for repo sync, 3 attempts, 10 minutes for submodule updates.
options.manifest_file = options.manifest_file or 'default.xml'
options.repo_init_timeout_sec = options.repo_init_timeout_sec or 20
options.repo_sync_timeout_sec = options.repo_sync_timeout_sec or 2 * 60
options.number_of_attempts = options.number_of_attempts or 3
options.submodule_timeout_sec = options.submodule_timeout_sec or 10 * 60
ctx = CheckoutContext(api=self.m)
ctx.options = options
ctx.changes = []
ctx.root = root or self.m.path['start_dir'] / 'co'
# Every remote in an equivalence group maps to the whole group (after
# conversion to https). A remote may belong to one group only.
for remotes in options.equivalent_remotes:
new_remotes = [self.m.sso.sso_to_https(x) for x in remotes.remotes]
for remote in new_remotes:
assert remote not in ctx.equivalent_remotes
ctx.equivalent_remotes[remote] = new_remotes
with self.m.step.nest(checkout_name) as pres:
if options.remote.endswith('.git'):
options.remote = options.remote[:-4]
if options.use_trigger:
ctx.changes = self._change_data(
ctx, options.remote, options.branch
)
self._configure_insteadof(ctx)
if options.use_repo:
self._repo(ctx)
else:
self._git(ctx)
# Link every applied and unapplied change from the checkout step.
if ctx.status:
for change in ctx.status.applied:
pres.links[f'applied {change.name}'] = change.gerrit_url
for change in ctx.status.not_applied:
pres.links[
f'failed to apply {change.name}'
] = change.gerrit_url
snapshot_dir = self.m.path['start_dir'] / 'snapshot'
ctx.snapshot_to_dir(snapshot_dir)
# 'top' keeps the checkout root as it is at this point; 'root' then moves
# into the configured subdirectory, if any.
ctx.top = ctx.root
if ctx.options.root_subdirectory:
ctx.root = ctx.root / ctx.options.root_subdirectory
return ctx
def get_revision(self, root, name='git log', test_data='HASH'):
    """Return the hash of the most recent commit in the given directory.

    Unlike CheckoutContext.revision this takes the directory explicitly, so
    it also works for secondary checkouts.

    Args:
        root (Path): Directory to run 'git log' in.
        name (str): Step name.
        test_data (str): Output used for the step in tests.
    """
    git_args = ('log', '--max-count=1', '--pretty=format:%H')
    with self.m.context(cwd=root):
        step = self.m.git(
            name,
            *git_args,
            stdout=self.m.raw_io.output_text(),
            step_test_data=lambda: self.test_api.m.raw_io.stream_output_text(
                test_data,
            ),
        )
    revision = step.stdout.strip()
    # Show the hash on the step itself as well as returning it.
    step.presentation.step_summary_text = revision
    return revision