# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Calls to checkout code.

Usage:
api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed')
"""

import collections
import re
import urllib
import xml.etree.ElementTree

import attr
from PB.recipe_modules.pigweed.checkout.options import Options
from recipe_engine import config_types, recipe_api

PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed'


@attr.s
class Manifest:
    """Parsed repo manifest: remotes keyed by name plus a list of projects."""

    remotes = attr.ib(default=attr.Factory(dict))
    projects = attr.ib(default=attr.Factory(list))

    def dict(self):
        """Return the manifest as plain dicts/lists.

        Each remote and project is rendered through its own dict() method.
        """
        remotes = {}
        for name, remote in self.remotes.items():
            remotes[name] = remote.dict()
        projects = [project.dict() for project in self.projects]
        return {'remotes': remotes, 'projects': projects}


class Url:
    """A URL as originally written plus a slot for its https form.

    The https attribute starts out as None and is filled in by callers.
    """

    def __init__(self, url, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.url = url
        self.https = None

    def dict(self):
        """Return a shallow copy of the instance attributes as a dict."""
        return {**vars(self)}


@attr.s
class Remote:
    """Remote config from manifest."""

    name = attr.ib(type=str)
    fetch = attr.ib(type=Url)
    review = attr.ib(type=str, default=None)
    revision = attr.ib(type=str, default=None)
    alias = attr.ib(type=str, default=None)

    def dict(self):
        """Return the attributes as a dict, with 'fetch' rendered via its dict()."""
        fields = {**vars(self)}
        fields['fetch'] = self.fetch.dict()
        return fields


@attr.s
class Project:
    """Key variables describing a repository/project."""

    name = attr.ib(type=str)
    path = attr.ib(type=str)
    remote = attr.ib(type=str)
    revision = attr.ib(type=str)
    upstream = attr.ib(type=str)
    url = attr.ib(type=str, default=None)

    def path_object(self, root):
        """Return this project's path joined onto root using root.join()."""
        relative = self.path
        return root.join(relative)

    def dict(self):
        """Return a shallow copy of the instance attributes as a dict."""
        return {**vars(self)}


def _str_or_none(x):
    if x is None:
        return x
    return str(x)


def _int_or_none(x):
    if x is None:
        return x
    return int(x)


@attr.s
class Change:
    """Data from buildbucket.

    Describes a single change (or commit) to be applied to the checkout,
    together with bookkeeping about whether and how it was applied.
    """

    number = attr.ib(converter=int)
    bb_input = attr.ib(repr=False)
    remote = attr.ib(converter=_str_or_none)
    ref = attr.ib(converter=_str_or_none)
    rebase = attr.ib(type=bool)
    # Converted with _str_or_none like the other optional string fields;
    # passing the helper as 'type=' would only record it as metadata and
    # leave the value unconverted.
    project = attr.ib(converter=_str_or_none)
    branch = attr.ib(converter=_str_or_none)
    gerrit_name = attr.ib(converter=_str_or_none)
    submitted = attr.ib(type=bool)
    patchset = attr.ib(converter=_int_or_none, default=None)
    applied = attr.ib(type=bool, default=False, repr=False)
    base = attr.ib(converter=_str_or_none, default=None)
    base_type = attr.ib(converter=_str_or_none, default=None)
    is_merge = attr.ib(type=bool, default=False)
    commit_message = attr.ib(type=str, default='')

    @property
    def gerrit_host(self):
        """URL of the '<gerrit_name>-review.googlesource.com' host."""
        return f'https://{self.gerrit_name}-review.googlesource.com'

    @property
    def gerrit_url(self):
        """Link to the change on its Gerrit host.

        Falls back to gitiles_url when number is falsy (e.g. 0).
        """
        if not self.number:
            return self.gitiles_url
        return f'{self.gerrit_host}/c/{self.number}'

    @property
    def gitiles_url(self):
        """Link built from the remote and ref: '<remote>/+/<ref>'."""
        return f'{self.remote}/+/{self.ref}'

    @property
    def name(self):
        """Short identifier of the form '<gerrit_name>:<number>'."""
        return f'{self.gerrit_name}:{self.number}'


@attr.s
class Submodule:
    """Submodule properties."""

    api = attr.ib(type=recipe_api.RecipeApi, repr=False)
    hash = attr.ib(type=str)
    relative_path = attr.ib(type=str)
    path = attr.ib(type=config_types.Path)
    name = attr.ib(type=str)
    remote = attr.ib(type=str)
    initialized = attr.ib(type=bool)
    modified = attr.ib(type=bool)
    conflict = attr.ib(type=bool)
    branch = attr.ib(type=str)
    url = attr.ib(type=str)
    update = attr.ib(type=str)
    ignore = attr.ib(type=str)
    shallow = attr.ib(type=bool)
    fetchRecurseSubmodules = attr.ib(type=bool)
    # Declared exactly once, as the final attribute; attrs orders __init__
    # parameters by declaration order, so 'describe' is the last one.
    describe = attr.ib(type=str)


@attr.s
class StatusOfChanges:
    """Outcome of applying the triggering changes.

    Both attributes are expected to hold a tuple of Change objects.
    """

    # Changes that were applied.
    applied = attr.ib()
    # Changes that were not applied.
    not_applied = attr.ib()


