blob: d1a7904ce4abee2197b11fabf4b4ac5609af2e6f [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Calls to checkout code.
Usage:
api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed')
"""
import re
import xml.etree.ElementTree
import attr
from recipe_engine import config_types, recipe_api
from RECIPE_MODULES.fuchsia.utils import memoize
from six.moves import urllib
# Default remote for the upstream Pigweed repository.
PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed'
@attr.s
class _Manifest(object):
    """Parsed Android Repo Tool manifest: remotes plus project entries."""

    # Mapping of remote name -> _Remote.
    remotes = attr.ib(default=attr.Factory(dict))
    # List of _Project entries.
    projects = attr.ib(default=attr.Factory(list))

    def dict(self):
        """Return a JSON-serializable representation of this manifest."""
        remotes = {}
        for remote_name, remote in self.remotes.items():
            remotes[remote_name] = remote.dict()
        return {
            'remotes': remotes,
            'projects': [project.dict() for project in self.projects],
        }
class _Url(object):
def __init__(self, url, *args, **kwargs):
super(_Url, self).__init__(*args, **kwargs)
self.url = url
self.https = None
def dict(self):
return self.__dict__.copy()
@attr.s
class _Remote(object):
    """Remote config from manifest."""

    name = attr.ib(type=str)
    # Starts as the raw fetch string from the XML; replaced with a _Url by
    # _read_manifest().
    fetch = attr.ib(type=_Url)
    review = attr.ib(type=str, default=None)
    revision = attr.ib(type=str, default=None)
    alias = attr.ib(type=str, default=None)

    def dict(self):
        """Return a JSON-serializable representation of this remote."""
        return dict(self.__dict__, fetch=self.fetch.dict())
@attr.s
class _Project(object):
    """Key variables describing a repository/project."""

    name = attr.ib(type=str)
    path = attr.ib(type=str)
    remote = attr.ib(type=str)
    revision = attr.ib(type=str)
    upstream = attr.ib(type=str)
    url = attr.ib(type=str, default=None)

    def path_object(self, root):
        """Return this project's checkout location under the given root."""
        return root.join(self.path)

    def dict(self):
        """Return a JSON-serializable representation of this project."""
        return dict(self.__dict__)
def _str_or_none(x):
if x is None:
return x
return str(x)
@attr.s
class _Change(object):
    """Data from buildbucket."""

    # TODO(pwbug/465) Remove converters after switch to Python 3.
    number = attr.ib(converter=int)
    bb_input = attr.ib(repr=False)
    remote = attr.ib(converter=_str_or_none)
    ref = attr.ib(converter=_str_or_none)
    rebase = attr.ib(type=bool)
    branch = attr.ib(converter=_str_or_none)
    gerrit_name = attr.ib(converter=_str_or_none)
    submitted = attr.ib(type=bool)
    applied = attr.ib(type=bool, default=False, repr=False)
    base = attr.ib(converter=_str_or_none, default=None)
    base_type = attr.ib(converter=_str_or_none, default=None)

    @property
    def gerrit_url(self):
        """Gerrit review URL for this change (gitiles URL if no number)."""
        if self.number:
            return 'https://{}-review.googlesource.com/c/{}'.format(
                self.gerrit_name, self.number
            )
        return self.gitiles_url

    @property
    def gitiles_url(self):
        """Gitiles URL of this change's ref on its remote."""
        return '{}/+/{}'.format(self.remote, self.ref)

    @property
    def name(self):
        """Short '<gerrit_name>:<number>' identifier used in step names."""
        return '{}:{}'.format(self.gerrit_name, self.number)
@attr.s
class _Submodule(object):
    """Submodule properties."""

    # Recipe API handle; excluded from repr.
    api = attr.ib(type=recipe_api.RecipeApi, repr=False)
    # Pinned commit hash from `git submodule status`.
    hash = attr.ib(type=str)
    # Path relative to the superproject root.
    relative_path = attr.ib(type=str)
    # Absolute Path of the submodule checkout.
    path = attr.ib(type=config_types.Path)
    # Parenthesized `git describe` text from `git submodule status`.
    describe = attr.ib(type=str)
    # https remote URL (sso:// translated, trailing '.git' stripped).
    remote = attr.ib(type=str)
@attr.s
class _StatusOfChanges(object):
    """Changes that were applied or not applied."""

    applied = attr.ib()  # Tuple of _Change.
    not_applied = attr.ib()  # Tuple of _Change.
class CheckoutApi(recipe_api.RecipeApi):
    """Calls to checkout code."""

    def __init__(self, props, *args, **kwargs):
        """Initializes checkout state from module properties.

        Args:
            props: Module properties proto (remote, branch, use_repo,
                manifest_file, timeouts, ...).
        """
        super(CheckoutApi, self).__init__(*args, **kwargs)
        # TODO(pwbug/465) Remove str calls after switch to Python 3.
        self._remote = str(props.remote)
        self._branch = str(props.branch or 'main')
        self._use_repo = props.use_repo
        self._manifest_file = str(props.manifest_file or 'default.xml')
        # The following are populated by __call__() and its helpers.
        self._root = None
        self._revision = None
        self._repo_top = None
        self._manifest = None
        self._triggering_repo = None
        self._changes = None
        self._manifest_snapshot = None
        self._submodule_snapshot = None
        self._repo_init_timeout_sec = props.repo_init_timeout_sec or 20
        self._repo_sync_timeout_sec = props.repo_sync_timeout_sec or 120
        self._number_of_attempts = props.number_of_attempts or 3
        self._submodule_data = {}
        # Mapping of remote URL -> list of equivalent remote URLs; filled in
        # by initialize().
        self._equivalent_remotes = {}
        self._force_no_rebase = props.force_no_rebase
        self.props = props
def initialize(self):
for remotes in self.props.equivalent_remotes:
new_remotes = [self.m.sso.sso_to_https(x) for x in remotes.remotes]
for remote in new_remotes:
assert remote not in self._equivalent_remotes
self._equivalent_remotes[remote] = new_remotes
def remotes_equivalent(self, remote1, remote2):
# Sometimes remote1 or remote2 is None. In that case we shouldn't
# convert sso to https.
if remote1:
remote1 = self.m.sso.sso_to_https(remote1)
if remote2:
remote2 = self.m.sso.sso_to_https(remote2)
if remote1 == remote2:
return True
return remote1 in self._equivalent_remotes.get(remote2, ())
    def _read_manifest(self, manifest_remote, manifest_file):
        """Reads manifest file to get git repo locations.

        Args:
            manifest_remote (str): URL of the repository the manifest came
                from; used to resolve relative fetch URLs.
            manifest_file (Path): Path to the manifest XML file on disk.

        Returns:
            _Manifest with remotes and projects populated.
        """
        with self.m.step.nest('read manifest') as read_step:
            manifest_text = self.m.file.read_text('read file', manifest_file)
            read_step.logs['raw'] = manifest_text
            xml_tree = xml.etree.ElementTree.fromstring(manifest_text)
            manifest = _Manifest()
            for remote in xml_tree.iter('remote'):
                remote = _Remote(**remote.attrib)
                # Fetch URLs starting with '..' are relative to the scheme
                # and host of the manifest remote.
                if remote.fetch.startswith('..'):
                    rest = remote.fetch[2:]
                    parsed = urllib.parse.urlparse(manifest_remote)
                    remote.fetch = (
                        '{}://{}'.format(parsed.scheme, parsed.netloc,) + rest
                    )
                remote.fetch = _Url(remote.fetch)
                remote.fetch.https = self.m.sso.sso_to_https(remote.fetch.url)
                manifest.remotes[remote.name] = remote
            # <default> element attributes apply to projects that don't
            # override them.
            defaults = {}
            for default in xml_tree.iter('default'):
                defaults.update(default.attrib)
            for project in xml_tree.iter('project'):
                name = project.attrib['name']
                path = project.attrib.get('path', name)
                # remote: project attribute first, then manifest default.
                if 'remote' in project.attrib:
                    remote = project.attrib['remote']
                elif 'remote' in defaults:
                    remote = defaults['remote']
                else:  # pragma: no cover
                    assert False, 'remote not specified for {}'.format(name)
                assert (
                    remote in manifest.remotes
                ), 'Remote {} does not exist'.format(remote)
                # revision: project attribute, then the remote's revision,
                # then the manifest default.
                if 'revision' in project.attrib:
                    revision = project.attrib['revision']
                elif manifest.remotes[remote].revision:
                    revision = manifest.remotes[remote].revision
                elif 'revision' in defaults:
                    revision = defaults['revision']
                else:  # pragma: no cover
                    assert False, 'revision not specified for {}'.format(name)
                if 'upstream' in project.attrib:
                    upstream = project.attrib['upstream']
                elif 'upstream' in defaults:  # pragma: no cover
                    # This is unlikely to be used and hard to test--it requires
                    # a completely separate manifest definition, otherwise the
                    # 'else' condition won't be covered. It's also simple.
                    upstream = defaults['upstream']
                else:
                    upstream = revision
                # urllib.urljoin does something different than what's desired
                # here.
                url = '/'.join(
                    (
                        manifest.remotes[remote].fetch.https.rstrip('/'),
                        name.lstrip('/'),
                    )
                )
                manifest.projects.append(
                    _Project(
                        name=name,
                        path=path,
                        remote=remote,
                        revision=revision,
                        upstream=upstream,
                        url=url,
                    )
                )
            # Persist the parsed manifest for debugging/downstream tooling.
            self.m.file.write_json(
                'manifest json',
                self.m.path['start_dir'].join('manifest.json'),
                manifest.dict(),
            )
            return manifest
    def _process_gerrit_change(self, bb_input, change):
        """Process a LUCI GerritChange and return a _Change object.

        Args:
            bb_input: Buildbucket build input proto.
            change: An object with host/project/change/patchset attributes
                (a LUCI GerritChange or a cq_deps dependency).
        """
        assert change.host
        # Gerrit refs look like 'refs/changes/56/123456/3'; the first number
        # is the last two digits of the change number.
        ref = 'refs/changes/{:02}/{}/{}'.format(
            change.change % 100, change.change, change.patchset,
        )
        # Map the review host back to the gitiles host.
        host = change.host.replace(
            '-review.googlesource.com', '.googlesource.com'
        )
        remote = 'https://{}/{}'.format(host, change.project).strip('/')
        gerrit_name = host.split('.')[0]
        # Ask Gerrit for the destination branch of the change.
        branch = self.m.gerrit.change_details(
            'details',
            change_id=str(change.change),
            host=change.host,
            max_attempts=5,
            timeout=30,
            test_data=self.m.json.test_api.output({'branch': 'main'}),
        ).json.output['branch']
        rebase = not self._force_no_rebase
        return _Change(
            number=change.change,
            bb_input=bb_input,
            remote=remote,
            ref=ref,
            rebase=rebase,
            branch=branch,
            gerrit_name=gerrit_name,
            submitted=False,
        )
    def _process_gerrit_changes(self, bb_input):
        """Yields a _Change for each GerritChange and its CQ dependencies.

        Dependencies that cq_deps cannot resolve are yielded as placeholder
        _Change objects with no remote/ref so they can be reported as not
        applied later.
        """
        seen = set()
        for i, change in enumerate(bb_input.gerrit_changes):
            with self.m.step.nest(str(i)):
                result = self._process_gerrit_change(bb_input, change)
                yield result
                seen.add(result.name)
                deps, unresolved = self.m.cq_deps.resolve(
                    result.gerrit_name, result.number,
                )
                for dep in deps:
                    # dep.name should only appear in seen if there are multiple
                    # gerrit_changes from buildbucket and a later one depends on an
                    # earlier one. If buildbucket has multiple gerrit_changes the
                    # cq_deps module is not needed here, so this is just double-checking
                    # something that shouldn't happen.
                    if dep.name in seen:  # pragma: no cover
                        continue
                    seen.add(dep.name)
                    yield self._process_gerrit_change(bb_input, dep)
                for cl in unresolved:
                    # Placeholder so unresolved deps show up as not applied.
                    yield _Change(
                        number=cl.change,
                        bb_input=None,
                        remote=None,
                        ref=None,
                        rebase=None,
                        branch=None,
                        gerrit_name=cl.gerrit_name,
                        submitted=False,
                    )
def _number_details(self, host, commit_hash, branch='main'):
try:
results = self.m.gerrit.change_query(
'number',
'commit:{}'.format(commit_hash),
host=host,
max_attempts=5,
timeout=30,
test_data=self.m.json.test_api.output(
[{'_number': '1234', 'branch': branch}]
),
).json.output
# Skip this change if it didn't go through Gerrit.
if results and len(results) == 1:
return results[0]
except self.m.step.StepFailure: # pragma: no cover
pass
return None
    def _change_data(self, remote=None, branch=None):
        """Collects triggering change data from the buildbucket input.

        Handles three cases: explicit gerrit_changes (CQ), a gitiles_commit
        (CI poller), or neither (fall back to the tip of remote/branch).

        Args:
            remote (str|None): Fallback remote URL.
            branch (str|None): Fallback branch name.

        Returns:
            Tuple of _Change objects.
        """
        bb_input = self.m.buildbucket.build.input
        results = []
        with self.m.step.nest('change data'):
            if bb_input.gerrit_changes:
                with self.m.step.nest('process gerrit changes'):
                    results.extend(self._process_gerrit_changes(bb_input))
            elif bb_input.gitiles_commit.id:
                with self.m.step.nest('process gitiles commit'):
                    commit = bb_input.gitiles_commit
                    assert commit.host
                    if commit.project:
                        remote = 'https://{}/{}'.format(
                            commit.host, commit.project
                        )
                    host = commit.host.replace(
                        '.googlesource.com', '-review.googlesource.com'
                    )
                    gerrit_name = commit.host.split('.')[0]
                    # Map the commit back to its Gerrit change, if any.
                    result = self._number_details(host, commit.id)
                    if result:
                        results.append(
                            _Change(
                                number=result['_number'],
                                bb_input=bb_input,
                                remote=remote,
                                ref=commit.id,
                                rebase=False,
                                branch=result['branch'],
                                gerrit_name=gerrit_name,
                                submitted=True,
                            )
                        )
            else:
                # If not triggered by a gitiles_poller gitiles_commit may be
                # empty. In that case treat the most recent commit on the
                # remote as the triggering commit. This is a good assumption
                # except for Android Repo Tool projects, unless all projects
                # are pinned to commits instead of tracking branches. However,
                # even if this is wrong it's close enough to have utility.
                head = self.m.git.get_remote_branch_head(remote, branch)
                gerrit_name = urllib.parse.urlparse(remote).netloc.split('.')[0]
                host = '{}-review.googlesource.com'.format(gerrit_name)
                result = self._number_details(host, head)
                results.append(
                    _Change(
                        number=result['_number'] if result else 0,
                        bb_input=bb_input,
                        remote=remote,
                        ref=head,
                        rebase=False,
                        branch=result['branch'] if result else branch,
                        gerrit_name=gerrit_name,
                        submitted=True,
                    )
                )
            # Summarize each change in its own step for debugging.
            with self.m.step.nest('changes'):
                for result in results:
                    with self.m.step.nest(result.name) as change_data_pres:
                        change_data_pres.step_summary_text = repr(result)
        return tuple(results)
    def _parse_submodule_status(self, root, line):
        """Parse a `git submodule status` and get the remote URL.

        Args:
            root (Path): Checkout root the submodule paths are relative to.
            line (str): One line of `git submodule status` output, e.g.
                '+<40-hex-hash> path/to/submodule (describe-output)'.

        Returns:
            _Submodule describing the submodule on the given line.

        Raises:
            InfraFailure: If the line doesn't match the expected format.
        """
        match = re.search(
            r'^\+?(?P<hash>[0-9a-fA-F]{40})\s+'
            r'(?P<path>[^()]*)\s+'
            r'\((?P<describe>[^()]*)\)$',
            line.strip(),
        )
        if not match:
            raise self.m.step.InfraFailure(
                'unrecognized submodule status line "{}"'.format(line)
            )
        with self.m.step.nest(match.group('path')) as pres:
            pres.step_summary_text = 'hash={}\ndescribe={}'.format(
                match.group('hash'), match.group('describe')
            )
            path = root.join(match.group('path'))
            # The remote URL isn't in the status output; read it from the
            # submodule's git config.
            with self.m.context(cwd=path):
                remote = self.m.git(
                    'git origin {}'.format(path),
                    'config',
                    '--get',
                    'remote.origin.url',
                    stdout=self.m.raw_io.output_text(),
                ).stdout.strip()
            remote_https = self.m.sso.sso_to_https(remote)
            # Strip a trailing '.git' so remotes compare equal elsewhere.
            if remote_https.endswith('.git'):
                remote_https = remote_https[0:-4]
            return _Submodule(
                api=self.m,
                hash=match.group('hash'),
                relative_path=match.group('path'),
                path=path,
                describe=match.group('describe'),
                remote=remote_https,
            )
def _matching_branches(self, repo, branches, name='has branch', **kwargs):
"""Returns the subset of the given branches that exist on gitiles."""
matches = set()
with self.m.step.nest(name), self.m.context(infra_steps=True):
for branch in branches:
head = self.m.git.get_remote_branch_head(
repo,
branch,
step_name='git ls-remote {}'.format(branch),
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
''
),
**kwargs
)
with self.m.step.nest('head') as pres:
pres.step_summary_text = repr(head)
if head:
matches.add(branch)
return sorted(matches)
    def _apply_change(self, change, cwd=None, extra_calls=None):
        """Applies the given change to the given directory.

        Args:
            change (_Change): Change to apply.
            cwd (Path): Working directory, defaults to current directory.
            extra_calls (callable): Additional steps to run within the nested
                'apply ...' step and, if specified, within directory cwd.
        """
        kwargs = {'cwd': cwd} if cwd else {}
        # Mark as applied up front; _check_unapplied_changes() reads this.
        change.applied = True
        apply_step = 'apply {}'.format(change.name)
        with self.m.context(**kwargs), self.m.step.nest(apply_step) as pres:
            pres.links['gerrit'] = change.gerrit_url
            pres.links['gitiles'] = change.gitiles_url
            with self.m.context(infra_steps=True):
                # 'git fetch' fails if a submodule pin in the patch isn't
                # present in the remote (for example, if the pin is only
                # present in the uploader's workspace). Use
                # '--no-recurse-submodules' here so 'git fetch' doesn't fail
                # but instead 'git rebase' or 'git submodule update' fails
                # later (important because those are not infra steps). Also
                # don't use '--recurse-submodules' in 'git checkout' for
                # similar reasons.
                self.m.git.fetch(
                    change.remote,
                    change.ref,
                    recurse_submodules=False,
                    step_name='git fetch patch',
                )
                self.m.git(
                    'git checkout patch',
                    'checkout',
                    '--force',
                    '-b',
                    'working',
                    'FETCH_HEAD',
                )
            # These remain unused if change.submitted is False.
            remote = remote_branch = None
            if not change.submitted:
                with self.m.context(infra_steps=True):
                    # Change "https://foo.googlesource.com/bar"
                    # to "https___foo_googlesource_com_bar".
                    remote = re.sub(r'[^\w]', '_', change.remote)
                    remote_branch = '/'.join((remote, change.branch))
                    self.m.git(
                        'git remote add',
                        'remote',
                        'add',
                        remote,
                        change.remote,
                    )
                    self.m.git('pre-rebase log', 'log', '--oneline', '-n', '10')
                    self.m.git.fetch(
                        remote,
                        # TODO(pwbug/578) Stop fetching all branches.
                        # change.branch,
                        prune=False,
                        step_name='git fetch branch',
                    )
                    self.m.git(
                        'git set upstream',
                        'branch',
                        '--set-upstream-to={}'.format(remote_branch),
                    )
            # Record the base revision the patch sits on, and how that base
            # was chosen; _git() uses these for got_revision.
            if change.submitted:
                change.base = self.m.git.rev_parse(
                    'HEAD',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'HEAD_' * 8,
                    ),
                )
                change.base_type = 'submitted_commit_hash'
            elif change.rebase:
                self.m.git('git rebase', 'rebase', remote_branch)
                change.base = self.m.git.rev_parse(
                    remote_branch,
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'REMOTE_BRANCH_' * 3,
                    ),
                )
                change.base_type = 'remote_branch_tip'
            else:
                change.base = self.m.git(
                    'merge-base',
                    'merge-base',
                    'HEAD',
                    remote_branch,
                    stdout=self.m.raw_io.output_text(),
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'MERGEBASE_' * 4,
                    ),
                ).stdout
                change.base_type = 'merge-base'
            # In most cases this is redundant or unnecessary, but it shouldn't
            # cause problems. It's necessary when a superproject CL is updating
            # a submodule pin and we need to sync the submodule to the new
            # revision. Including "--init" in case this CL is adding a submodule
            # in which case it wouldn't otherwise be initialized.
            self.m.git(
                'git submodule update',
                'submodule',
                'update',
                '--init',
                '--recursive',
            )
            # TODO(pwbug/233) Make this function a context manager so callers
            # can do the following:
            # with self._apply_change(...):
            #     extra_calls()
            if extra_calls:
                extra_calls()
    def _check_unapplied_changes(self, changes):
        """Reports which triggering changes were applied to the checkout.

        Fails the build if none of the changes were applied; warns if only
        some were.

        Args:
            changes (sequence[_Change]): Triggering changes to check.

        Returns:
            _StatusOfChanges with applied/not-applied tuples, or None when
            there were no changes at all.

        Raises:
            InfraFailure: If no changes were applied.
        """
        applied = []
        failed_to_apply = []
        if not changes:  # pragma: no cover
            return None

        def handle_unapplied_change(change):
            # Emit a warning step, with links, for a change we couldn't apply.
            with self.m.step.nest(
                'failed to apply {}'.format(change.name)
            ) as pres:
                pres.status = 'WARNING'
                pres.links['gerrit'] = change.gerrit_url
                pres.links['gitiles'] = change.gitiles_url
                failed_to_apply.append(change)

        with self.m.context(infra_steps=True):
            if all(not x.applied for x in changes):
                with self.m.step.nest('no changes were applied') as pres:
                    pres.status = 'FAILURE'
                    for change in changes:
                        handle_unapplied_change(change)
                    pres.properties['changes'] = [x.name for x in changes]
                    raise self.m.step.InfraFailure(
                        'could not find triggering changes in checkout'
                    )
            elif any(not x.applied for x in changes):
                with self.m.step.nest('some changes were not applied') as pres:
                    pres.status = 'WARNING'
                    for change in changes:
                        if change.applied:
                            applied.append(change)
                        else:
                            handle_unapplied_change(change)
            else:
                applied.extend(changes)
            with self.m.step.nest('status') as pres:
                pres.step_summary_text = 'applied {}\nnot applied {}'.format(
                    applied, failed_to_apply,
                )
        return _StatusOfChanges(
            applied=tuple(applied), not_applied=tuple(failed_to_apply),
        )
