# blob: 881db5176c9e3496d72215c2ea7457e0c942f2a8 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Copied from
# https://chromium.googlesource.com/chromiumos/infra/recipes/+/HEAD/recipe_modules/repo/
"""API for working with the 'repo' VCS tool.
See: https://chromium.googlesource.com/external/repo/
"""
import collections
import os
import types
from xml.etree import cElementTree as ElementTree
from recipe_engine import recipe_api
# Minimal manifest XML containing a single project entry.
MANIFEST_MOCK = (
    '\n'
    '<manifest>\n'
    '<project path="SAMPLE" revision="FROM_REV"/>\n'
    '</manifest>\n'
)

# One project whose revision differs between two manifests.
ManifestDiff = collections.namedtuple(
    'ManifestDiff', ('name', 'path', 'from_rev', 'to_rev')
)

# Per-project data as reported by 'repo forall'.
ProjectInfo = collections.namedtuple(
    'ProjectInfo', ('name', 'path', 'remote', 'branch')
)

# Default source and pinned revision of the repo tool handed to 'repo init'.
_REPO_URL = 'https://pigweed.googlesource.com/third_party/gerrit/git-repo'
_REPO_REV = 'b750b48f50eb4a11087ca6775161d5bf4d5c47d5'
class RepoApi(recipe_api.RecipeApi):
    """A module for interacting with the repo tool.

    Every 'repo' invocation goes through `_step`, which runs the module's
    bundled 'repo' resource and marks the step as an infra step by default.
    """

    ManifestDiff = ManifestDiff  # pylint: disable=invalid-name

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): not read anywhere else in this class.
        self._repo = None

    @property
    def repo(self):
        """The module resource named 'repo', used as the step executable."""
        return self.resource('repo')

    def _find_root(self):
        """Starting from cwd, find an ancestor with a '.repo' subdir.

        Returns:
          Path: The nearest directory (cwd itself included) containing a
            '.repo' entry, or None if no such directory is found or the
            context has no cwd set.
        """
        candidate = self.m.context.cwd
        # Walk upwards one directory at a time until no path pieces remain.
        # Rebinding `candidate` never mutates the context's own cwd object.
        while candidate is not None and candidate.pieces:
            if self.m.path.exists(candidate.join('.repo')):
                return candidate
            candidate = self.m.path.dirname(candidate)
        return None

    def _step(self, args, name=None, attempts=1, **kwargs):
        """Executes 'repo' with the supplied arguments.

        Args:
          args (list): A list of arguments to supply to 'repo'.
          name (str): The name of the step. If None, generate from the args.
          attempts (int): Number of attempts. Values below 1 are treated as
            1 so the step always runs at least once.
          **kwargs: See 'step.__call__'.

        Returns:
          StepData: See 'step.__call__'.

        Raises:
          StepFailure: If the final attempt fails.
        """
        if name is None:
            # Name the step after the first non-flag argument, e.g.
            # 'repo sync'.
            first_word = next(
                (a for a in args if isinstance(a, str) and a[:1] != '-'),
                None,
            )
            name = 'repo' if first_word is None else 'repo ' + first_word

        kwargs.setdefault('infra_step', True)

        # Without this, attempts <= 0 would skip the loop entirely and
        # silently return None without running anything.
        attempts = max(1, attempts)
        for i in range(attempts):
            try:
                return self.m.step(name, [self.repo] + list(args), **kwargs)
            except self.m.step.StepFailure:
                if i == attempts - 1:
                    raise

    def _clear_git_locks(self):
        """Removes any git locks found in the entire repo checkout.

        Deletes '*.lock' files under '.repo/' and then under each project's
        '.git/' via 'repo forall'.
        """
        # Shared 'find' predicate: print and delete every regular '*.lock'
        # file.
        find_locks = ['-type', 'f', '-name', '*.lock', '-print', '-delete']

        self.m.step(
            'clear repo locks',
            ['find', '.repo/'] + find_locks,
            infra_step=True,
        )

        forall_find = ['-j', '32', '-c', 'find', '.git/'] + find_locks
        try:
            self._step(
                ['forall', '--ignore-missing'] + forall_find,
                'clear git locks',
            )
        except recipe_api.StepFailure:  # pragma: nocover
            # try again without the --ignore-missing
            self._step(['forall'] + forall_find, 'retry clear git locks')

    def init(  # pylint: disable=invalid-name
        self,
        manifest_url,
        _kwonly=(),
        manifest_branch=None,
        manifest_name=None,
        reference=None,
        groups=None,
        depth=None,
        repo_url=_REPO_URL,
        repo_rev=_REPO_REV,
        timeout=15 * 60,
        **kwargs,
    ):
        """Executes 'repo init' with the given arguments.

        After a successful init, stale git lock files are cleared.

        Args:
          manifest_url (str): URL of the manifest repository to clone.
          _kwonly: Fake argument. Must be left at its default; it only
            exists to reject extra positional arguments.
          manifest_branch (str): Manifest repository branch to checkout.
          manifest_name (str): Manifest file to use.
          reference (str): Location of a mirror directory to bootstrap sync.
          groups (list): Groups to checkout (see `repo init --groups`). Must
            not be a plain string.
          depth (int): Create a shallow clone of the given depth.
          repo_url (str): URL of the repo repository.
          repo_rev (str): Repo binary revision to use. When set,
            '--no-repo-verify' is passed as well.
          timeout (int): Timeout in seconds.
          **kwargs: Passed through to self.m.step().
        """
        assert (
            _kwonly == ()
        ), (  # pylint: disable=g-explicit-bool-comparison
            'init accepts only 1 positional arg'
        )

        # '--groups all' is always passed first. A caller-supplied `groups`
        # adds a second '--groups' flag later on the command line.
        # NOTE(review): presumably repo honors the last one -- confirm.
        cmd = ['init', '--manifest-url', manifest_url, '--groups', 'all']
        if manifest_branch is not None:
            cmd += ['--manifest-branch', manifest_branch]
        if reference is not None:
            cmd += ['--reference', reference]
        if groups is not None:
            # A bare string would be joined character by character.
            assert not isinstance(groups, str)
            cmd += ['--groups', ','.join(groups)]
        if depth is not None:
            cmd += ['--depth', '%d' % depth]
        if repo_url is not None:
            cmd += ['--repo-url', repo_url]
        if repo_rev is not None:
            cmd += ['--repo-rev', repo_rev, '--no-repo-verify']
        if manifest_name:
            cmd += ['--manifest-name', manifest_name]

        self._step(cmd, timeout=timeout, **kwargs)
        self._clear_git_locks()

        # Register '.repo' with the path module's mock so that later
        # path.exists() checks (e.g. in _find_root) see it.
        if self.m.context.cwd:
            self.m.path.mock_add_paths(self.m.context.cwd.join('.repo'))

    def sync(  # pylint: disable=invalid-name
        self,
        _kwonly=(),
        force_sync=False,
        detach=False,
        current_branch=False,
        jobs=None,
        manifest_name=None,
        no_tags=False,
        optimized_fetch=False,
        cache_dir=None,
        timeout=None,
        attempts=1,
        verbose=True,
        **kwargs,
    ):
        """Executes 'repo sync' with the given arguments.

        Args:
          _kwonly: Fake argument. Must be left at its default; it only
            exists to reject positional arguments.
          force_sync (bool): Overwrite existing git directories if needed.
          detach (bool): Detach projects back to manifest revision.
          current_branch (bool): Fetch only current branch.
          jobs (int): Projects to fetch simultaneously.
          manifest_name (str): Temporary manifest to use for this sync.
          no_tags (bool): Don't fetch tags.
          optimized_fetch (bool): Only fetch projects if revision doesn't
            exist.
          cache_dir (Path): Use git-cache with this cache directory.
          timeout (int): Timeout in seconds.
          attempts (int): Number of attempts. Values below 1 are treated as
            1 so the sync always runs at least once.
          verbose (bool): Show all output.
          **kwargs: Passed through to self.m.step().

        Returns:
          StepData: Result of the successful 'repo sync' step.

        Raises:
          StepFailure: If the final attempt fails.
        """
        assert (
            _kwonly == ()
        ), (  # pylint: disable=g-explicit-bool-comparison
            'sync accepts no positional args'
        )

        cmd = ['sync']
        if force_sync:
            cmd += ['--force-sync']
        if detach:
            cmd += ['--detach']
        if current_branch:
            cmd += ['--current-branch']
        if jobs is not None:
            cmd += ['--jobs', '%d' % jobs]
        if manifest_name is not None:
            cmd += ['--manifest-name', manifest_name]
        if no_tags:
            cmd += ['--no-tags']
        if optimized_fetch:
            cmd += ['--optimized-fetch']
        if cache_dir is not None:
            cmd += ['--cache-dir', cache_dir]
        if verbose:
            cmd += ['--verbose']

        # Doing retry logic here instead of using that in _step() because we
        # need to clean up the checkout or we sometimes get inconsistent
        # results.
        attempts = max(1, attempts)
        for i in range(attempts):
            try:
                return self._step(cmd, name=None, timeout=timeout, **kwargs)
            except self.m.step.StepFailure:
                if i == attempts - 1:
                    raise
                # Best-effort reset of every project before the next
                # attempt; any exit code is accepted.
                self._step(
                    ['forall', '-c', 'git reset --hard HEAD'], ok_ret='any'
                )

    def sync_manifest(self, manifest_data, **kwargs):
        """Sync to the given manifest file data.

        The data is written to a temporary file, which is then passed to
        'repo sync' as a path relative to '<root>/.repo/manifests'.

        Args:
          manifest_data (str): Manifest XML data to use for the sync.
          **kwargs: Keyword arguments to pass to 'repo.sync'.
        """
        repo_root = self._find_root()
        assert repo_root is not None, 'no repo root found'

        manifest_path = self.m.path.mkstemp('manifest')
        self.m.file.write_raw('write manifest', manifest_path, manifest_data)

        repo_manifests_path = repo_root.join('.repo', 'manifests')
        manifest_relpath = os.path.relpath(
            str(manifest_path), str(repo_manifests_path)
        )
        self.sync(manifest_name=manifest_relpath, **kwargs)

    def start(self, branch, projects=None):
        """Start a new branch in the given projects, or all if not set.

        Args:
          branch (str): The new branch name.
          projects (list[str]): The projects for which to start a branch.
            If None, '--all' is passed instead.
        """
        cmd = ['start', branch]
        if projects is not None:
            cmd.extend(projects)
        else:
            cmd.append('--all')
        self._step(cmd)

    def project_infos(self, projects=()):
        """Uses 'repo forall' to gather project information.

        Args:
          projects (List[str]): Project names or paths to return info for.
            Defaults to all projects.

        Returns:
          List[ProjectInfo]: Requested project infos. `branch` is only set
            when the project's revision starts with 'refs/heads/'. Empty
            output yields an empty list.
        """

        def step_test_data():
            data = '\n'.join(
                f'{p}|src/{p}|cros|refs/heads/main'
                for p in projects or ['a', 'b', 'c']
            )
            return self.m.raw_io.test_api.stream_output_text(data)

        cmd = ['forall'] + list(projects)
        cmd += [
            '-c',
            r'echo $REPO_PROJECT\|$REPO_PATH\|$REPO_REMOTE\|$REPO_RREV',
        ]
        step_data = self._step(
            cmd,
            stdout=self.m.raw_io.output_text(add_output_log=True),
            step_test_data=step_test_data,
        )

        # Drop blank lines so that empty output produces no records rather
        # than failing to unpack an empty string.
        lines = [ln.strip() for ln in step_data.stdout.splitlines()]
        infos = []
        for line in [ln for ln in lines if ln]:
            name, path, remote, rrev = line.split('|')
            branch = rrev if rrev.startswith('refs/heads/') else None
            infos.append(ProjectInfo(name, path, remote, branch))
        return infos

    def project_info(self, project):
        """Use 'repo forall' to gather project information for one project.

        Args:
          project (str|Path): Project name or path to return info for.

        Returns:
          ProjectInfo: The request project info.
        """
        project_infos = self.project_infos(projects=[project])
        # Duplicates of the same record are tolerated; distinct ones are
        # not.
        assert len(set(project_infos)) == 1, 'expected one project'
        return project_infos[0]

    def manifest_snapshot(self, manifest_file=None):
        """Uses repo to create a manifest snapshot and returns it as a string.

        By default uses the internal .repo manifest, but can optionally take
        another manifest to use.

        Args:
          manifest_file (Path): If given, path to alternate manifest file to
            use.

        Returns:
          str: The manifest XML as a string, stripped of surrounding
            whitespace.
        """

        def step_test_data():
            return self.m.raw_io.test_api.stream_output_text(
                '<manifest></manifest>'
            )

        cmd = ['manifest', '-r']
        if manifest_file:
            cmd += ['-m', manifest_file]
        step_data = self._step(
            cmd,
            stdout=self.m.raw_io.output_text(add_output_log=True),
            step_test_data=step_test_data,
        )
        return step_data.stdout.strip()

    def diff_manifests(self, from_manifest_str, to_manifest_str):
        """Diffs the two manifests and returns an array of differences.

        Given the two manifest XML strings, generates an array of
        `ManifestDiff`. This only returns **CHANGED** projects, it skips over
        projects that were added or deleted.

        Projects are matched by `path`, falling back to `name` when `path`
        is absent. A project lacking a `name` or `revision` attribute is
        reported with None for that field instead of raising KeyError.

        Args:
          from_manifest_str (str): The from manifest XML string
          to_manifest_str (str): The to manifest XML string.

        Returns:
          List[ManifestDiff]: An array of `ManifestDiff` namedtuple for any
            existing changed project (excludes added/removed projects).
        """

        def project_paths(xml_data):
            """Maps each <project> element's path to its attribute dict."""
            xml = ElementTree.fromstring(xml_data)
            by_path = {}
            for proj in xml.iterfind('project'):
                attrs = proj.attrib
                # Make sure `path` is set, use `name` if `path` is missing
                attrs['path'] = attrs.get('path', attrs.get('name'))
                by_path[attrs['path']] = attrs
            return by_path

        from_paths = project_paths(from_manifest_str)
        to_paths = project_paths(to_manifest_str)

        changes = []
        for from_path, from_attrs in from_paths.items():
            if from_path not in to_paths:
                # Project was deleted, we don't care. Move on, nothing to
                # see here!
                continue
            from_name = from_attrs.get('name')
            from_revision = from_attrs.get('revision')
            to_revision = to_paths[from_path].get('revision')
            if from_revision == to_revision:
                # The revision didn't change (aka no CLs landed between the
                # last snapshot and this one for that path).
                continue
            changes.append(
                ManifestDiff(from_name, from_path, from_revision, to_revision)
            )
        return changes

    def diff_manifests_informational(
        self, old_manifest_path, new_manifest_path
    ):
        """Informational step that logs a "manifest diff".

        If no repo root can be found, an empty step is emitted whose step
        text explains the failure, and nothing is diffed.

        Args:
          old_manifest_path (Path): Path to old manifest file.
          new_manifest_path (Path): Path to new manifest file.
        """
        name = 'manifest diff'

        # Manifest paths must be relative to the current repo
        # .repo/manifests dir.
        repo_root = self._find_root()
        if repo_root is None:
            step = self.m.step(name, [])
            step.presentation.step_text = (
                'manifest diff failed; no repo root found'
            )
            return

        manifests_dir = self.m.path.abspath(
            repo_root.join('.repo', 'manifests')
        )
        cmd = [
            'diffmanifests',
            os.path.relpath(str(old_manifest_path), manifests_dir),
            os.path.relpath(str(new_manifest_path), manifests_dir),
        ]
        self._step(cmd, name=name)

    def _git_clean_checkout(self, root_path):
        """Ensure the given repo does not contain untracked files or dirs.

        We're assuming that the root path provided has already been
        validated.

        Args:
          root_path (Path): Path to the repo root.
        """
        with self.m.step.nest('ensure clean checkout'):
            with self.m.context(cwd=root_path, infra_steps=True):
                cmd = [
                    'forall',
                    '--ignore-missing',
                    '-j',
                    '32',
                    '-c',
                    'git',
                    'clean',
                    '-d',
                    '-f',
                ]
                self._step(
                    cmd, stdout=self.m.raw_io.output_text(add_output_log=True)
                )

    def ensure_synced_checkout(
        self, root_path, manifest_url, init_opts=None, sync_opts=None
    ):
        """Ensure the given repo checkout exists and is synced.

        Runs init, clean, selfupdate and sync in order. If any of them
        fails, the contents of `root_path` are removed and the whole
        sequence is retried once; a second failure is re-raised.

        Args:
          root_path (Path): Path to the repo root.
          manifest_url (str): Manifest URL for 'repo.init`.
          init_opts (dict): Extra keyword arguments to pass to 'repo.init'.
          sync_opts (dict): Extra keyword arguments to pass to 'repo.sync'.

        Raises:
          StepFailure: If the second attempt also fails.
        """
        with self.m.step.nest('ensure synced checkout'):
            self.m.file.ensure_directory('ensure root path', root_path)
            with self.m.context(cwd=root_path, infra_steps=True):
                for retries in range(2):
                    try:
                        # Remove .repo/manifests and .repo/manifests.git to
                        # avoid potential problems when switching to a
                        # different manifest repo or branch.
                        for manifest_dir in ('manifests', 'manifests.git'):
                            self.m.file.rmtree(
                                f'remove .repo/{manifest_dir}',
                                root_path.join('.repo', manifest_dir),
                            )
                        self.init(manifest_url, **(init_opts or {}))
                        self._git_clean_checkout(root_path)
                        self._binary_selfupdate(root_path)
                        self.sync(**(sync_opts or {}))
                        break
                    except recipe_api.StepFailure:
                        if retries < 1:
                            self.m.file.rmcontents(
                                'clean up root path and retry', root_path
                            )
                        else:
                            raise

                # Sanity check since `repo init` will happily reuse a
                # repository in the cwd's ancestor directories.
                assert self.m.path.exists(
                    root_path.join('.repo')
                ), '.repo not created!'

    def _binary_selfupdate(self, root_path):
        """Issues a repo selfupdate to update the binary.

        Any exit code is accepted, so a failed update does not fail the
        build.

        Args:
          root_path (Path): Path to the repo root.
        """
        with self.m.step.nest('repo binary update'):
            with self.m.context(cwd=root_path, infra_steps=True):
                cmd = ['selfupdate']
                self._step(cmd, ok_ret='any')