| # -*- coding: utf-8 -*- |
| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # Copied from |
| # https://chromium.googlesource.com/chromiumos/infra/recipes/+/HEAD/recipe_modules/repo/ |
| """API for working with the 'repo' VCS tool. |
| |
| See: https://chromium.googlesource.com/external/repo/ |
| """ |
| |
| import collections |
| import os |
| import types |
| from xml.etree import cElementTree as ElementTree |
| |
| from recipe_engine import recipe_api |
| |
| MANIFEST_MOCK = """ |
| <manifest> |
| <project path="SAMPLE" revision="FROM_REV"/> |
| </manifest> |
| """ |
| |
| ManifestDiff = collections.namedtuple( |
| 'ManifestDiff', ['name', 'path', 'from_rev', 'to_rev'] |
| ) |
| |
| ProjectInfo = collections.namedtuple( |
| 'ProjectInfo', ['name', 'path', 'remote', 'branch'] |
| ) |
| |
| _REPO_URL = 'https://pigweed.googlesource.com/third_party/gerrit/git-repo' |
| _REPO_REV = 'b750b48f50eb4a11087ca6775161d5bf4d5c47d5' |
| |
| |
| class RepoApi(recipe_api.RecipeApi): |
| """A module for interacting with the repo tool.""" |
| |
| ManifestDiff = ManifestDiff # pylint: disable=invalid-name |
| |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
| self._repo = None |
| |
| @property |
| def repo(self): |
| return self.resource('repo') |
| |
| def _find_root(self): |
| """Starting from cwd, find an ancestor with a '.repo' subdir.""" |
| # We need a copy of cwd that we can modify. join() with no arguments |
| # returns the instance, so we add an element, and then remove it. |
| candidate = self.m.context.cwd |
| while candidate.pieces: |
| if self.m.path.exists(candidate.join('.repo')): |
| return candidate |
| candidate = self.m.path.dirname(candidate) |
| return None |
| |
| def _step(self, args, name=None, attempts=1, **kwargs): |
| """Executes 'repo' with the supplied arguments. |
| |
| Args: |
| args (list): A list of arguments to supply to 'repo'. |
| name (str): The name of the step. If None, generate from the args. |
| attempts (int): Number of attempts. |
| **kwargs: See 'step.__call__'. |
| |
| Returns: |
| StepData: See 'step.__call__'. |
| """ |
| if name is None: |
| name = 'repo' |
| # Add first non-flag argument to name. |
| for arg in args: |
| if isinstance(arg, str) and arg[:1] != '-': |
| name += ' ' + arg |
| break |
| kwargs.setdefault('infra_step', True) |
| for i in range(attempts): |
| try: |
| return self.m.step(name, [self.repo] + args, **kwargs) |
| except self.m.step.StepFailure: |
| if i == attempts - 1: |
| raise |
| |
| def _clear_git_locks(self): |
| """Removes any git locks found in the entire repo checkout.""" |
| |
| repo_cmd = [ |
| 'find', |
| '.repo/', |
| '-type', |
| 'f', |
| '-name', |
| '*.lock', |
| '-print', |
| '-delete', |
| ] |
| self.m.step('clear repo locks', repo_cmd, infra_step=True) |
| |
| try: |
| git_cmd = [ |
| 'forall', |
| '--ignore-missing', |
| '-j', |
| '32', |
| '-c', |
| 'find', |
| '.git/', |
| '-type', |
| 'f', |
| '-name', |
| '*.lock', |
| '-print', |
| '-delete', |
| ] |
| self._step(git_cmd, 'clear git locks') |
| except recipe_api.StepFailure: # pragma: nocover |
| # try again without the --ignore-missing |
| git_cmd = [ |
| 'forall', |
| '-j', |
| '32', |
| '-c', |
| 'find', |
| '.git/', |
| '-type', |
| 'f', |
| '-name', |
| '*.lock', |
| '-print', |
| '-delete', |
| ] |
| self._step(git_cmd, 'retry clear git locks') |
| |
| def init( # pylint: disable=invalid-name |
| self, |
| manifest_url, |
| _kwonly=(), |
| manifest_branch=None, |
| manifest_name=None, |
| reference=None, |
| groups=None, |
| depth=None, |
| repo_url=_REPO_URL, |
| repo_rev=_REPO_REV, |
| timeout=15 * 60, |
| **kwargs, |
| ): |
| """Executes 'repo init' with the given arguments. |
| |
| Args: |
| manifest_url (str): URL of the manifest repository to clone. |
| _kwonly: Fake argument. |
| manifest_branch (str): Manifest repository branch to checkout. |
| manifest_name (str): Manifest file to use. |
| reference (str): Location of a mirror directory to bootstrap sync. |
| groups (list): Groups to checkout (see `repo init --groups`). |
| depth (int): Create a shallow clone of the given depth. |
| repo_url (str): URL of the repo repository. |
| repo_rev (str): Repo binary revision to use. |
| timeout (int): Timeout in seconds. |
| **kwargs: Passed through to self.m.step(). |
| """ |
| assert ( |
| _kwonly == () |
| ), ( # pylint: disable=g-explicit-bool-comparison |
| 'init accepts only 1 positional arg' |
| ) |
| cmd = ['init', '--manifest-url', manifest_url, '--groups', 'all'] |
| if manifest_branch is not None: |
| cmd += ['--manifest-branch', manifest_branch] |
| if reference is not None: |
| cmd += ['--reference', reference] |
| if groups is not None: |
| assert not isinstance(groups, str) |
| cmd += ['--groups', ','.join(groups)] |
| if depth is not None: |
| cmd += ['--depth', '%d' % depth] |
| if repo_url is not None: |
| cmd += ['--repo-url', repo_url] |
| if repo_rev is not None: |
| cmd += ['--repo-rev', repo_rev, '--no-repo-verify'] |
| if manifest_name: |
| cmd += ['--manifest-name', manifest_name] |
| self._step(cmd, timeout=timeout, **kwargs) |
| self._clear_git_locks() |
| |
| if self.m.context.cwd: |
| self.m.path.mock_add_paths(self.m.context.cwd.join('.repo')) |
| |
| def sync( # pylint: disable=invalid-name |
| self, |
| _kwonly=(), |
| force_sync=False, |
| detach=False, |
| current_branch=False, |
| jobs=None, |
| manifest_name=None, |
| no_tags=False, |
| optimized_fetch=False, |
| cache_dir=None, |
| timeout=None, |
| attempts=1, |
| **kwargs, |
| ): |
| """Executes 'repo sync' with the given arguments. |
| |
| Args: |
| _kwonly: Fake argument. |
| force_sync (bool): Overwrite existing git directories if needed. |
| detach (bool): Detach projects back to manifest revision. |
| current_branch (bool): Fetch only current branch. |
| jobs (int): Projects to fetch simultaneously. |
| manifest_name (str): Temporary manifest to use for this sync. |
| no_tags (bool): Don't fetch tags. |
| optimized_fetch (bool): Only fetch projects if revision doesn't exist. |
| cache_dir (Path): Use git-cache with this cache directory. |
| timeout (int): Timeout in seconds. |
| attempts (int): Number of attempts. |
| **kwargs: Passed through to self.m.step(). |
| """ |
| assert ( |
| _kwonly == () |
| ), ( # pylint: disable=g-explicit-bool-comparison |
| 'init accepts only 1 positional arg' |
| ) |
| cmd = ['sync'] |
| if force_sync: |
| cmd += ['--force-sync'] |
| if detach: |
| cmd += ['--detach'] |
| if current_branch: |
| cmd += ['--current-branch'] |
| if jobs is not None: |
| cmd += ['--jobs', '%d' % jobs] |
| if manifest_name is not None: |
| cmd += ['--manifest-name', manifest_name] |
| if no_tags: |
| cmd += ['--no-tags'] |
| if optimized_fetch: |
| cmd += ['--optimized-fetch'] |
| if cache_dir is not None: |
| cmd += ['--cache-dir', cache_dir] |
| |
| # Doing retry logic here instead of using that in _step() because we |
| # need to clean up the checkout or we sometimes get inconsistent |
| # results. |
| for i in range(attempts): |
| try: |
| return self._step(cmd, name=None, timeout=timeout, **kwargs) |
| except self.m.step.StepFailure: |
| if i == attempts - 1: |
| raise |
| self._step( |
| ['forall', '-c', 'git reset --hard HEAD'], ok_ret='any' |
| ) |
| |
| def sync_manifest(self, manifest_data, **kwargs): |
| """Sync to the given manifest file data. |
| |
| Args: |
| manifest_data (str): Manifest XML data to use for the sync. |
| **kwargs: Keyword arguments to pass to 'repo.sync'. |
| """ |
| repo_root = self._find_root() |
| assert repo_root is not None, 'no repo root found' |
| |
| manifest_path = self.m.path.mkstemp('manifest') |
| self.m.file.write_raw('write manifest', manifest_path, manifest_data) |
| |
| repo_manifests_path = repo_root.join('.repo', 'manifests') |
| manifest_relpath = os.path.relpath( |
| str(manifest_path), str(repo_manifests_path) |
| ) |
| self.sync(manifest_name=manifest_relpath, **kwargs) |
| |
| def start(self, branch, projects=None): |
| """Start a new branch in the given projects, or all projects if not set. |
| |
| Args: |
| branch (str): The new branch name. |
| projects (list[str]): The projects for which to start a branch. |
| """ |
| cmd = ['start', branch] |
| if projects is not None: |
| cmd.extend(projects) |
| else: |
| cmd.append('--all') |
| self._step(cmd) |
| |
| def project_infos(self, projects=()): |
| """Uses 'repo forall' to gather project information. |
| |
| Args: |
| projects (List[str]): Project names or paths to return info for. Defaults |
| to all projects. |
| |
| Returns: |
| List[ProjectInfo]: Requested project infos. |
| """ |
| |
| def step_test_data(): |
| data = '\n'.join( |
| '%s|src/%s|cros|refs/heads/main' % (p, p) |
| for p in projects or ['a', 'b', 'c'] |
| ) |
| return self.m.raw_io.test_api.stream_output_text(data) |
| |
| cmd = ['forall'] + list(projects) |
| cmd += [ |
| '-c', |
| r'echo $REPO_PROJECT\|$REPO_PATH\|$REPO_REMOTE\|$REPO_RREV', |
| ] |
| step_data = self._step( |
| cmd, |
| stdout=self.m.raw_io.output_text(add_output_log=True), |
| step_test_data=step_test_data, |
| ) |
| |
| infos = [] |
| for line in step_data.stdout.strip().split('\n'): |
| name, path, remote, rrev = line.split('|') |
| branch = None |
| if rrev.startswith('refs/heads/'): |
| branch = rrev |
| infos.append(ProjectInfo(name, path, remote, branch)) |
| return infos |
| |
| def project_info(self, project): |
| """Use 'repo forall' to gather project information for one project. |
| |
| Args: |
| project (str|Path): Project name or path to return info for. |
| |
| Returns: |
| ProjectInfo: The request project info. |
| """ |
| project_infos = self.project_infos(projects=[project]) |
| assert len(set(project_infos)) == 1, 'expected one project' |
| return project_infos[0] |
| |
| def manifest_snapshot(self, manifest_file=None): |
| """Uses repo to create a manifest snapshot and returns it as a string. |
| |
| By default uses the internal .repo manifest, but can optionally take |
| another manifest to use. |
| |
| Args: |
| manifest_file (Path): If given, path to alternate manifest file to use. |
| |
| Returns: |
| str: The manifest XML as a string. |
| """ |
| step_test_data = lambda: ( # pylint: disable=g-long-lambda |
| self.m.raw_io.test_api.stream_output_text('<manifest></manifest>') |
| ) |
| |
| cmd = ['manifest', '-r'] |
| if manifest_file: |
| cmd += ['-m', manifest_file] |
| |
| step_data = self._step( |
| cmd, |
| stdout=self.m.raw_io.output_text(add_output_log=True), |
| step_test_data=step_test_data, |
| ) |
| return step_data.stdout.strip() |
| |
| def diff_manifests(self, from_manifest_str, to_manifest_str): |
| """Diffs the two manifests and returns an array of differences. |
| |
| Given the two manifest XML strings, generates an array of `ManifestDiff`. |
| This only returns **CHANGED** projects, it skips over projects that were |
| added or deleted. |
| |
| Args: |
| from_manifest_str (str): The from manifest XML string |
| to_manifest_str (str): The to manifest XML string. |
| |
| Returns: |
| List[ManifestDiff]: An array of `ManifestDiff` namedtuple for any existing |
| changed project (excludes added/removed projects). |
| """ |
| |
| def project_paths(xml_data): |
| xml = ElementTree.fromstring(xml_data) |
| attrs = [proj.attrib for proj in xml.iterfind('project')] |
| # Make sure `path` is set, use `name` if `path` is missing |
| for attr in attrs: |
| attr['path'] = attr.get('path', attr.get('name')) |
| # Key them by path |
| return {attr['path']: attr for attr in attrs} |
| |
| from_paths = project_paths(from_manifest_str) |
| to_paths = project_paths(to_manifest_str) |
| changes = [] |
| for from_path, from_attrs in from_paths.items(): |
| if from_path not in to_paths: |
| # Project was deleted, we don't care. Move on, nothing to see here! |
| continue |
| from_name = from_attrs['name'] |
| from_revision = from_attrs['revision'] |
| to_revision = to_paths[from_path]['revision'] |
| if from_revision == to_revision: |
| # The revision didn't change (aka no CLs landed between the last |
| # snapshot and this one for that path. |
| continue |
| changes.append( |
| ManifestDiff(from_name, from_path, from_revision, to_revision) |
| ) |
| return changes |
| |
| def diff_manifests_informational( |
| self, old_manifest_path, new_manifest_path |
| ): |
| """Informational step that logs a "manifest diff". |
| |
| Args: |
| old_manifest_path (Path): Path to old manifest file. |
| new_manifest_path (Path): Path to new manifest file. |
| """ |
| name = 'manifest diff' |
| # Manifest paths must be relative to the current repo .repo/manifests dir. |
| repo_root = self._find_root() |
| if repo_root is None: |
| step = self.m.step(name, []) |
| step.presentation.step_text = ( |
| 'manifest diff failed; no repo root found' |
| ) |
| return |
| manifests_dir = self.m.path.abspath( |
| repo_root.join('.repo', 'manifests') |
| ) |
| |
| cmd = [ |
| 'diffmanifests', |
| os.path.relpath(str(old_manifest_path), manifests_dir), |
| os.path.relpath(str(new_manifest_path), manifests_dir), |
| ] |
| self._step(cmd, name=name) |
| |
| def _git_clean_checkout(self, root_path): |
| """Ensure the given repo does not contain untracked files or directories. |
| |
| We're assuming that the root path provided has already been validated. |
| |
| Args: |
| root_path (Path): Path to the repo root. |
| """ |
| with self.m.step.nest('ensure clean checkout'): |
| with self.m.context(cwd=root_path, infra_steps=True): |
| cmd = [ |
| 'forall', |
| '--ignore-missing', |
| '-j', |
| '32', |
| '-c', |
| 'git', |
| 'clean', |
| '-d', |
| '-f', |
| ] |
| self._step( |
| cmd, stdout=self.m.raw_io.output_text(add_output_log=True) |
| ) |
| |
| def ensure_synced_checkout( |
| self, root_path, manifest_url, init_opts=None, sync_opts=None |
| ): |
| """Ensure the given repo checkout exists and is synced. |
| |
| Args: |
| root_path (Path): Path to the repo root. |
| manifest_url (str): Manifest URL for 'repo.init`. |
| init_opts (dict): Extra keyword arguments to pass to 'repo.init'. |
| sync_opts (dict): Extra keyword arguments to pass to 'repo.sync'. |
| """ |
| with self.m.step.nest('ensure synced checkout'): |
| self.m.file.ensure_directory('ensure root path', root_path) |
| with self.m.context(cwd=root_path, infra_steps=True): |
| for retries in range(2): |
| try: |
| # Remove .repo/manifests and .repo/manifests.git to avoid potential |
| # problems when switching to a different manifest repo or branch. |
| for manifest_dir in ('manifests', 'manifests.git'): |
| self.m.file.rmtree( |
| 'remove .repo/%s' % manifest_dir, |
| root_path.join('.repo', manifest_dir), |
| ) |
| |
| self.init(manifest_url, **(init_opts or {})) |
| self._git_clean_checkout(root_path) |
| self._binary_selfupdate(root_path) |
| self.sync(**(sync_opts or {})) |
| break |
| except recipe_api.StepFailure: |
| if retries < 1: |
| self.m.file.rmcontents( |
| 'clean up root path and retry', root_path |
| ) |
| else: |
| raise |
| |
| # Sanity check since `repo init` will happily reuse a repository in the |
| # cwd's ancestor directories. |
| assert self.m.path.exists( |
| root_path.join('.repo') |
| ), '.repo not created!' |
| |
| def _binary_selfupdate(self, root_path): |
| """Issues a repo selfupdate to update the binary. |
| |
| Args: |
| root_path (Path): Path to the repo root. |
| """ |
| with self.m.step.nest('repo binary update'): |
| with self.m.context(cwd=root_path, infra_steps=True): |
| cmd = ['selfupdate'] |
| self._step(cmd, ok_ret='any') |