@memoize
def submodules(self, root=None, recursive=False):
"""Return data about all submodules of root."""
if not root:
root = self.root
with self.m.context(cwd=root):
args = ['git submodule status', 'submodule', 'status']
if recursive:
args.append('--recursive')
kwargs = {
'stdout': self.m.raw_io.output_text(),
'step_test_data': lambda: self.m.raw_io.test_api.stream_output_text(
''
),
}
submodule_status_lines = sorted(
self.m.git(*args, **kwargs).stdout.splitlines()
)
submodules = []
if submodule_status_lines:
with self.m.step.nest('parse submodules'):
for line in submodule_status_lines:
submodules.append(
self._parse_submodule_status(root, line)
)
return submodules
    def _git(self, remote, branch, use_trigger, root, changes):
        """Checkout code from git.

        Args:
            remote (str): URL of git repository.
            branch (str): Remote branch to retrieve.
            use_trigger (bool): Attempt to apply the triggering change to the
                checkout.
            root (Path): Path to checkout into.
            changes (sequence[_Change]): List of triggering changes.

        Returns:
            _StatusOfChanges with applied and not applied CLs.
        """
        with self.m.context(infra_steps=True):
            self.m.git.checkout(
                remote, path=root, ref=branch, recursive=True, submodules=True
            )
        with self.m.context(cwd=root):
            got_revision = None
            got_revision_type = 'no_trigger'
            submodules = []
            status_of_changes = None
            if use_trigger:
                # Default got_revision is the checked-out HEAD; overridden
                # below if a change is applied to the top-level repo.
                got_revision = self.m.git.rev_parse(
                    'HEAD',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'HEAD' * 10,
                    ),
                )
                # Check for CLs for the top-level repository.
                for change in changes:
                    if self.remotes_equivalent(remote, change.remote):
                        self._apply_change(change)
                        got_revision = change.base
                        got_revision_type = change.base_type
                submodules = self.submodules(root, recursive=True)
                # Check for CLs for submodules.
                for change in changes:
                    for submodule in submodules:
                        if self.remotes_equivalent(
                            submodule.remote, change.remote
                        ):
                            self._apply_change(change, cwd=submodule.path)
                status_of_changes = self._check_unapplied_changes(changes)
            # Run git log for both the top-level checkout and every submodule.
            with self.m.step.nest('git log'):
                self.m.git(str(root), 'log', '--oneline', '-n', '10')
                for submodule in sorted(submodules):
                    with self.m.context(cwd=submodule.path):
                        self.m.git(
                            str(submodule.path), 'log', '--oneline', '-n', '10',
                        )
            if got_revision:
                with self.m.step.nest('base') as pres:
                    pres.properties['got_revision'] = got_revision
                    # got_revision_type isn't needed by anything but helps
                    # explain why got_revision is the value it is.
                    pres.properties['got_revision_type'] = got_revision_type
        return status_of_changes
    def _repo(self, remote, branch, manifest_file, use_trigger, root, changes):
        """Checkout code from an Android Repo Tool manifest.

        Args:
            remote (str): URL of git repository.
            branch (str): Remote branch to retrieve.
            manifest_file (str): Name of manifest XML file.
            use_trigger (bool): Attempt to apply the triggering change to the
                checkout.
            root (Path): Path to checkout into.
            changes (sequence[_Change]): List of triggering changes.

        Returns:
            _StatusOfChanges with applied and not applied CLs.
        """
        # Git makes the top-level folder, Repo requires caller to make it.
        self.m.file.ensure_directory('mkdir checkout', root)
        if manifest_file is None:
            manifest_file = self._manifest_file
        status_of_changes = None
        with self.m.context(cwd=root):
            remote = remote.rstrip('/')
            # If the triggering change is on a branch name that is also present
            # in the manifest branch, use that manifest branch when checking
            # out the manifest.
            manifest_branch = branch
            branch_names = sorted(
                set(
                    x.branch
                    for x in changes
                    if x.branch not in ('master', 'main', None)
                )
            )
            if branch_names:
                with self.m.step.nest('branch names') as pres:
                    pres.step_summary_text = str(branch_names)
                if changes:
                    matching_branches = self._matching_branches(
                        remote, branch_names, name='manifest has branch'
                    )
                    if matching_branches:
                        if len(matching_branches) > 1:
                            # Ambiguous: can't pick between multiple branches.
                            with self.m.step.nest(
                                'too many matching branches ({})'.format(
                                    ', '.join(matching_branches)
                                )
                            ) as pres:
                                pres.step_summary_text = (
                                    "Can't figure out which manifest branch to "
                                    'use. Remove some "Requires:" lines to '
                                    'simplify the checkout.'
                                )
                                raise self.m.step.StepFailure(
                                    'too many matching branches'
                                )
                        else:
                            manifest_branch = matching_branches.pop()
                            self.m.step(
                                'changing manifest branch to {}'.format(
                                    manifest_branch
                                ),
                                None,
                            )
            with self.m.context(infra_steps=True):
                kwargs = {}
                if self._repo_init_timeout_sec:
                    kwargs['timeout'] = self._repo_init_timeout_sec
                kwargs['attempts'] = self._number_of_attempts
                self.m.repo.init(
                    manifest_url=remote,
                    manifest_branch=manifest_branch,
                    manifest_name=self._manifest_file,
                    **kwargs
                )
            manifests_dir = root.join('.repo', 'manifests')
            # If the triggering CL is a manifest change, apply it before running
            # sync.
            if use_trigger:
                for change in changes:
                    if change.remote and self.remotes_equivalent(
                        remote, change.remote
                    ):

                        def update_upstream():
                            # Right now the upstream of 'working' is the local
                            # 'default' branch. 'repo sync' complains if the
                            # upstream isn't remote, so it's changed to the
                            # remote branch that's identical to 'default'.
                            self.m.git(
                                'git branch',
                                'branch',
                                '--set-upstream-to=origin/{}'.format(
                                    manifest_branch
                                ),
                            )

                        self._apply_change(
                            change,
                            cwd=manifests_dir,
                            extra_calls=update_upstream,
                        )
            self._manifest = self._read_manifest(
                remote, manifests_dir.join(manifest_file)
            )
            # Set up git insteadof rules for any sso:// remotes before sync.
            for _, remote_host in sorted(self._manifest.remotes.items()):
                if remote_host.fetch.url.startswith('sso://'):
                    self.m.sso.configure_insteadof(remote_host.fetch.url)
            with self.m.context(infra_steps=True):
                kwargs = {}
                if self._repo_sync_timeout_sec:
                    kwargs['timeout'] = self._repo_sync_timeout_sec
                kwargs['attempts'] = self._number_of_attempts
                self.m.repo.sync(
                    force_sync=True, current_branch=True, jobs=2, **kwargs
                )
            self.m.repo.start('base')
            # Apply triggering CLs to the matching manifest projects.
            if use_trigger:
                for change in changes:
                    for entry in self._manifest.projects:
                        if self.remotes_equivalent(entry.url, change.remote):

                            def compare_branch_name():
                                with self.m.step.nest(
                                    'compare branch name'
                                ) as pres:
                                    pres.step_summary_text = (
                                        'CL branch: {}\nupstream branch: {}'
                                    ).format(change.branch, entry.upstream)

                            self._apply_change(
                                change,
                                cwd=entry.path_object(root),
                                extra_calls=compare_branch_name,
                            )
            status_of_changes = self._check_unapplied_changes(changes)
        # Some dependent projects have everything inside one top-level folder
        # in their repo workspace. For those projects pretend that top-level
        # folder is actually the checkout root. The repo_top member will always
        # point to the actual repo workspace root.
        with self.m.step.nest('root') as pres:
            pres.step_summary_text = 'root={}\nself._root={}\n'.format(
                root, self._root
            )
            if root == self._root:
                self._repo_top = self._root
                files = set(self.m.file.listdir('ls', root))
                dotrepo = self._root.join('.repo')
                if dotrepo in files:
                    files.remove(dotrepo)
                if len(files) == 1:
                    self._root = files.pop()
        return status_of_changes
    def __call__(
        self,
        remote=None,
        branch=None,
        name=None,
        use_trigger=True,
        root=None,
        use_repo=None,
        manifest_file=None,
    ):
        """Checkout code.

        Grabs data from buildbucket. If drawing a blank, uses remote. Returns
        path to checkout.

        Args:
            remote (str): URL of git repository.
            branch (str): Remote branch to retrieve.
            name (str|None): If not None, this is used in the nesting step that
                wraps all steps invoked by this method.
            use_trigger (bool): Attempt to apply the triggering change to the
                checkout.
            root (Path|None): If not None, checkout into this path and not
                self.root.
            use_repo (bool|None): If True, treat the remote/branch as an Android
                Repo Tool manifest. If False, treat as a regular Git repository.
                If None, use the property value instead.
            manifest_file (str|None): Path to manifest file, defaults to
                'default.xml'.
        """
        if use_repo is None:
            use_repo = self._use_repo
        # The caller supplying the branch and not the remote is a weird
        # situation that's probably an error. Only grab from properties if
        # caller supplied neither.
        if remote is None and branch is None:
            remote = self._remote
            branch = self._branch
        branch = branch or 'main'
        # Turn "https://foo/bar/baz.git" into "baz".
        checkout_name = 'checkout'
        if remote and name is None:
            name = remote.rstrip('/')
            if name.endswith('.git'):
                name = name[0:-4]
            parts = name.split('/')
            # For manifest repositories use the parent directory name.
            if use_repo and parts[-1] == 'manifest':
                parts.pop(-1)
            name = parts[-1]
            checkout_name = 'checkout {}'.format(name)
            if remote.endswith('.git'):
                remote = remote[0:-4]
        with self.m.step.nest(checkout_name) as pres:
            changes = []
            if use_trigger:
                changes = self._change_data(remote, branch)
            if not remote:
                # With no remote given, the single triggering change
                # determines which repository to check out.
                assert len(changes) == 1
                remote = changes[0].remote
            if root is None:
                root = self._root = self.m.path['start_dir'].join('checkout')
            self._changes = changes
            if use_repo:
                status_of_changes = self._repo(
                    remote=remote,
                    branch=branch,
                    root=root,
                    use_trigger=use_trigger,
                    manifest_file=manifest_file,
                    changes=changes,
                )
            else:
                status_of_changes = self._git(
                    remote=remote,
                    branch=branch,
                    root=root,
                    use_trigger=use_trigger,
                    changes=changes,
                )
            # Link every applied/unapplied change from the top-level step.
            if status_of_changes:
                for change in status_of_changes.applied:
                    pres.links[
                        'applied {}'.format(change.name)
                    ] = change.gerrit_url
                for change in status_of_changes.not_applied:
                    pres.links[
                        'failed to apply {}'.format(change.name)
                    ] = change.gerrit_url
            snapshot_dir = self.m.path['start_dir'].join('snapshot')
            self.snapshot_to_dir(snapshot_dir)
    @property
    def root(self):
        """Returns the logical top level directory of the checkout.

        Returns:
            For Git checkouts, returns the top-level directory. For Android Repo
            Tool checkouts, returns the top-level directory unless there is
            exactly one subdirectory of that top-level directory (except for
            .repo). In that case it returns that one subdirectory.
        """
        return self._root

    @property
    def repo_top(self):
        """Always returns the directory containing the .repo folder."""
        return self._repo_top

    @property
    def manifest(self):
        """Returns the parsed _Manifest (None before a repo checkout)."""
        return self._manifest
    def snapshot_to_dir(self, directory):
        """Writes checkout state snapshots into the given directory.

        Writes manifest.xml (for repo checkouts), submodules.log (when there
        are submodules), and a short git log of the checkout root.

        Args:
            directory (Path): Destination directory; created if missing.
        """
        self.m.file.ensure_directory('mkdir', directory)
        if self.manifest_snapshot:
            self.m.file.write_text(
                'write manifest.xml',
                directory.join('manifest.xml'),
                self.manifest_snapshot,
            )
        if self.submodule_snapshot:
            self.m.file.write_text(
                'write submodule snapshot',
                directory.join('submodules.log'),
                self.submodule_snapshot,
            )
        with self.m.context(cwd=self.root):
            # ok_ret='any': a failing git log step doesn't fail the build.
            log = self.m.git(
                'log',
                'log',
                '--oneline',
                '-n',
                '10',
                stdout=self.m.raw_io.output_text(),
                ok_ret='any',
            ).stdout
            self.m.file.write_text(
                'write git log', directory.join('git.log'), log,
            )