@attr.s
class CheckoutContext:
    """State describing one checkout, plus lazily computed views of it."""

    _api = attr.ib(repr=False)

    # The Options protobuf the checkout module was invoked with.
    options = attr.ib(default=None)

    # Triggering changes (a list).
    changes = attr.ib(default=None)

    # Directory that is the real root of the checkout.
    top = attr.ib(default=None)

    # Logical root of the checkout. Normally the same as 'top'; sometimes a
    # subdirectory of it.
    root = attr.ib(default=None)

    # Record of which triggering changes were and were not applied.
    status = attr.ib(default=None)

    # Mapping describing remotes that are to be treated as the same.
    equivalent_remotes = attr.ib(default=attr.Factory(dict))

    # The parsed repo manifest.
    manifest = attr.ib(default=None)

    # Location of a JSON file with metadata about the triggering changes.
    changes_json = attr.ib(default=None)

    def revision(self):
        """Return the current revision of 'root', computing it only once."""
        if not hasattr(self, '_revision'):
            self._revision = self._api.checkout.get_revision(self.root)
        return self._revision

    def manifest_snapshot(self):
        """Return the repo manifest with all projects pinned.

        Returns None when options.use_repo is false. The snapshot is taken
        on first use (from 'top') and reused afterwards.
        """
        if not self.options.use_repo:
            return None

        if not hasattr(self, '_manifest_snapshot'):
            with self._api.context(cwd=self.top):
                self._manifest_snapshot = self._api.repo.manifest_snapshot()
        return self._manifest_snapshot

    def submodule_snapshot(self):
        """Return the text of 'git submodule status --recursive' for 'root'.

        The counterpart of manifest_snapshot() for checkouts that do not use
        repo, but less strictly formatted. Returns None when
        options.use_repo is true. The output is captured on first use and
        reused afterwards.
        """
        if self.options.use_repo:
            return None

        if not hasattr(self, '_submodule_snapshot'):
            with self._api.context(cwd=self.root):
                # Bound to a local so the step_test_data line passes pylint.
                fake_stdout = self._api.raw_io.test_api.stream_output_text

                status_step = self._api.git(
                    'submodule-status',
                    'submodule',
                    'status',
                    '--recursive',
                    stdout=self._api.raw_io.output_text(),
                    step_test_data=lambda: fake_stdout(
                        'submodule status filler text',
                    ),
                    ok_ret='any',
                )
                self._submodule_snapshot = status_step.stdout.strip() or ''
        return self._submodule_snapshot

    def snapshot_to_dir(self, directory):
        """Write the available snapshots and a short git log into directory.

        Writes manifest.xml and/or submodules.log when the corresponding
        snapshot is non-empty, and always writes git.log with the last ten
        one-line log entries of 'root'.
        """
        api = self._api
        api.file.ensure_directory('mkdir', directory)

        manifest_xml = self.manifest_snapshot()
        if manifest_xml:
            api.file.write_text(
                'write manifest.xml', directory / 'manifest.xml', manifest_xml,
            )

        submodule_text = self.submodule_snapshot()
        if submodule_text:
            api.file.write_text(
                'write submodule snapshot',
                directory / 'submodules.log',
                submodule_text,
            )

        with api.context(cwd=self.root):
            log_step = api.git(
                'log',
                'log',
                '--oneline',
                '-n',
                '10',
                stdout=api.raw_io.output_text(),
                ok_ret='any',
            )
            git_log = log_step.stdout
        api.file.write_text('write git log', directory / 'git.log', git_log)

    def submodules(self, recursive=False):
        """Return a list of Submodule objects describing the submodules.

        Runs the submodule_status.py resource against 'root' and converts
        each entry of its JSON output. Remotes are passed through
        sso_to_https and lose a trailing '.git'; 'path' becomes a path under
        'root' while the original value is kept as 'relative_path'.

        Args:
            recursive (bool): Pass '--recursive' to the status script.
        """
        cmd = [
            'python3',
            self._api.checkout.resource('submodule_status.py'),
            self.root,
            self._api.json.output(),
        ]
        cmd.extend(['--recursive'] if recursive else [])

        status = self._api.step(
            'submodule status',
            cmd,
            step_test_data=lambda: self._api.json.test_api.output({}),
        ).json.output

        found = []
        for entry in status.values():
            https_remote = self._api.sso.sso_to_https(entry['remote'])
            if https_remote.endswith('.git'):
                https_remote = https_remote[:-4]
            relative = entry['path']
            entry.update(
                remote=https_remote,
                relative_path=relative,
                path=self.root / relative,
            )
            found.append(Submodule(self._api, **entry))

        return found

    _REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$')

    def gerrit_host(self):
        """Return the review host derived from options.remote.

        If the host does not already contain '-review', the first '.' is
        replaced with '-review.'. Returns None if the remote does not match
        the expected https URL shape.
        """
        parsed = self._REMOTE_REGEX.match(self.options.remote)
        if not parsed:
            return  # pragma: no cover

        host = parsed.group('host')
        return host if '-review' in host else host.replace('.', '-review.', 1)

    def gerrit_project(self):
        """Return the project portion of options.remote.

        Returns None if the remote does not match the expected https URL
        shape.
        """
        parsed = self._REMOTE_REGEX.match(self.options.remote)
        if not parsed:
            return  # pragma: no cover

        return parsed.group('project')

    def remotes_equivalent(self, remote1, remote2):
        """Return whether the two remotes are equal or listed as equivalent.

        Either argument may be None (or otherwise falsy); such values are
        compared as-is instead of being converted from sso to https.
        """
        to_https = self._api.sso.sso_to_https
        if remote1:
            remote1 = to_https(remote1)
        if remote2:
            remote2 = to_https(remote2)
        return remote1 == remote2 or remote1 in self.equivalent_remotes.get(
            remote2, ()
        )


