# blob: a956b6d510bff05bcd1d22c59893ed00dba90b79 [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Calls to checkout code.
Usage:
api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed')
"""
import re
import xml.etree.ElementTree
import attr
from recipe_engine import recipe_api
import urlparse
@attr.s
class _Manifest(object):
    """Container for the remotes and projects of a manifest."""

    # Keyed collection of remotes; every value must provide a dict() method.
    remotes = attr.ib(default=attr.Factory(dict))
    # Ordered collection of projects; every item must provide a dict() method.
    projects = attr.ib(default=attr.Factory(list))

    def dict(self):
        """Returns the manifest as nested plain containers.

        Returns:
          A dict with two keys: 'remotes' maps each key of self.remotes to the
          result of that value's dict(), and 'projects' lists the result of
          dict() for each entry of self.projects, in order.
        """
        remotes = {}
        for key, remote in self.remotes.iteritems():
            remotes[key] = remote.dict()

        projects = [project.dict() for project in self.projects]

        return {'remotes': remotes, 'projects': projects}
class _Url(object):
def __init__(self, url, *args, **kwargs):
super(_Url, self).__init__(*args, **kwargs)
self.url = url
self.https = None
def dict(self):
return self.__dict__.copy()
@attr.s
class _Remote(object):
    """Remote config from manifest."""

    name = attr.ib(type=str)
    # Declared as _Url; dict() below relies on it providing its own dict().
    fetch = attr.ib(type=_Url)
    review = attr.ib(type=str, default=None)
    revision = attr.ib(type=str, default=None)

    def dict(self):
        """Returns the attributes as a dict, with fetch expanded.

        Returns:
          A copy of the instance attributes in which 'fetch' has been replaced
          by the result of calling dict() on the fetch attribute.
        """
        fields = vars(self).copy()
        fields['fetch'] = self.fetch.dict()
        return fields
@attr.s
class _Project(object):
    """Key variables describing a repository/project."""

    name = attr.ib(type=str)
    path = attr.ib(type=str)
    remote = attr.ib(type=str)
    revision = attr.ib(type=str)
    # Optional; None unless the creator supplies it.
    url = attr.ib(type=str, default=None)

    def dict(self):
        """Returns a shallow copy of the instance attributes as a dict."""
        return vars(self).copy()
@attr.s
class _BbData(object):
    """Data from buildbucket."""

    # All four fields are required; none has a default.
    input = attr.ib()
    remote = attr.ib(type=str)
    ref = attr.ib(type=str)
    rebase = attr.ib(type=bool)
class _Submodule(object):
"""Submodule properties."""
def __init__(self, api, hash, path, describe, remote, *args, **kwargs):
super(_Submodule, self).__init__(*args, **kwargs)
self.api = api
self.hash = hash
self.path = path
self.describe = describe
self.remote = remote
class CheckoutApi(recipe_api.RecipeApi):
    """Calls to checkout code.

    Handles plain Git checkouts (with submodules) and Android Repo Tool
    checkouts, optionally applying the change that triggered the build.
    """

    def __init__(self, props, *args, **kwargs):
        """Stores the module properties and clears per-checkout state.

        Args:
          props: Module properties; remote, branch, use_repo and
            manifest_file are read from it.
          *args: Forwarded to the parent initializer.
          **kwargs: Forwarded to the parent initializer.
        """
        super(CheckoutApi, self).__init__(*args, **kwargs)
        self._remote = props.remote
        self._branch = props.branch
        self._use_repo = props.use_repo
        self._manifest_file = props.manifest_file
        # Filled in by __call__() / _repo() / the revision property.
        self._root = None
        self._revision = None
        self._repo_top = None
        self._manifest = None

    def _read_manifest(self, manifest_file):
        """Reads manifest file to get git repo locations.

        Args:
          manifest_file (Path): Manifest XML file to read.

        Returns:
          A _Manifest with one _Remote per <remote> element and one _Project
          per <project> element. The same data is also written as JSON to
          manifest.json in the start directory.
        """
        with self.m.step.nest('read manifest') as read_step:
            manifest_text = self.m.file.read_text('read file', manifest_file)
            read_step.logs['raw'] = manifest_text

            xml_tree = xml.etree.ElementTree.fromstring(manifest_text)

            manifest = _Manifest()

            for remote_elem in xml_tree.iter('remote'):
                remote = _Remote(**remote_elem.attrib)
                remote.fetch = _Url(remote.fetch)
                remote.fetch.https = self.m.sso.sso_to_https(
                    remote.fetch.url)
                manifest.remotes[remote.name] = remote

            # Later <default> elements override earlier ones.
            defaults = {}
            for default in xml_tree.iter('default'):
                defaults.update(default.attrib)

            for project in xml_tree.iter('project'):
                name = project.attrib['name']
                path = project.attrib.get('path', name)

                if 'remote' in project.attrib:
                    remote_name = project.attrib['remote']
                elif 'remote' in defaults:
                    remote_name = defaults['remote']
                else:  # pragma: no cover
                    assert False, 'remote not specified for {}'.format(name)

                assert remote_name in manifest.remotes, (
                    'Remote {} does not exist'.format(remote_name))
                remote = manifest.remotes[remote_name]

                # Precedence: project attribute, then the remote's revision,
                # then the manifest-wide default.
                if 'revision' in project.attrib:
                    revision = project.attrib['revision']
                elif remote.revision:
                    revision = remote.revision
                elif 'revision' in defaults:
                    revision = defaults['revision']
                else:  # pragma: no cover
                    assert False, 'revision not specified for {}'.format(name)

                # The fetch URL is a prefix that the project name is appended
                # to. urljoin() replaces the last path segment of a base that
                # lacks a trailing slash, so make sure there is one.
                fetch_prefix = remote.fetch.https.rstrip('/') + '/'
                url = urlparse.urljoin(fetch_prefix, name)

                manifest.projects.append(
                    _Project(
                        name=name,
                        path=path,
                        remote=remote_name,
                        revision=revision,
                        url=url))

            self.m.file.write_json(
                'manifest json',
                self.m.path['start_dir'].join('manifest.json'),
                manifest.dict())

            return manifest

    def _bb_data(self):
        """Extracts the triggering change or commit from buildbucket.

        Returns:
          A _BbData built from the first Gerrit change (rebase=True) when the
          build input has Gerrit changes; otherwise one built from the
          Gitiles commit (rebase=False). None when there is neither, or when
          the Gitiles commit has no host.
        """
        input_ = self.m.buildbucket.build.input

        if input_.gerrit_changes:
            change = input_.gerrit_changes[0]
            assert change.host
            ref = 'refs/changes/{:02}/{}/{}'.format(
                change.change % 100, change.change, change.patchset)
            # The code lives on the non "-review" host.
            host = change.host.replace('-review.googlesource.com',
                                       '.googlesource.com')
            remote = 'https://{}/{}'.format(host, change.project).strip('/')
            return _BbData(input=input_, remote=remote, ref=ref, rebase=True)

        if input_.gitiles_commit:
            commit = input_.gitiles_commit
            if not commit.host:
                return None
            remote = 'https://{}/{}'.format(commit.host, commit.project)
            return _BbData(
                input=input_, remote=remote, ref=commit.id, rebase=False)

        # The buildbucket module always sets .gerrit_changes or
        # .gitiles_commit, but in case it doesn't in the future return None.
        # (This could be left off but then pylint would complain too.)
        return None  # pragma: no cover

    def _parse_submodule_status(self, root, line):
        """Parse a `git submodule status` line and get the remote URL.

        Args:
          root (Path): Top-level checkout the submodule path is relative to.
          line (str): One line of `git submodule status` output.

        Returns:
          A _Submodule whose remote is the HTTPS form of the submodule's
          origin URL with any trailing '.git' removed.

        Raises:
          InfraFailure: If the line does not have the expected
            "<hash> <path> (<describe>)" shape.
        """
        match = re.search(
            r'^(?P<hash>[0-9a-fA-F]{40})\s+'
            r'(?P<path>[^()]*)\s+'
            r'\((?P<describe>[^()]*)\)$', line.strip())
        if not match:
            raise self.m.step.InfraFailure(
                'unrecognized submodule status line "{}"'.format(line))

        with self.m.step.nest(match.group('path')) as pres:
            pres.step_summary_text = 'hash={}\ndescribe={}'.format(
                match.group('hash'), match.group('describe'))

            path = root.join(*match.group('path').split('/'))

            # Ask the submodule's own repository for its origin; without
            # changing directory this would report the caller's repository.
            with self.m.context(cwd=path):
                remote = self.m.git(
                    'config',
                    '--get',
                    'remote.origin.url',
                    name='git origin {}'.format(path),
                    stdout=self.m.raw_io.output(),
                ).stdout.strip()  # Drop the trailing newline git prints.

            remote_https = self.m.sso.sso_to_https(remote)
            if remote_https.endswith('.git'):
                remote_https = remote_https[0:-4]

            return _Submodule(
                api=self.m,
                hash=match.group('hash'),
                path=path,
                describe=match.group('describe'),
                remote=remote_https)

    def _find_submodules(self, root):
        """Returns a _Submodule for each `git submodule status` line.

        Runs in the current context directory. Submodules are listed
        recursively because submodules can have submodules.
        """
        status_lines = self.m.git(
            'submodule',
            'status',
            '--recursive',
            name='git submodule status',
            stdout=self.m.raw_io.output(),
            step_test_data=lambda: self.m.raw_io.test_api.stream_output(''),
        ).stdout.splitlines()

        submodules = []
        if status_lines:
            with self.m.step.nest('parse_submodules'):
                for line in status_lines:
                    submodules.append(
                        self._parse_submodule_status(root, line))
        return submodules

    def _apply_trigger(self, bb, branch):
        """Checks out bb.ref on a new 'working' branch in the current dir.

        Rebases onto origin/<branch> afterwards when bb.rebase is set.
        """
        with self.m.context(infra_steps=True):
            self.m.git('fetch', bb.remote, bb.ref)
            self.m.git(
                'checkout',
                '--recurse-submodules',
                '-b',
                'working',
                'FETCH_HEAD',
                name='git checkout patch')
        # Failure during rebase is a conflict and not an infra failure.
        if bb.rebase:
            self.m.git('rebase', 'origin/{}'.format(branch))

    def _git(self, remote, branch, use_trigger, root):
        """Checkout code from git.

        Args:
          remote (str): URL of git repository.
          branch (str): Remote branch to retrieve.
          use_trigger (bool): Attempt to apply the triggering change to the
            checkout.
          root (Path): Path to checkout into.

        Raises:
          InfraFailure: If the triggering change is neither for the
            top-level repository nor for any of its submodules.
        """
        if remote.endswith('.git'):
            remote = remote[0:-4]

        with self.m.context(infra_steps=True):
            self.m.git.checkout(
                remote,
                path=root,
                ref=branch,
                recursive=True,
                submodules=True)

        bb = self._bb_data()
        submodules = []

        with self.m.context(cwd=root):
            if use_trigger and bb:
                with self.m.step.nest('bb_data') as pres:
                    pres.step_summary_text = str(vars(bb))

                if remote == bb.remote:
                    # The trigger is for the top-level repository.
                    self._apply_trigger(bb, branch)

                else:
                    # Otherwise it must be for a submodule. Look at all
                    # submodule paths and find (among other things) the
                    # remote URL of each.
                    submodules = self._find_submodules(root)

                    match = None
                    for submodule in submodules:
                        if submodule.remote == bb.remote:
                            match = submodule
                            break

                    if not match:
                        raise self.m.step.InfraFailure(
                            'could not find triggering repo "{}" in checkout'
                            .format(bb.remote))

                    # Checkout based on the trigger.
                    # NOTE(review): branch is the top-level repository's
                    # branch; the rebase presumably relies on the submodule
                    # having a branch of the same name -- confirm.
                    with self.m.context(cwd=match.path):
                        self._apply_trigger(bb, branch)

            # Run git log for both the top-level checkout and every
            # submodule.
            with self.m.step.nest('git log'):
                self.m.git('log', '--oneline', '-n', '10', name=str(root))
                for submodule in submodules:
                    with self.m.context(cwd=submodule.path):
                        self.m.git(
                            'log',
                            '--oneline',
                            '-n',
                            '10',
                            name=str(submodule.path))

    def _repo(self, remote, branch, manifest_file, use_trigger, root):
        """Checkout code from an Android Repo Tool manifest.

        Args:
          remote (str): URL of git repository.
          branch (str): Remote branch to retrieve.
          manifest_file (str): Name of manifest XML file. If None, falls
            back to the manifest_file property and then to 'default.xml'.
          use_trigger (bool): Attempt to apply the triggering change to the
            checkout.
          root (Path): Path to checkout into.

        Raises:
          StepFailure: If the triggering change is not for the manifest
            repository and matches no project URL in the manifest.
        """
        # Git makes the top-level folder, Repo requires caller to make it.
        self.m.file.ensure_directory('mkdir checkout', root)

        if manifest_file is None:
            manifest_file = self._manifest_file or 'default.xml'

        with self.m.context(cwd=root):
            remote = remote.rstrip('/')

            # Always bound, so later checks only need to test bb itself.
            bb = self._bb_data() if use_trigger else None

            with self.m.context(infra_steps=True):
                self.m.repo.init(
                    manifest_url=remote,
                    manifest_branch=branch,
                )

            # Needed to read the manifest whether or not a trigger is
            # applied, so it must not be defined only on the trigger path.
            manifests_dir = root.join('.repo', 'manifests')

            # If the triggering CL is a manifest change, apply it before
            # running sync.
            if bb and bb.remote and remote == bb.remote:
                with self.m.context(cwd=manifests_dir):
                    with self.m.context(infra_steps=True):
                        self.m.git('fetch', remote, bb.ref)
                        self.m.git('checkout', '-b', 'working', 'FETCH_HEAD')
                        # Right now the upstream of 'working' is the local
                        # 'default' branch. 'repo sync' complains if the
                        # upstream isn't remote, so it's changed to the
                        # remote branch that's identical to 'default'.
                        self.m.git(
                            'branch',
                            '--set-upstream-to=origin/{}'.format(branch))

                    # Failure during rebase is a conflict and not an infra
                    # failure.
                    if bb.rebase:
                        # The repo tool checks out the top of the manifest
                        # repo and creates a local 'default' branch. Rebase
                        # this CL on top of that branch.
                        self.m.git('rebase', 'default')

                    # Show recent history so it's clear in MILO what code
                    # was tested. Exit code 128 is tolerated as well as 0.
                    self.m.git(
                        'log', '--oneline', 'HEAD~10..', ok_ret=(0, 128))

            self._manifest = self._read_manifest(
                manifests_dir.join(manifest_file))

            # TODO(pwbug/193) find a way to do this non-globally.
            for _, remote_host in sorted(self._manifest.remotes.iteritems()):
                if remote_host.fetch.url.startswith('sso://'):
                    self.m.sso.configure_insteadof(remote_host.fetch.url)

            with self.m.context(infra_steps=True):
                self.m.repo.sync(
                    force_sync=True,
                    current_branch=True,
                    jobs=20,
                    cache_dir=self.m.path['cache'].join('repo-tool'),
                )
                self.m.repo.start('base')

            # If the triggering CL is for one of the manifest's projects
            # (not the manifest itself), apply it in that project.
            if bb and bb.remote and remote != bb.remote:
                for entry in self._manifest.projects:
                    if entry.url != bb.remote:
                        continue

                    project_dir = root.join(*entry.path.split('/'))
                    with self.m.context(cwd=project_dir):
                        with self.m.context(infra_steps=True):
                            self.m.git('fetch', bb.remote, bb.ref)
                            self.m.git(
                                'checkout', '-b', 'working', 'FETCH_HEAD')

                        # Failure during rebase is a conflict and not an
                        # infra failure.
                        if bb.rebase:
                            self.m.git('rebase', 'base')

                        # Show recent history so it's clear in MILO what
                        # code was tested.
                        self.m.git(
                            'log', '--oneline', 'HEAD~10..', ok_ret=(0, 128))
                    break

                else:
                    # Loop finished without finding a matching project.
                    raise self.m.step.StepFailure(
                        'failed to find {} change {}'.format(
                            bb.remote, bb.ref))

            self.m.repo.manifest_snapshot()

        # Some dependent projects have everything inside one top-level folder
        # in their repo workspace. For those projects pretend that top-level
        # folder is actually the checkout root. The repo_top member will
        # always point to the actual repo workspace root.
        if root == self._root:
            self._repo_top = self._root
            files = set(self.m.file.listdir('ls', root))
            files.discard(self._root.join('.repo'))
            if len(files) == 1:
                self._root = files.pop()

    def __call__(self,
                 remote=None,
                 branch=None,
                 name=None,
                 use_trigger=True,
                 root=None,
                 use_repo=None,
                 manifest_file=None):
        """Checkout code.

        Grabs data from buildbucket. If drawing a blank, uses remote. This
        method returns nothing; when root is not given, the checkout
        location is available afterwards through the root property.

        Args:
          remote (str): URL of git repository.
          branch (str): Remote branch to retrieve.
          name (str|None): If not None, this is used in the nesting step that
            wraps all steps invoked by this method.
          use_trigger (bool): Attempt to apply the triggering change to the
            checkout.
          root (Path|None): If not None, checkout into this path and not
            self.root.
          use_repo (bool|None): If True, treat the remote/branch as an
            Android Repo Tool manifest. If False, treat as a regular Git
            repository. If None, use the property value instead.
          manifest_file (str|None): Path to manifest file, defaults to
            'default.xml'.
        """
        if use_repo is None:
            use_repo = self._use_repo

        # The caller supplying the branch and not the remote is a weird
        # situation that's probably an error. Only grab from properties if
        # caller supplied neither.
        if remote is None and branch is None:
            remote = self._remote
            branch = self._branch

        assert remote
        branch = branch or 'master'

        # Turn "https://foo/bar/baz.git" into "baz".
        if name is None:
            name = remote.rstrip('/').split('/')[-1]
            if name.endswith('.git'):
                name = name[0:-4]

        if root is None:
            root = self._root = self.m.path['start_dir'].join('checkout')

        with self.m.step.nest('checkout {}'.format(name)):
            if use_repo:
                self._repo(
                    remote=remote,
                    branch=branch,
                    root=root,
                    use_trigger=use_trigger,
                    manifest_file=manifest_file)
            else:
                self._git(
                    remote=remote,
                    branch=branch,
                    root=root,
                    use_trigger=use_trigger)

    @property
    def root(self):
        """Returns the logical top level directory of the checkout.

        Returns:
          For Git checkouts, returns the top-level directory. For Android
          Repo Tool checkouts, returns the top-level directory unless there
          is exactly one subdirectory of that top-level directory (except
          for .repo). In that case it returns that one subdirectory.
        """
        return self._root

    @property
    def repo_top(self):
        """Always returns the directory containing the .repo folder."""
        return self._repo_top

    @property
    def manifest(self):
        """Returns the manifest read by the last repo checkout, or None."""
        return self._manifest

    @property
    def remote(self):
        """Returns the remote taken from the module properties."""
        return self._remote

    @property
    def branch(self):
        """Returns the branch taken from the module properties."""
        return self._branch

    @property
    def manifest_file(self):
        """Returns the manifest file taken from the module properties."""
        return self._manifest_file

    @property
    def revision(self):
        """Returns revision of the primary checkout directory.

        The value is looked up once and then cached.
        """
        assert self._root, 'checkout() not yet called'
        if not self._revision:
            self._revision = self.get_revision(self._root)
        return self._revision

    def get_revision(self, root):
        """Like self.revision, but works for secondary checkouts.

        Args:
          root (Path): Directory to run `git log` in.

        Returns:
          The stripped output of `git log --max-count=1 --pretty=format:%H`.
        """
        with self.m.context(cwd=root):
            return self.m.git(
                'log',
                '--max-count=1',
                '--pretty=format:%H',
                stdout=self.m.raw_io.output_text(),
                step_test_data=(
                    lambda: self.test_api.m.raw_io.stream_output('HASH')),
            ).stdout.strip()

    # gerrit_host and gerrit_project aren't really properties of checkout, but
    # they make some sense here and don't make much sense anywhere else.
    _REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$')

    def gerrit_host(self, remote=None):
        """Returns the Gerrit review host URL for a remote.

        Args:
          remote (str|None): Remote URL; defaults to self.remote.

        Returns:
          'https://<host>' where '-review' is inserted before the first '.'
          unless the URL already contains '-review'. None if the remote is
          missing or is not of the form https://<host>/<project>.
        """
        # The trailing '' keeps a missing remote from reaching the regex as
        # None, which would raise instead of simply not matching.
        match = self._REMOTE_REGEX.match(remote or self.remote or '')
        if not match:
            return None

        gerrit_review_host = 'https://{}'.format(match.group('host'))
        if '-review' not in gerrit_review_host:
            gerrit_review_host = gerrit_review_host.replace(
                '.', '-review.', 1)
        return gerrit_review_host

    def gerrit_project(self, remote=None):
        """Returns the project portion of a remote URL.

        Args:
          remote (str|None): Remote URL; defaults to self.remote.

        Returns:
          Everything after 'https://<host>/', or None if the remote is
          missing or is not of that form.
        """
        match = self._REMOTE_REGEX.match(remote or self.remote or '')
        if not match:
            return None
        return match.group('project')