@property
def manifest_snapshot(self):
if not self.use_repo:
return None
if not self._manifest_snapshot:
with self.m.context(cwd=self.root):
self._manifest_snapshot = self.m.repo.manifest_snapshot()
return self._manifest_snapshot
    @property
    def submodule_snapshot(self):
        """Cached `git submodule status --recursive` output ('' if none)."""
        if self._submodule_snapshot is None:
            with self.m.context(cwd=self.root):
                # To get step_test_data line to pass pylint.
                raw_io_stream_output = self.m.raw_io.test_api.stream_output_text
                # ok_ret='any': a failing status step yields '' instead of
                # failing the build.
                self._submodule_snapshot = (
                    self.m.git(
                        'submodule-status',
                        'submodule',
                        'status',
                        '--recursive',
                        stdout=self.m.raw_io.output_text(),
                        step_test_data=lambda: raw_io_stream_output(
                            'submodule status filler text',
                        ),
                        ok_ret='any',
                    ).stdout.strip()
                    or ''
                )
        return self._submodule_snapshot
    @property
    def remote(self):
        """Remote URL from module properties."""
        return self._remote

    @property
    def branch(self):
        """Branch from module properties (defaults to 'main')."""
        return self._branch

    @property
    def manifest_file(self):
        """Manifest file name from module properties ('default.xml' default)."""
        return self._manifest_file

    @property
    def use_repo(self):
        """Whether this checkout uses Android Repo Tool."""
        return self._use_repo