class CheckoutApi(recipe_api.RecipeApi):
    """Calls to checkout code."""

    def _read_manifest(self, manifest_remote, manifest_file):
        """Parse a repo manifest file into a Manifest object.

        All work happens inside a 'read manifest' step. The raw file text is
        attached to that step as a log, and the parsed result is also written
        to manifest.json in the start directory.

        Args:
            manifest_remote: URL of the manifest repository; its scheme and
                host replace a leading '..' in a remote's fetch attribute.
            manifest_file: Path of the manifest XML file to read.

        Returns:
            The populated Manifest.
        """

        with self.m.step.nest('read manifest') as presentation:
            raw_xml = self.m.file.read_text('read file', manifest_file)
            presentation.logs['raw'] = raw_xml

            tree = xml.etree.ElementTree.fromstring(raw_xml)

            parsed_manifest = Manifest()

            for remote_elem in tree.iter('remote'):
                entry = Remote(**remote_elem.attrib)
                fetch = entry.fetch
                if fetch.startswith('..'):
                    base = urllib.parse.urlparse(manifest_remote)
                    fetch = f'{base.scheme}://{base.netloc}{fetch[2:]}'
                entry.fetch = Url(fetch)
                entry.fetch.https = self.m.sso.sso_to_https(fetch)
                parsed_manifest.remotes[entry.name] = entry

            # Later <default> elements override attributes of earlier ones.
            defaults = {}
            for default_elem in tree.iter('default'):
                defaults.update(default_elem.attrib)

            for project_elem in tree.iter('project'):
                attrib = project_elem.attrib
                name = attrib['name']
                path = attrib.get('path', name)

                # Remote: the project's own attribute wins over the default.
                if 'remote' in attrib:
                    remote_name = attrib['remote']
                elif 'remote' in defaults:
                    remote_name = defaults['remote']
                else:  # pragma: no cover
                    assert False, f'remote not specified for {name}'

                assert (
                    remote_name in parsed_manifest.remotes
                ), f'Remote {remote_name} does not exist'
                remote_entry = parsed_manifest.remotes[remote_name]

                # Revision: project attribute, then the remote's revision,
                # then the default.
                if 'revision' in attrib:
                    revision = attrib['revision']
                elif remote_entry.revision:
                    revision = remote_entry.revision
                elif 'revision' in defaults:
                    revision = defaults['revision']
                else:  # pragma: no cover
                    assert False, f'revision not specified for {name}'

                if 'upstream' in attrib:
                    upstream = attrib['upstream']
                elif 'upstream' in defaults:  # pragma: no cover
                    # Rarely used and awkward to test: covering it needs an
                    # entirely separate manifest definition, or else the
                    # 'else' branch goes uncovered. It is also trivial.
                    upstream = defaults['upstream']
                else:
                    upstream = revision

                # Joined by hand because urllib's urljoin does not produce
                # the result wanted here.
                base_url = remote_entry.fetch.https.rstrip('/')
                project_suffix = name.lstrip('/')
                parsed_manifest.projects.append(
                    Project(
                        name=name,
                        path=path,
                        remote=remote_name,
                        revision=revision,
                        upstream=upstream,
                        url=f'{base_url}/{project_suffix}',
                    )
                )

            self.m.file.write_json(
                'manifest json',
                self.m.path['start_dir'] / 'manifest.json',
                parsed_manifest.dict(),
            )

        return parsed_manifest

    def _process_gerrit_change(self, ctx, bb_input, change):
        """Build a Change object from a LUCI GerritChange.

        Queries Gerrit for the change's details to learn its branch, project,
        commit message and whether the current revision is a merge commit.

        Args:
            ctx: Checkout context; options.force_no_rebase is consulted.
            bb_input: Buildbucket input stored on the returned Change.
            change: Object with host, project, change and patchset fields.

        Returns:
            A Change with submitted=False.
        """

        assert change.host

        number = change.change
        ref = f'refs/changes/{number % 100:02}/{number}/{change.patchset}'
        git_host = change.host.replace(
            '-review.googlesource.com', '.googlesource.com'
        )
        remote = f'https://{git_host}/{change.project}'.strip('/')
        gerrit_name = git_host.split('.')[0]

        fake_details = {
            'branch': 'main',
            'current_revision': 'f' * 40,
            'revisions': {
                'f' * 40: {'commit': {'parents': [{}], 'message': ''}},
            },
            'project': 'pigweed',
        }
        details = self.m.gerrit.change_details(
            'details',
            change_id=str(number),
            host=change.host,
            max_attempts=5,
            query_params=['CURRENT_COMMIT', 'CURRENT_REVISION'],
            timeout=30,
            test_data=self.m.json.test_api.output(fake_details),
        ).json.output
        branch = details['branch']

        commit = details['revisions'][details['current_revision']]['commit']
        is_merge = len(commit['parents']) > 1

        # A merge commit is never rebased, whatever the options say.
        rebase = not is_merge and not ctx.options.force_no_rebase

        return Change(
            number=number,
            patchset=change.patchset,
            bb_input=bb_input,
            remote=remote,
            ref=ref,
            rebase=rebase,
            is_merge=is_merge,
            branch=branch,
            gerrit_name=gerrit_name,
            submitted=False,
            commit_message=commit['message'],
            project=details['project'],
        )

    def _process_gerrit_changes(self, ctx, bb_input):
        """Yield Change objects for the Gerrit changes in bb_input.

        This is a generator. It yields, in order:
          1. One Change per entry of bb_input.gerrit_changes, each processed
             inside a nested step named after its index.
          2. One Change per dependency that cq_deps resolves for the last
             change processed in (1), skipping any already seen.
          3. A placeholder Change (most fields None) per unresolved
             dependency reported by cq_deps.

        Args:
            ctx: Checkout context, forwarded to _process_gerrit_change.
            bb_input: Buildbucket input whose gerrit_changes are processed.
        """
        # Names ('<gerrit_name>:<number>') of changes already yielded.
        seen = set()
        for i, change in enumerate(bb_input.gerrit_changes):
            with self.m.step.nest(str(i)):
                result = self._process_gerrit_change(ctx, bb_input, change)
                # The yield happens while the nested step is still open; the
                # step closes once the consumer asks for the next item.
                yield result
                seen.add(result.name)

        # 'result' is the last change from the loop above.
        # NOTE(review): this relies on gerrit_changes being non-empty,
        # otherwise 'result' is unbound -- confirm callers guarantee that.
        deps, unresolved = self.m.cq_deps.resolve(
            result.gerrit_name, result.number,
        )
        for dep in deps:
            # dep.name should only appear in seen if there are multiple
            # gerrit_changes from buildbucket and a later one depends on an
            # earlier one. If buildbucket has multiple gerrit_changes the
            # cq_deps module is not needed here, so this is just double-checking
            # something that shouldn't happen.
            if dep.name in seen:  # pragma: no cover
                continue
            seen.add(dep.name)
            yield self._process_gerrit_change(ctx, bb_input, dep)

        # Unresolved dependencies are still reported, but only with their
        # number and gerrit_name filled in.
        for cl in unresolved:
            yield Change(
                number=cl.change,
                bb_input=None,
                remote=None,
                ref=None,
                rebase=None,
                project=None,
                branch=None,
                gerrit_name=cl.gerrit_name,
                submitted=False,
            )

    def _number_details(self, host, commit_hash, branch='main'):
        """Look up the Gerrit change that corresponds to a commit hash.

        Args:
            host: Gerrit host to query. GitHub hosts are not queried.
            commit_hash: Commit to search for.
            branch: Branch name used only in the simulated test response.

        Returns:
            The single matching query result, or None if the host is a
            GitHub host, the query step fails, or the query does not return
            exactly one result.
        """
        if 'github.com' in host or 'github-review' in host:
            return None  # pragma: no cover

        fake_response = [
            {'_number': '1234', 'branch': branch, 'project': 'pigweed'},
        ]
        try:
            matches = self.m.gerrit.change_query(
                'number',
                f'commit:{commit_hash}',
                host=host,
                max_attempts=5,
                timeout=30,
                test_data=self.m.json.test_api.output(fake_response),
            ).json.output
        except self.m.step.StepFailure:  # pragma: no cover
            return None

        # Anything other than exactly one match means the commit did not go
        # through Gerrit (or is ambiguous), so it is skipped.
        return matches[0] if matches and len(matches) == 1 else None

    def _change_data(self, ctx, remote=None, branch=None):
        """Collect the triggering changes for this build as Change objects.

        Sources are tried in order: the buildbucket gerrit_changes, then the
        buildbucket gitiles_commit, and finally (if nothing was found) the
        current head of branch on remote.

        Args:
            ctx: Checkout context, forwarded when processing Gerrit changes.
            remote: Fallback remote URL; replaced by the gitiles commit's
                own URL when that commit names a project.
            branch: Fallback branch name.

        Returns:
            A tuple of Change objects.
        """
        build_input = self.m.buildbucket.build.input
        changes = []

        # Scheduler triggers indexed by gitiles repo, then by revision.
        triggers_by_repo = collections.defaultdict(dict)
        for trigger in self.m.scheduler.triggers:
            gitiles = trigger.gitiles
            if gitiles:
                triggers_by_repo[gitiles.repo][gitiles.revision] = trigger

        with self.m.step.nest('change data'):
            if build_input.gerrit_changes:
                with self.m.step.nest('process gerrit changes'):
                    changes.extend(
                        self._process_gerrit_changes(ctx, build_input)
                    )

            elif build_input.gitiles_commit.id:
                with self.m.step.nest('process gitiles commit'):
                    commit = build_input.gitiles_commit
                    assert commit.host
                    if commit.project:
                        remote = f'https://{commit.host}/{commit.project}'

                    review_host = commit.host.replace(
                        '.googlesource.com', '-review.googlesource.com'
                    )
                    gerrit_name = commit.host.split('.')[0]

                    details = self._number_details(review_host, commit.id)

                    if details:
                        branch = details['branch']
                        # A scheduler trigger for this exact commit takes
                        # precedence for the branch name.
                        repo_triggers = triggers_by_repo[remote]
                        if commit.id in repo_triggers:
                            branch = repo_triggers[commit.id].gitiles.ref
                            if branch.startswith('refs/heads/'):
                                branch = branch[len('refs/heads/') :]

                        changes.append(
                            Change(
                                number=details['_number'],
                                bb_input=build_input,
                                remote=remote,
                                ref=commit.id,
                                rebase=False,
                                branch=branch,
                                gerrit_name=gerrit_name,
                                submitted=True,
                                project=details['project'],
                            )
                        )

            if not changes:
                # gitiles_commit may be empty when the build was not started
                # by a gitiles_poller. Then the newest commit on the remote
                # is taken to be the triggering commit. That assumption
                # holds except for Android Repo Tool projects (unless every
                # project is pinned to a commit rather than tracking a
                # branch), and even when wrong it is close enough to be
                # useful.
                head = self.m.git.get_remote_branch_head(remote, branch)
                gerrit_name = urllib.parse.urlparse(remote).netloc.split('.')[0]
                review_host = f'{gerrit_name}-review.googlesource.com'
                details = self._number_details(review_host, head)

                changes.append(
                    Change(
                        number=details['_number'] if details else 0,
                        bb_input=build_input,
                        remote=remote,
                        ref=head,
                        rebase=False,
                        branch=details['branch'] if details else branch,
                        gerrit_name=gerrit_name,
                        project=None,
                        submitted=True,
                    )
                )

            # Surface every change as its own (empty) step for visibility.
            with self.m.step.nest('changes'):
                for found in changes:
                    with self.m.step.nest(found.name) as pres:
                        pres.step_summary_text = repr(found)

            return tuple(changes)

    def _matching_branches(self, repo, branches, name='has branch', **kwargs):
        """Return, sorted, the given branches that have a head on repo.

        Each branch is looked up with git.get_remote_branch_head inside a
        step nest called name; the head found is shown in a 'head' step.

        Args:
            repo: Remote repository to query.
            branches: Iterable of branch names to look for.
            name: Name of the enclosing step.
            **kwargs: Forwarded to get_remote_branch_head.
        """
        existing = set()
        with self.m.step.nest(name), self.m.context(infra_steps=True):
            for candidate in branches:
                head = self.m.git.get_remote_branch_head(
                    repo,
                    candidate,
                    step_name=f'git ls-remote {candidate}',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        ''
                    ),
                    **kwargs,
                )
                with self.m.step.nest('head') as head_pres:
                    head_pres.step_summary_text = repr(head)

                if head:
                    existing.add(candidate)
        return sorted(existing)

    def _apply_change(self, ctx, change, cwd=None, extra_calls=None):
        """Applies the given change to the given directory.

        Fetches change.ref from change.remote, checks it out on a new branch
        named 'working', points that branch's upstream at change.branch on
        the remote, rebases if requested, records change.base and
        change.base_type, and finally updates submodules. change.applied is
        set to True as soon as the attempt starts.

        Args:
            ctx (CheckoutContext): Checkout context. ctx.root is used for the
                step summary and ctx.options.submodule_timeout_sec for the
                submodule update.
            change (Change): Change to apply.
            cwd (Path): Working directory, defaults to current directory.
            extra_calls (callable): Additional steps to run within the nested
                'apply ...' step and, if specified, within directory cwd.
        """
        kwargs = {'cwd': cwd} if cwd else {}
        change.applied = True

        apply_step = f'apply {change.name}'
        with self.m.context(**kwargs), self.m.step.nest(apply_step) as pres:
            pres.links['gerrit'] = change.gerrit_url
            pres.links['gitiles'] = change.gitiles_url
            if cwd:
                pres.step_summary_text = str(self.m.path.relpath(cwd, ctx.root))

            with self.m.context(infra_steps=True):
                # 'git fetch' fails if a submodule pin in the patch isn't
                # present in the remote (for example, if the pin is only
                # present in the uploader's workspace). Use
                # '--no-recurse-submodules' here so 'git fetch' doesn't fail
                # but instead 'git rebase' or 'git submodule update' fails
                # later (important because those are not infra steps). Also
                # don't use '--recurse-submodules' in 'git checkout' for
                # similar reasons.
                with self.m.default_timeout():
                    self.m.git.fetch(
                        change.remote,
                        change.ref,
                        recurse_submodules=False,
                        step_name='git fetch patch',
                    )
                self.m.git(
                    'git checkout patch',
                    'checkout',
                    '--force',
                    '-b',
                    'working',
                    'FETCH_HEAD',
                )

            # The named remote and its tracking branch are always set up:
            # they provide the upstream for 'working' and, for changes that
            # are not submitted, the target of the rebase / merge-base below.
            with self.m.context(infra_steps=True):
                # Change "https://foo.googlesource.com/bar"
                #     to "https___foo_googlesource_com_bar".
                remote = re.sub(r'[^\w]', '_', change.remote)
                remote_branch = '/'.join((remote, change.branch))
                self.m.git(
                    'git remote add', 'remote', 'add', remote, change.remote,
                )

                with self.m.default_timeout():
                    self.m.git.fetch(
                        remote,
                        f'refs/heads/{change.branch}',
                        prune=False,
                        step_name='git fetch branch',
                    )

                self.m.git(
                    'git set upstream',
                    'branch',
                    f'--set-upstream-to={remote_branch}',
                )

            if not change.submitted:
                with self.m.context(infra_steps=True):
                    self.m.git('pre-rebase log', 'log', '--oneline', '-n', '10')

            if change.submitted:
                # A submitted commit is its own base.
                change.base = self.m.git.rev_parse(
                    'HEAD',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'HEAD_' * 8,
                    ),
                )
                change.base_type = 'submitted_commit_hash'

            elif change.rebase:
                self.m.git('git rebase', 'rebase', remote_branch)

                change.base = self.m.git.rev_parse(
                    remote_branch,
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'REMOTE_BRANCH_' * 3,
                    ),
                )
                change.base_type = 'remote_branch_tip'

            else:
                # 'git merge-base' prints the hash followed by a newline;
                # strip it so change.base holds just the hash.
                change.base = self.m.git(
                    'merge-base',
                    'merge-base',
                    'HEAD',
                    remote_branch,
                    stdout=self.m.raw_io.output_text(),
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'MERGEBASE_' * 4,
                    ),
                ).stdout.strip()
                change.base_type = 'merge-base'

            # In most cases this is redundant or unnecessary, but it shouldn't
            # cause problems. It's necessary when a superproject CL is updating
            # a submodule pin and we need to sync the submodule to the new
            # revision.
            with self.m.default_timeout():
                # See b/243673776 for why we detach before updating submodules.
                self.m.git('detach', 'checkout', '--detach')
                self.m.git.update_submodule(
                    recursive=True, timeout=ctx.options.submodule_timeout_sec,
                )
                self.m.git('reattach', 'checkout', '-')

            # TODO(b/237660477) Make this function a context manager so callers
            # can do the following:
            # with self._apply_change(...):
            #   extra_calls()
            if extra_calls:
                extra_calls()

    def _check_unapplied_changes(self, changes):
        """Report which changes were applied and fail if none were.

        Emits a warning step for every change that was not applied, and a
        final 'status' step summarising both groups.

        Args:
            changes: Sequence of Change objects to inspect.

        Returns:
            A StatusOfChanges with tuples of applied and not-applied
            changes, or None if changes is empty.

        Raises:
            InfraFailure: If not a single change was applied.
        """
        applied = []
        failed_to_apply = []
        if not changes:  # pragma: no cover
            return None

        def handle_unapplied_change(change):
            # Emit a warning step with links, then record the change.
            with self.m.step.nest(f'failed to apply {change.name}') as pres:
                pres.status = 'WARNING'
                pres.links['gerrit'] = change.gerrit_url
                pres.links['gitiles'] = change.gitiles_url
            failed_to_apply.append(change)

        applied_flags = [change.applied for change in changes]

        with self.m.context(infra_steps=True):
            if not any(applied_flags):
                with self.m.step.nest('no changes were applied') as pres:
                    pres.status = 'FAILURE'
                    for change in changes:
                        handle_unapplied_change(change)
                    pres.properties['changes'] = [x.name for x in changes]

                raise self.m.step.InfraFailure(
                    'could not find triggering changes in checkout'
                )

            elif not all(applied_flags):
                with self.m.step.nest('some changes were not applied') as pres:
                    pres.status = 'WARNING'
                    for change in changes:
                        if change.applied:
                            applied.append(change)
                        else:
                            handle_unapplied_change(change)

            else:
                applied.extend(changes)

        summary = f'applied {applied}\nnot applied {failed_to_apply}'
        with self.m.step.nest('status') as pres:
            pres.step_summary_text = summary

        return StatusOfChanges(
            applied=tuple(applied), not_applied=tuple(failed_to_apply),
        )

    def _cached_checkout(
        self,
        remote,
        path,
        ref,
        submodules,
        included_submodules=None,
        excluded_submodules=None,
        submodule_timeout_sec=10 * 60,
        cache=True,
        use_packfiles=True,
        **kwargs,
    ):
        submodule_paths = included_submodules = included_submodules or []

        if cache:
            with self.m.step.nest('cache'), self.m.cache.guard('git'):
                parsed_remote = urllib.parse.urlparse(remote)
                cache_name = parsed_remote.hostname + parsed_remote.path.replace(
                    '-', '--'
                ).replace(
                    '/', '-'
                )
                cache_path = self.m.path['cache'] / 'git' / cache_name
                self.m.file.ensure_directory('makedirs', cache_path)

                with self.m.context(cwd=cache_path):
                    dotgit = cache_path / '.git'
                    if self.m.path.exists(dotgit):  # pragma: no cover
                        self.m.git.config_remove_section(
                            'remote.origin', **kwargs
                        )
                    else:
                        self.m.git.init(bare=False, **kwargs)

                    self.m.git.config(
                        'remote.origin.url',
                        remote,
                        step_name='remote set-url',
                        **kwargs,
                    )

                    if use_packfiles:
                        self.m.git.config(
                            'fetch.uriprotocols',
                            'https',
                            step_name='set fetch.uriprotocols',
                            **kwargs,
                        )

                    with self.m.default_timeout():
                        self.m.git.fetch(
                            repository='origin',
                            prune=True,
                            tags=True,
                            recurse_submodules=submodules,
                            **kwargs,
                        )

                    self.m.git.raw_checkout(
                        ref='FETCH_HEAD', force=True, **kwargs
                    )

                    if included_submodules and excluded_submodules:
                        raise self.m.step.InfraFailure(
                            'cannot specify both included_submodules and '
                            'excluded_submodules'
                        )

                    submodule_paths = included_submodules
                    if excluded_submodules:
                        submodule_status = self.m.git(
                            'submodule status',
                            'submodule',
                            'status',
                            stdout=self.m.raw_io.output_text(),
                            step_test_data=lambda: self.test_api.m.raw_io.stream_output_text(
                                '-0000000000000000000000000000000000000000 pigweed\n'
                                '-1111111111111111111111111111111111111111 nanopb\n'
                            ),
                        ).stdout.splitlines()

                        submodule_paths = [
                            x.split(None, 1)[1] for x in submodule_status
                        ]
                        for sub in excluded_submodules:
                            if sub not in submodule_paths:
                                raise self.m.step.InfraFailure(
                                    f'excluded submodule {sub} is not a submodule'
                                )
                            with self.m.step.nest(f'excluding submodule {sub}'):
                                pass
                            submodule_paths.remove(sub)

                    for sub in submodule_paths:
                        with self.m.step.nest(f'including submodule {sub}'):
                            pass

                    if submodules or submodule_paths:
                        self.m.git.sync_submodule(recursive=True, **kwargs)
                        with self.m.default_timeout():
                            self.m.git.update_submodule(
                                recursive=True,
                                force=True,
                                paths=submodule_paths,
                                timeout=submodule_timeout_sec,
                                **kwargs,
                            )

                    if not submodules:
                        # Even though submodules weren't requested, if the cache
                        # had any active submodules we need to update them.
                        # Otherwise we'll get weird situations in rolls where an
                        # uninvolved submodule will be rolled back.
                        with self.m.default_timeout():
                            self.m.git.update_submodule(
                                recursive=True,
                                force=True,
                                init=False,
                                timeout=submodule_timeout_sec,
                                **kwargs,
                            )

            self.m.file.copytree(
                'copy from cache', cache_path, path, symlinks=True
            )

        # Deliberately not combining contexts into one line so it's obvious to
        # both devs and Python which one is "outer" and which is "inner".
        with self.m.step.nest('git checkout'):
            with self.m.default_timeout():
                self.m.git_checkout(
                    repo=remote,
                    path=path,
                    cache=False,
                    revision=ref,
                    recursive=submodules,
                    submodules=submodules,
                    submodule_force=submodules,
                    submodule_paths=submodule_paths,
                    step_name="",
                    use_packfiles=use_packfiles,
                )

    def _git(self, ctx):
        """Check out a git superproject and apply the triggering changes.

        The checkout itself is delegated to self._cached_checkout(). When
        ctx.options.use_trigger is set, each triggering change is applied to
        the top-level repository and/or to the single submodule it is matched
        with, ctx.status is filled in, and the applied changes are written to
        a JSON file whose path is stored in ctx.changes_json. Afterwards a
        short 'git log' step is run for the top-level checkout and for every
        submodule found while handling the trigger, and the base revision (if
        any) is recorded as properties of a 'base' step.

        Args:
            ctx (CheckoutContext): Context object.

        Raises:
            StepFailure if a change matches several submodules by remote and
            its branch does not single out exactly one of them.
        """
        checkout_branch = self._matching_branch(ctx) or ctx.options.branch

        checkout_kwargs = dict(
            path=ctx.root,
            ref=checkout_branch,
            cache=not ctx.options.do_not_cache,
            submodules=ctx.options.initialize_submodules,
            submodule_timeout_sec=ctx.options.submodule_timeout_sec,
            included_submodules=ctx.options.included_submodules,
            excluded_submodules=ctx.options.excluded_submodules,
            use_packfiles=not ctx.options.do_not_use_packfiles,
        )
        with self.m.context(infra_steps=True):
            self._cached_checkout(ctx.options.remote, **checkout_kwargs)

        with self.m.context(cwd=ctx.root):
            submodules = []
            got_revision = None
            got_revision_type = 'no_trigger'

            if ctx.options.use_trigger:
                got_revision = self.m.git.rev_parse(
                    'HEAD',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'HEAD' * 10,
                    ),
                )

                # First pass: changes that target the top-level repository.
                for change in ctx.changes:
                    if not ctx.remotes_equivalent(
                        ctx.options.remote, change.remote
                    ):
                        continue
                    self._apply_change(ctx, change)
                    got_revision = change.base
                    got_revision_type = change.base_type

                submodules = ctx.submodules(recursive=True)

                # Second pass: changes that target submodules. For each
                # change, count the submodules whose remote matches:
                #   * none: nothing to do for this change.
                #   * exactly one: apply the change there, whether or not the
                #     branch matches.
                #   * several: the change's branch must pick out exactly one
                #     of them; if it picks out none or more than one, fail.
                for change in ctx.changes:
                    candidates = [
                        sub
                        for sub in submodules
                        if ctx.remotes_equivalent(sub.remote, change.remote)
                    ]
                    if not candidates:
                        continue

                    if len(candidates) > 1:
                        descriptions = [
                            f'{self.m.path.relpath(sub.path, ctx.root)} '
                            f'(branch {sub.branch})'
                            for sub in candidates
                        ]
                        submodule_info = ', '.join(descriptions)

                        on_branch = [
                            sub
                            for sub in candidates
                            if sub.branch == change.branch
                        ]

                        if len(on_branch) > 1:
                            raise self.m.step.StepFailure(
                                f'change {change.name} (branch {change.branch}) '
                                f'matches multiple submodules ({submodule_info}), '
                                'but too many branches match'
                            )

                        if not on_branch:
                            raise self.m.step.StepFailure(
                                f'change {change.name} (branch {change.branch}) '
                                f'matches multiple submodules ({submodule_info}) '
                                'but no branches match'
                            )

                        candidates = on_branch

                    # Exactly one candidate remains at this point.
                    target = candidates[0]
                    if not ctx.options.initialize_submodules:
                        # Submodules were not initialized by the checkout, so
                        # update just this one before applying the change.
                        with self.m.default_timeout():
                            self.m.git.update_submodule(paths=(target.path,))
                    self._apply_change(ctx, change, cwd=target.path)

                ctx.status = self._check_unapplied_changes(ctx.changes)

                # Keep only primitive-valued attributes of each applied change
                # when building the list passed to write_json.
                primitive_types = (int, str, bool, type(None))
                applied_changes = [
                    {
                        key: value
                        for key, value in vars(change).items()
                        if isinstance(value, primitive_types)
                    }
                    for change in ctx.changes
                    if change.applied
                ]
                ctx.changes_json = self.m.path.mkstemp()
                self.m.file.write_json(
                    'write changes.json', ctx.changes_json, applied_changes,
                )

            # Run git log for the top-level checkout and then each submodule;
            # the first argument of each call is the checkout path as a string.
            log_args = ('log', '--oneline', '-n', '10')
            with self.m.step.nest('git log'):
                self.m.git(str(ctx.root), *log_args)
                for submodule in sorted(submodules):
                    with self.m.context(cwd=submodule.path):
                        self.m.git(str(submodule.path), *log_args)

            if got_revision:
                with self.m.step.nest('base') as pres:
                    pres.properties['got_revision'] = got_revision
                    # Nothing consumes got_revision_type; it is recorded only
                    # to explain where got_revision came from.
                    pres.properties['got_revision_type'] = got_revision_type

    def _matching_branch(self, ctx):
        """Find a manifest/superproject branch named like a triggering change.

        If a triggering change is on a branch (other than 'master' or 'main')
        whose name also exists in the manifest or superproject remote, that
        branch should be used when checking out the project.

        Args:
            ctx (CheckoutContext): Context object.

        Raises:
            StepFailure if there are multiple matching branches.

        Returns:
            One matching branch name, or None.
        """
        options = ctx.options

        if not (options.match_branch and options.use_trigger):
            with self.m.step.nest('not matching branch names'):
                return None

        if options.use_repo:
            kind = 'manifest'
        else:
            kind = 'superproject'

        # Unique, sorted branch names of the triggering changes, ignoring the
        # standard branch names and changes without a branch.
        standard_branches = ('master', 'main', None)
        branch_names = sorted(
            {
                change.branch
                for change in ctx.changes
                if change.branch not in standard_branches
            }
        )

        if not branch_names:
            with self.m.step.nest('no non-standard branch names'):
                return None

        with self.m.step.nest('branch names') as pres:
            pres.step_summary_text = str(branch_names)

        matching_branches = self._matching_branches(
            options.remote, branch_names, name=f'{kind} has branch'
        )

        if not matching_branches:
            with self.m.step.nest('no branch names match'):
                return None

        if len(matching_branches) > 1:
            joined = ', '.join(matching_branches)
            with self.m.step.nest(
                f"too many matching branches ({joined})"
            ) as pres:
                pres.step_summary_text = (
                    f"Can't figure out which {kind} branch to use. Remove some "
                    '"Requires:" lines to simplify the checkout.'
                )
                raise self.m.step.StepFailure('multiple matching branches')

        # Exactly one branch matched.
        chosen_branch = matching_branches.pop()
        self.m.step(f'changing {kind} branch to {chosen_branch}', None)
        return chosen_branch

    def _repo(self, ctx):
        """Checkout code from an Android Repo Tool manifest.

        Calls self.m.repo.init() for the manifest, applies any triggering
        change whose remote is the manifest repository itself, reads the
        manifest into ctx.manifest, calls self.m.repo.sync(), and then applies
        triggering changes to the manifest projects whose URL matches the
        change's remote. Results are stored on ctx; nothing is returned.

        Args:
            ctx (CheckoutContext): Context object. ctx.options, ctx.root and
                ctx.changes are read. ctx.manifest and ctx.top are always set,
                ctx.status is set when ctx.options.use_trigger is true, and
                ctx.root is repointed when the workspace contains exactly one
                entry besides '.repo'.
        """

        # Git makes the top-level folder, Repo requires caller to make it.
        self.m.file.ensure_directory('mkdir checkout', ctx.root)

        with self.m.context(cwd=ctx.root):
            manifest_branch = self._matching_branch(ctx) or ctx.options.branch

            with self.m.context(infra_steps=True):
                kwargs = {}
                if ctx.options.repo_init_timeout_sec:
                    kwargs['timeout'] = ctx.options.repo_init_timeout_sec
                    kwargs['attempts'] = ctx.options.number_of_attempts
                if ctx.options.manifest_groups:
                    kwargs['groups'] = ctx.options.manifest_groups

                self.m.repo.init(
                    manifest_url=ctx.options.remote,
                    manifest_branch=manifest_branch,
                    manifest_name=ctx.options.manifest_file,
                    **kwargs,
                )

            manifests_dir = ctx.root / '.repo' / 'manifests'
            # If the triggering CL is a manifest change, apply it before running
            # sync.
            if ctx.options.use_trigger:
                for change in ctx.changes:
                    if change.remote and ctx.remotes_equivalent(
                        ctx.options.remote, change.remote
                    ):

                        def update_upstream():
                            # Right now the upstream of 'working' is the local
                            # 'default' branch. 'repo sync' complains if the
                            # upstream isn't remote, so it's changed to the
                            # remote branch that's identical to 'default'.
                            self.m.git(
                                'git branch',
                                'branch',
                                f'--set-upstream-to=origin/{manifest_branch}',
                            )

                        # The callback is handed to _apply_change() as
                        # extra_calls.
                        self._apply_change(
                            ctx,
                            change,
                            cwd=manifests_dir,
                            extra_calls=update_upstream,
                        )

            ctx.manifest = self._read_manifest(
                ctx.options.remote, manifests_dir / ctx.options.manifest_file,
            )

            # Configure insteadof handling for every sso:// fetch URL in the
            # manifest, in sorted remote-name order.
            for _, remote_host in sorted(ctx.manifest.remotes.items()):
                if remote_host.fetch.url.startswith('sso://'):
                    self.m.sso.configure_insteadof(remote_host.fetch.url)

            with self.m.context(infra_steps=True):
                kwargs = {}
                if ctx.options.repo_sync_timeout_sec:
                    kwargs['timeout'] = ctx.options.repo_sync_timeout_sec
                    kwargs['attempts'] = ctx.options.number_of_attempts
                self.m.repo.sync(
                    force_sync=True, current_branch=True, jobs=2, **kwargs
                )
                self.m.repo.start('base')

            if ctx.options.use_trigger:
                for change in ctx.changes:
                    for entry in ctx.manifest.projects:
                        if ctx.remotes_equivalent(entry.url, change.remote):

                            # Closes over the current change and entry;
                            # presumably called by _apply_change() before the
                            # loop variables advance -- TODO confirm.
                            def compare_branch_name():
                                with self.m.step.nest(
                                    'compare branch name'
                                ) as pres:
                                    pres.step_summary_text = (
                                        'CL branch: {}\nupstream branch: {}'
                                    ).format(change.branch, entry.upstream)

                            self._apply_change(
                                ctx,
                                change,
                                cwd=entry.path_object(ctx.root),
                                extra_calls=compare_branch_name,
                            )

                ctx.status = self._check_unapplied_changes(ctx.changes)

        # Some dependent projects have everything inside one top-level folder
        # in their repo workspace. For those projects pretend that top-level
        # folder is actually the checkout root. The top member will always
        # point to the actual repo workspace root.
        # NOTE(review): __call__ assigns ctx.top again after this method
        # returns -- confirm that is consistent with the statement above.
        ctx.top = ctx.root
        files = set(self.m.file.listdir('ls', ctx.root))
        dotrepo = ctx.root / '.repo'
        if dotrepo in files:
            files.remove(dotrepo)
        if len(files) == 1:
            ctx.root = files.pop()

    def _name(self, options):
        """Build the default step name "checkout <repo>" from options.remote.

        For example "https://foo/bar/baz.git" gives "checkout baz". Trailing
        slashes and a trailing ".git" are ignored, and for repo-tool checkouts
        a final "manifest" path component is skipped in favor of its parent.
        """
        url = options.remote.rstrip('/')
        if url.endswith('.git'):
            url = url[: -len('.git')]

        segments = url.split('/')
        if options.use_repo and segments[-1] == 'manifest':
            del segments[-1]

        return f'checkout {segments[-1]}'

    def __call__(self, options, root=None, name=None):
        """Checkout code.

        Args:
            options (Options): Checkout options. Modified in place: unset
                manifest_file, timeout and attempt fields receive defaults and
                a trailing '.git' is removed from options.remote.
            root (Path): Path to checkout into. Defaults to 'co' under the
                start directory.
            name (str): Name of the enclosing step. Defaults to the value
                computed by self._name(options).

        Returns:
            The CheckoutContext describing the checkout.
        """

        checkout_name = name or self._name(options)

        assert options.remote

        # Fill in a default for every option that was left falsy.
        fallbacks = (
            ('manifest_file', 'default.xml'),
            ('repo_init_timeout_sec', 20),
            ('repo_sync_timeout_sec', 2 * 60),
            ('number_of_attempts', 3),
            ('submodule_timeout_sec', 10 * 60),
        )
        for field, fallback in fallbacks:
            setattr(options, field, getattr(options, field) or fallback)

        ctx = CheckoutContext(api=self.m)
        ctx.options = options
        ctx.changes = []
        if root:
            ctx.root = root
        else:
            ctx.root = self.m.path['start_dir'] / 'co'

        # Every remote in a group maps to the whole group, with each URL
        # passed through sso_to_https() first.
        for group in options.equivalent_remotes:
            https_group = [self.m.sso.sso_to_https(x) for x in group.remotes]
            for url in https_group:
                assert url not in ctx.equivalent_remotes
                ctx.equivalent_remotes[url] = https_group

        with self.m.step.nest(checkout_name) as pres:
            if options.remote.endswith('.git'):
                options.remote = options.remote[: -len('.git')]

            if options.use_trigger:
                ctx.changes = self._change_data(
                    ctx, options.remote, options.branch
                )

            do_checkout = self._repo if options.use_repo else self._git
            do_checkout(ctx)

            if ctx.status:
                labelled = (
                    ('applied', ctx.status.applied),
                    ('failed to apply', ctx.status.not_applied),
                )
                for label, changes in labelled:
                    for change in changes:
                        pres.links[f'{label} {change.name}'] = change.gerrit_url

            snapshot_dir = self.m.path['start_dir'] / 'snapshot'
            ctx.snapshot_to_dir(snapshot_dir)

            # NOTE(review): _repo() already sets ctx.top before it may repoint
            # ctx.root at a single top-level folder; this assignment replaces
            # that value with the current ctx.root -- confirm this is intended.
            ctx.top = ctx.root
            if ctx.options.root_subdirectory:
                ctx.root = ctx.root / ctx.options.root_subdirectory

        return ctx

    def get_revision(self, root, name='git log', test_data='HASH'):
        """Like self.revision, but works for secondary checkouts.

        Runs a git step with 'log --max-count=1 --pretty=format:%H' using root
        as the working directory and reports the output as the step summary.

        Args:
            root (Path): Directory to run the git step in.
            name (str): Name of the git step.
            test_data (str): Simulated stdout of the step for recipe tests.

        Returns:
            The step's stdout with surrounding whitespace stripped.
        """

        def simulated_stdout():
            return self.test_api.m.raw_io.stream_output_text(test_data,)

        log_args = ('log', '--max-count=1', '--pretty=format:%H')

        with self.m.context(cwd=root):
            log_step = self.m.git(
                name,
                *log_args,
                stdout=self.m.raw_io.output_text(),
                step_test_data=simulated_stdout,
            )

            revision = log_step.stdout.strip()
            log_step.presentation.step_summary_text = revision

        return revision