@property
def revision(self):
"""Returns revision of the primary checkout directory."""
assert self._root, 'checkout() not yet called'
if self._revision:
return self._revision
self._revision = self.get_revision(self._root)
return self._revision
    @property
    def changes(self):
        """Returns the changes that triggered the build.

        For CI builds this is a list with at most one item, the merged commit
        that triggered the build. For CQ builds this could contain multiple
        pending CLs.
        """
        assert self._root, 'checkout() not yet called'
        return self._changes
    def get_revision(self, root, name='git log', test_data='HASH'):
        """Like self.revision, but works for secondary checkouts.

        Args:
            root (Path): Checkout directory to read the revision from.
            name (str): Step name for the git invocation.
            test_data (str): Fake hash used as step test data.

        Returns:
            str: Hash of the most recent commit in root.
        """
        with self.m.context(cwd=root):
            # NOTE(review): other methods here spell the test data accessor
            # 'self.m.raw_io.test_api.stream_output_text'; confirm
            # 'self.test_api.m.raw_io.stream_output_text' is equivalent.
            return self.m.git(
                name,
                'log',
                '--max-count=1',
                '--pretty=format:%H',
                stdout=self.m.raw_io.output_text(),
                step_test_data=lambda: self.test_api.m.raw_io.stream_output_text(
                    test_data,
                ),
            ).stdout.strip()
# gerrit_host and gerrit_project aren't really properties of checkout, but
# they make some sense here and don't make much sense anywhere else.
_REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$')
def gerrit_host(self, remote=None):
match = self._REMOTE_REGEX.match(remote or self.remote)
if not match:
return
gerrit_review_host = '{}'.format(match.group('host'))
if '-review' not in gerrit_review_host:
gerrit_review_host = gerrit_review_host.replace('.', '-review.', 1)
return gerrit_review_host
def gerrit_project(self, remote=None):
match = self._REMOTE_REGEX.match(remote or self.remote)
if not match:
return
return match.group('project')