# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Calls to checkout code.

Usage:
api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed')
"""

from __future__ import annotations

import collections
import contextlib
import re
from typing import Any, Iterator, Sequence, TYPE_CHECKING
import urllib
import urllib.parse
import xml.etree.ElementTree

import attrs
from PB.go.chromium.org.luci.buildbucket.proto import (
    build as build_pb2,
    common as common_pb2,
)
from PB.go.chromium.org.luci.scheduler.api.scheduler.v1 import (
    triggers as triggers_pb2,
)
from PB.recipe_modules.pigweed.checkout.options import Options
from recipe_engine import recipe_api

if TYPE_CHECKING:  # pragma: no cover
    from recipe_engine import config_types

PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed'


def to_dict(obj) -> dict[str, Any]:
    """Return a shallow copy of an object's attributes as a dictionary.

    Objects that have an instance ``__dict__`` are copied via ``vars()``.
    Objects without one (for example slotted classes) fall back to reading
    every name listed in ``__slots__`` that does not start with a double
    underscore.

    Args:
        obj: Object whose attributes should be captured.

    Returns:
        A new dictionary mapping attribute names to their current values.
        Mutating it does not affect ``obj``.
    """
    # Look vars() up through the builtins module instead of subscripting
    # ``__builtins__``: the latter is a CPython implementation detail that is
    # a dict in imported modules but a module object in ``__main__``, where
    # subscripting it raises a TypeError that the handler below would
    # silently misinterpret as "obj has no __dict__".
    import builtins

    try:
        # Modifications to the dict returned by the built-in vars() function
        # modify the original data structure. Always create a copy for this
        # function to return.
        return builtins.vars(obj).copy()
    except TypeError:
        keys = [x for x in obj.__slots__ if not x.startswith('__')]
        return {k: getattr(obj, k) for k in keys}


@attrs.define
class Manifest:
    """Parsed contents of a repo manifest: its remotes and its projects."""

    # Remotes keyed by remote name.
    remotes: dict[str, 'Remote'] = attrs.field(factory=dict)
    projects: list['Project'] = attrs.field(factory=list)

    def dict(self) -> dict[str, Any]:
        """Return a plain-dict form built from each entry's own dict()."""
        remotes = {}
        for name, remote in self.remotes.items():
            remotes[name] = remote.dict()
        projects = [project.dict() for project in self.projects]
        return {'remotes': remotes, 'projects': projects}


class Url:
    """A URL plus an optional https form that is filled in after creation."""

    def __init__(self, url: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Keep 'url' assigned before 'https': attribute creation order is the
        # key order of the dictionary returned by dict().
        self.url: str = url
        self.https: str | None = None

    def dict(self) -> dict[str, Any]:
        """Return a copy of this object's attributes as a dictionary."""
        attributes = to_dict(self)
        return attributes


@attrs.define
class Remote:
    """A <remote> entry of a repo manifest."""

    name: str
    fetch: Url
    review: str | None = attrs.field(default=None)
    revision: str | None = attrs.field(default=None)
    alias: str | None = attrs.field(default=None)

    def dict(self) -> dict[str, Any]:
        """Return the fields as a dictionary, with 'fetch' expanded too."""
        # Overriding the existing 'fetch' key keeps its original position.
        return {**to_dict(self), 'fetch': self.fetch.dict()}


@attrs.define
class Project:
    """A repository/project entry and the values resolved for it."""

    name: str
    path: str
    remote: str
    revision: str
    upstream: str
    url: str | None = attrs.field(default=None)

    def path_object(self, root: config_types.Path) -> config_types.Path:
        """Return this project's path joined onto the given root."""
        location = root / self.path
        return location

    def dict(self) -> dict[str, Any]:
        """Return the fields of this project as a new dictionary."""
        fields = to_dict(self)
        return fields


def _str_or_none(x: Any | None) -> str | None:
    if x is None:
        return x
    return str(x)


def _int_or_none(x: Any | None) -> int | None:
    if x is None:
        return x
    return int(x)


@attrs.define
class Change:
    """A change or commit considered for this checkout.

    Several fields are run through converters, so values that are not yet
    of the annotated type are coerced on assignment.
    """

    number: int = attrs.field(converter=int)
    remote: str | None = attrs.field(converter=_str_or_none)
    ref: str | None = attrs.field(converter=_str_or_none)
    rebase: bool | None = None
    project: str | None = None
    branch: str | None = attrs.field(default=None, converter=_str_or_none)
    gerrit_name: str | None = attrs.field(default=None, converter=_str_or_none)
    submitted: bool = False
    patchset: int | None = attrs.field(default=None, converter=_int_or_none)
    # Excluded from repr().
    applied: bool = attrs.field(repr=False, default=False)
    path: str | None = None
    base: str | None = attrs.field(default=None, converter=_str_or_none)
    base_type: str | None = attrs.field(default=None, converter=_str_or_none)
    is_merge: bool = False
    commit_message: str = ''
    topic: str | None = None

    @property
    def gerrit_host(self) -> str:
        """Review host URL derived from gerrit_name."""
        short_name = self.gerrit_name
        return f'https://{short_name}-review.googlesource.com'

    @property
    def gerrit_url(self) -> str:
        """Link to the change, or to gitiles when number is falsy."""
        if self.number:
            return f'{self.gerrit_host}/c/{self.number}'
        return self.gitiles_url

    @property
    def gitiles_url(self) -> str:
        """Link to this change's ref on its remote."""
        remote, ref = self.remote, self.ref
        return f'{remote}/+/{ref}'

    @property
    def name(self) -> str:
        """Short identifier of the form '<gerrit_name>:<number>'."""
        short_name, number = self.gerrit_name, self.number
        return f'{short_name}:{number}'

    @property
    def name_with_path(self) -> str:
        """The name followed by the path in parentheses."""
        label = self.name
        return f'{label} ({self.path})'


@attrs.define
class Submodule:
    """Submodule properties.

    Instances are ordered by (relative_path, url).
    """

    # Excluded from repr().
    api: recipe_api.RecipeApi = attrs.field(repr=False)
    hash: str
    relative_path: str
    path: config_types.Path
    name: str
    # 'describe' used to be declared a second time at the end of the class.
    # Re-annotating an existing name does not add a field or move it, so the
    # duplicate was dropped without changing the generated class.
    describe: str
    remote: str
    initialized: bool
    modified: bool
    conflict: bool
    branch: str
    url: str
    update: str
    ignore: str
    shallow: bool
    fetchRecurseSubmodules: bool

    def __lt__(self, other: 'Submodule') -> bool:
        """Order submodules by relative path, then by URL."""
        return (self.relative_path, self.url) < (other.relative_path, other.url)


@attrs.define
class StatusOfChanges:
    """Changes that were applied or not applied."""

    # Changes that were applied to the checkout.
    applied: tuple[Change, ...]
    # Changes that could not be applied to the checkout.
    not_applied: tuple[Change, ...]


@attrs.define(slots=False)
class CheckoutContext:
    """State describing one checkout, plus helpers to inspect it.

    The class is not slotted (``slots=False``) because several methods cache
    their result in private attributes created on first use (``_revision``,
    ``_manifest_snapshot`` and ``_submodule_snapshot``).
    """

    _api: recipe_api.RecipeApi = attrs.field(repr=False)
    options: Options = None
    changes: list[Change] | None = None  # List of triggering changes.
    top: config_types.Path = None  # Actual checkout root.
    # Logical checkout root. Usually identical to 'top', but occasionally a
    # subdirectory instead.
    root: config_types.Path = None
    # Which triggering changes were applied or not applied.
    status: StatusOfChanges | None = None
    # Remotes that should be treated identically.
    equivalent_remotes: dict[str, list[str]] | None = attrs.field(factory=dict)
    manifest: Manifest | None = None  # Parsed repo manifest.
    # Path to a JSON file containing metadata about the triggering changes.
    changes_json: config_types.Path | None = None

    def revision(self) -> str:
        """Return the current revision of 'root', cached after first use."""
        if hasattr(self, '_revision'):
            return self._revision

        self._revision = self._api.checkout.get_revision(self.root)
        return self._revision

    def applied_changes(self) -> list[Change]:
        """Return the triggering changes that have been marked as applied.

        Returns an empty list when no changes have been recorded yet
        ('changes' still has its default value of None).
        """
        return [x for x in self.changes or () if x.applied]

    def manifest_snapshot(self):
        """Return the repo manifest with all projects pinned.

        The result is cached after the first call. Returns None when the
        checkout does not use repo.
        """
        if not self.options.use_repo:
            return None

        if hasattr(self, '_manifest_snapshot'):
            return self._manifest_snapshot

        with self._api.context(cwd=self.top):
            self._manifest_snapshot = self._api.repo.manifest_snapshot()
            return self._manifest_snapshot

    def submodule_snapshot(self) -> str | None:
        """Return the output of 'git submodule status --recursive'.

        Equivalent of manifest_snapshot() but not as strictly formatted. The
        result is cached after the first call. Returns None when the checkout
        uses repo.
        """
        if self.options.use_repo:
            return None

        if hasattr(self, '_submodule_snapshot'):
            return self._submodule_snapshot

        with self._api.context(cwd=self.root):
            # To get step_test_data line to pass pylint.
            raw_io_stream_output = self._api.raw_io.test_api.stream_output_text

            self._submodule_snapshot = (
                self._api.git(
                    'submodule-status',
                    'submodule',
                    'status',
                    '--recursive',
                    stdout=self._api.raw_io.output_text(),
                    step_test_data=lambda: raw_io_stream_output(
                        'submodule status filler text',
                    ),
                    ok_ret='any',
                ).stdout.strip()
                or ''
            )
            return self._submodule_snapshot

    def snapshot_to_dir(self, directory: config_types.Path) -> None:
        """Write snapshots of the checkout state into the given directory.

        Writes 'manifest.xml' and 'submodules.log' when the corresponding
        snapshot is non-empty, and always writes 'git.log' with the last ten
        commits of 'root' in one-line form.

        Args:
            directory: Destination directory; created if necessary.
        """
        self._api.file.ensure_directory('mkdir', directory)
        if self.manifest_snapshot():
            self._api.file.write_text(
                'write manifest.xml',
                directory / 'manifest.xml',
                self.manifest_snapshot(),
            )

        if self.submodule_snapshot():
            self._api.file.write_text(
                'write submodule snapshot',
                directory / 'submodules.log',
                self.submodule_snapshot(),
            )

        with self._api.context(cwd=self.root):
            log = self._api.git(
                'log',
                'log',
                '--oneline',
                '-n',
                '10',
                stdout=self._api.raw_io.output_text(),
                ok_ret='any',
            ).stdout
        self._api.file.write_text(
            'write git log',
            directory / 'git.log',
            log,
        )

    def submodules(self, recursive: bool = False) -> list[Submodule]:
        """Return data about all submodules.

        Args:
            recursive: Pass '--recursive' to the submodule status script.

        Returns:
            One Submodule per entry reported by the script. Each remote is
            converted from sso to https and has a trailing '.git' removed;
            'path' is made absolute under 'root' while 'relative_path' keeps
            the value reported by the script.
        """

        cmd = [
            'python3',
            self._api.checkout.resource('submodule_status.py'),
            self.root,
            self._api.json.output(),
        ]

        if recursive:
            cmd.append('--recursive')

        submodules = []
        submodule_status = self._api.step(
            'submodule status',
            cmd,
            step_test_data=lambda: self._api.json.test_api.output({}),
        ).json.output
        for sub in submodule_status.values():
            remote = self._api.sso.sso_to_https(sub['remote'])
            # Same normalization as remotes_equivalent().
            sub['remote'] = remote.removesuffix('.git')
            sub['relative_path'] = sub['path']
            sub['path'] = self.root / sub['path']
            submodules.append(Submodule(self._api, **sub))

        return submodules

    # Splits an https remote into its host and its project path. Deliberately
    # left without a type annotation so attrs does not treat it as a field.
    _REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$')

    def gerrit_host(self) -> str | None:
        """Return the review host matching options.remote.

        If the host does not already contain '-review', '-review' is inserted
        before its first dot. Returns None when options.remote is not an
        https URL of the expected form.
        """
        match = self._REMOTE_REGEX.match(self.options.remote)
        if not match:
            return  # pragma: no cover

        gerrit_review_host = match.group('host')
        if '-review' not in gerrit_review_host:
            gerrit_review_host = gerrit_review_host.replace('.', '-review.', 1)
        return gerrit_review_host

    def gerrit_project(self) -> str | None:
        """Return the project portion of options.remote.

        Returns None when options.remote is not an https URL of the expected
        form.
        """
        match = self._REMOTE_REGEX.match(self.options.remote)
        if not match:
            return  # pragma: no cover

        return match.group('project')

    def remotes_equivalent(self, remote1: str, remote2: str) -> bool:
        """Return whether two remotes should be treated as the same remote.

        Both remotes are first converted from sso to https and stripped of a
        trailing '.git'. They are equivalent when they are then equal, or when
        remote1 is listed in equivalent_remotes under remote2.
        """
        # Sometimes remote1 or remote2 is None. In that case we shouldn't
        # convert sso to https.
        if remote1:
            remote1 = self._api.sso.sso_to_https(remote1).removesuffix('.git')
        if remote2:
            remote2 = self._api.sso.sso_to_https(remote2).removesuffix('.git')
        if remote1 == remote2:
            return True
        return remote1 in self.equivalent_remotes.get(remote2, ())


class CheckoutApi(recipe_api.RecipeApi):
    """Calls to checkout code."""

    Change = Change
    CheckoutContext = CheckoutContext

    def _read_manifest(
        self, manifest_remote: str, manifest_file: str
    ) -> Manifest:
        """Parse a repo manifest file into a Manifest object.

        Args:
            manifest_remote: URL the manifest came from; its scheme and host
                are substituted for a leading '..' in a remote's fetch value.
            manifest_file: Path of the manifest XML file to read.

        Returns:
            The parsed manifest. It is also written to 'manifest.json' in the
            start directory.
        """

        with self.m.step.nest('read manifest') as read_step:
            manifest_text: str = self.m.file.read_text(
                'read file', manifest_file
            )
            read_step.logs['raw'] = manifest_text

            root = xml.etree.ElementTree.fromstring(manifest_text)
            manifest = Manifest()

            for element in root.iter('remote'):
                with self.m.step.nest('log') as pres:
                    pres.step_summary_text = repr(element.attrib)
                entry = Remote(**element.attrib)
                fetch = entry.fetch
                if fetch.startswith('..'):
                    # Relative fetch: resolve against the manifest's own host.
                    parsed = urllib.parse.urlparse(manifest_remote)
                    fetch = f'{parsed.scheme}://{parsed.netloc}' + fetch[2:]
                entry.fetch = Url(fetch)
                entry.fetch.https = self.m.sso.sso_to_https(entry.fetch.url)
                manifest.remotes[entry.name] = entry

            # Later <default> elements override earlier ones.
            defaults = {}
            for element in root.iter('default'):
                defaults.update(element.attrib)

            for element in root.iter('project'):
                attrib = element.attrib
                name = attrib['name']
                path = attrib.get('path', name)

                # Precedence: the project's own attribute, then <default>.
                if 'remote' in attrib:
                    remote_name = attrib['remote']
                else:
                    remote_name = defaults.get('remote')
                assert (
                    remote_name is not None
                ), f'remote not specified for {name}'

                assert (
                    remote_name in manifest.remotes
                ), f'Remote {remote_name} does not exist'
                remote_entry = manifest.remotes[remote_name]

                # Precedence: project attribute, remote's revision, <default>.
                revision = attrib.get('revision')
                if revision is None:
                    revision = remote_entry.revision or defaults.get(
                        'revision'
                    )
                assert (
                    revision is not None
                ), f'revision not specified for {name}'

                # Precedence: project attribute, <default>, then the revision.
                upstream = attrib.get(
                    'upstream', defaults.get('upstream', revision)
                )

                # urllib.urljoin does something different than what's desired
                # here.
                base = remote_entry.fetch.https.rstrip('/')
                url = f"{base}/{name.lstrip('/')}"

                manifest.projects.append(
                    Project(
                        name=name,
                        path=path,
                        remote=remote_name,
                        revision=revision,
                        upstream=upstream,
                        url=url,
                    )
                )

            self.m.file.write_json(
                'manifest json',
                self.m.path.start_dir / 'manifest.json',
                manifest.dict(),
            )

        return manifest

    def _process_gerrit_change(
        self,
        ctx: CheckoutContext,
        host: str,
        change_id: str | int,
        project: str = 'pigweed/pigweed',
        patchset: int | None = None,
    ) -> Change:
        """Process a LUCI GerritChange and return a Change object.

        Args:
            ctx: Checkout context; 'ctx.options.force_no_rebase' is read.
            host: Gerrit host, normalized with gerrit.normalize_host().
            change_id: Gerrit change number, as an int or a numeric string.
            project: Project name, used only in the simulated test data.
            patchset: Patchset number. When falsy, the number of the change's
                current revision is used instead.

        Returns:
            An unsubmitted Change for the requested patchset. Merge commits
            are never marked for rebase.
        """
        # change_id may be a string, but the ref computed below needs integer
        # arithmetic; '%' applied to a str would be string formatting and
        # raise a TypeError.
        number = int(change_id)

        host = self.m.gerrit.normalize_host(host)
        gitiles_host = host.replace(
            '-review.googlesource.com', '.googlesource.com'
        )
        gerrit_name = gitiles_host.split('.')[0]

        fake_revision = 'f' * 40
        details = self.m.gerrit.change_details(
            'details',
            change_id=str(change_id),
            host=host,
            max_attempts=5,
            query_params=[
                'CURRENT_COMMIT',
                'CURRENT_REVISION',
            ],
            timeout=30,
            test_data=self.m.json.test_api.output(
                {
                    'branch': 'main',
                    'current_revision': fake_revision,
                    'revisions': {
                        fake_revision: {
                            '_number': 3,
                            'commit': {
                                'parents': [{}],
                                'message': '',
                            },
                        }
                    },
                    'project': project,
                }
            ),
        ).json.output
        branch = details['branch']

        remote = f'https://{gitiles_host}/{details["project"]}'.strip('/')
        rebase = not ctx.options.force_no_rebase

        current_revision = details['revisions'][details['current_revision']]
        # More than one parent means the patchset is a merge commit.
        is_merge = len(current_revision['commit']['parents']) > 1
        if is_merge:
            rebase = False

        if not patchset:
            patchset = current_revision['_number']

        # Change refs are sharded by the last two digits of the number.
        ref = f'refs/changes/{number % 100:02}/{number}/{patchset}'

        return Change(
            number=number,
            patchset=patchset,
            remote=remote,
            ref=ref,
            rebase=rebase,
            is_merge=is_merge,
            branch=branch,
            gerrit_name=gerrit_name,
            submitted=False,
            commit_message=current_revision['commit']['message'],
            project=details['project'],
            topic=details.get('topic', None) or None,
        )

    def _process_gerrit_changes(
        self,
        ctx: CheckoutContext,
        bb_input: build_pb2.Build.Input,
    ) -> Iterator[Change]:
        """Yield a Change for each triggering Gerrit change and its CQ deps.

        The changes listed in 'bb_input.gerrit_changes' are yielded first, in
        order. CQ dependencies are then resolved for the last of them:
        resolved dependencies are yielded as fully processed changes, and
        unresolved ones as placeholder changes with no remote or ref.

        Args:
            ctx: Checkout context, passed through to per-change processing.
            bb_input: Buildbucket build input.

        Yields:
            Change objects as described above.
        """
        seen: set[str] = set()
        result: Change | None = None
        for i, change in enumerate(bb_input.gerrit_changes):
            with self.m.step.nest(str(i)):
                result = self._process_gerrit_change(
                    ctx=ctx,
                    host=change.host,
                    project=change.project,
                    change_id=change.change,
                    patchset=change.patchset,
                )
                yield result
                seen.add(result.name)

        # Without any gerrit changes there is nothing to resolve dependencies
        # for; stop instead of failing on an unbound 'result'.
        if result is None:  # pragma: no cover
            return

        cq_deps_result = self.m.cq_deps.resolve(
            result.gerrit_name,
            result.number,
            result.topic,
        )
        for dep in cq_deps_result.resolved:
            # dep.name should only appear in seen if there are multiple
            # gerrit_changes from buildbucket and a later one depends on an
            # earlier one. If buildbucket has multiple gerrit_changes the
            # cq_deps module is not needed here, so this is just double-checking
            # something that shouldn't happen.
            if dep.name in seen:  # pragma: no cover
                continue
            seen.add(dep.name)
            yield self._process_gerrit_change(
                ctx=ctx,
                host=dep.host,
                project=dep.project,
                change_id=dep.change,
            )

        for cl in cq_deps_result.unresolved:
            yield Change(
                number=cl.change,
                remote=None,
                ref=None,
                rebase=None,
                project=None,
                branch=None,
                gerrit_name=cl.gerrit_name,
                submitted=False,
            )

    def _number_details(
        self,
        host: str,
        commit_hash: str,
        branch: str = 'main',
    ) -> dict[str, Any] | None:
        """Look up the Gerrit change that corresponds to a commit.

        Args:
            host: Gerrit host to query.
            commit_hash: Commit to search for.
            branch: Branch name, used only in the simulated test data.

        Returns:
            The query result for the change when exactly one change matches.
            None when the host looks like GitHub, when the query step fails,
            or when the query does not return exactly one result.
        """
        if 'github.com' in host or 'github-review' in host:
            return None  # pragma: no cover

        try:
            results = self.m.gerrit.change_query(
                'number',
                f'commit:{commit_hash}',
                host=host,
                max_attempts=5,
                timeout=30,
                test_data=self.m.json.test_api.output(
                    [
                        {
                            '_number': '1234',
                            'branch': branch,
                            'project': 'pigweed',
                        }
                    ]
                ),
            ).json.output
        except self.m.step.StepFailure:  # pragma: no cover
            # Best effort: a failed lookup is treated like "no change found".
            return None

        # Skip this change if it didn't go through Gerrit.
        if results and len(results) == 1:
            return results[0]

        return None

    def _change_data(
        self,
        ctx: CheckoutContext,
        remote: str | None = None,
        branch: str | None = None,
    ) -> tuple[Change, ...]:
        """Determine the changes or commit that triggered this build.

        Sources are tried in this order: the buildbucket gerrit changes, then
        the buildbucket gitiles commit. If neither produces a result, the
        current head of 'branch' on 'remote' is treated as the trigger.

        Args:
            ctx: Checkout context, passed to the Gerrit change processing.
            remote: Remote URL. Replaced by the gitiles commit's URL when the
                build has a gitiles commit that names a project.
            branch: Branch whose head is used by the fallback lookup.

        Returns:
            The triggering changes, in the order they were found.
        """
        bb_input: build_pb2.Build.Input = self.m.buildbucket.build.input
        results: list[Change] = []

        # Scheduler triggers indexed by gitiles repo, then by revision.
        triggers: dict[str, dict[str, triggers_pb2.Trigger]] = (
            collections.defaultdict(dict)
        )
        for trigger in self.m.scheduler.triggers:
            gitiles: triggers_pb2.GitilesTrigger = trigger.gitiles
            if gitiles:
                triggers[gitiles.repo][gitiles.revision] = trigger

        with self.m.step.nest('change data'):
            if bb_input.gerrit_changes:
                with self.m.step.nest('process gerrit changes'):
                    results.extend(self._process_gerrit_changes(ctx, bb_input))

            elif bb_input.gitiles_commit.id:
                with self.m.step.nest('process gitiles commit'):
                    commit: common_pb2.GitilesCommit = bb_input.gitiles_commit
                    assert commit.host
                    if commit.project:
                        remote: str = f'https://{commit.host}/{commit.project}'

                    host: str = commit.host.replace(
                        '.googlesource.com', '-review.googlesource.com'
                    )
                    gerrit_name: str = commit.host.split('.')[0]

                    result: dict[str, Any] = self._number_details(
                        host, commit.id
                    )

                    if result:
                        branch: str = result['branch']
                        # A scheduler trigger for this exact commit takes
                        # precedence over the branch reported by Gerrit.
                        if commit.id in triggers[remote]:
                            branch = triggers[remote][commit.id].gitiles.ref
                            branch = branch.removeprefix('refs/heads/')

                        results.append(
                            Change(
                                number=result['_number'],
                                remote=remote,
                                ref=commit.id,
                                rebase=False,
                                branch=branch,
                                gerrit_name=gerrit_name,
                                submitted=True,
                                project=result['project'],
                            )
                        )

            if not results:
                # If not triggered by a gitiles_poller gitiles_commit may be
                # empty. In that case treat the most recent commit on the
                # remote as the triggering commit. This is a good assumption
                # except for Android Repo Tool projects, unless all projects
                # are pinned to commits instead of tracking branches. However,
                # even if this is wrong it's close enough to have utility.
                head: str = self.m.git.get_remote_branch_head(remote, branch)
                gerrit_name: str = urllib.parse.urlparse(remote).netloc.split(
                    '.'
                )[0]
                host: str = f'{gerrit_name}-review.googlesource.com'
                result: dict[str, Any] = self._number_details(host, head)

                # Without a matching Gerrit change the number falls back to 0
                # and the branch to the one that was passed in.
                results.append(
                    Change(
                        number=result['_number'] if result else 0,
                        remote=remote,
                        ref=head,
                        rebase=False,
                        branch=result['branch'] if result else branch,
                        gerrit_name=gerrit_name,
                        project=None,
                        submitted=True,
                    )
                )

            # Emit one step per change so each one shows up in the build UI.
            with self.m.step.nest('changes'):
                for result in results:
                    with self.m.step.nest(result.name) as change_data_pres:
                        change_data_pres.step_summary_text = repr(result)

            return tuple(results)

    def _matching_branches(
        self,
        repo: str,
        branches: Sequence[str],
        name: str = 'has branch',
        **kwargs,
    ):
        """Return, sorted, the given branches that have a head on the remote.

        Args:
            repo: Remote repository to query.
            branches: Candidate branch names.
            name: Name of the step that groups the lookups.
            **kwargs: Forwarded to git.get_remote_branch_head().
        """
        found: set[str] = set()
        with self.m.step.nest(name):
            with self.m.context(infra_steps=True):
                for candidate in branches:
                    tip: str = self.m.git.get_remote_branch_head(
                        repo,
                        candidate,
                        step_name=f'git ls-remote {candidate}',
                        step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                            ''
                        ),
                        **kwargs,
                    )
                    with self.m.step.nest('head') as pres:
                        pres.step_summary_text = repr(tip)

                    # An empty head means the branch was not found.
                    if not tip:
                        continue
                    found.add(candidate)
        return sorted(found)

    def _apply_change(
        self,
        ctx: CheckoutContext,
        change: Change,
        cwd: config_types.Path = None,
    ):
        """Apply a change to a directory without doing anything afterwards.

        Args:
            ctx: Checkout context object.
            change: Change to apply.
            cwd: Working directory, defaults to current directory.
        """
        # Entering the context manager applies the change; there is no extra
        # work to do while it is active.
        applying = self._apply_change_context(ctx=ctx, change=change, cwd=cwd)
        with applying:
            pass

    @contextlib.contextmanager
    def _apply_change_context(
        self,
        ctx: CheckoutContext,
        change: Change,
        cwd: config_types.Path = None,
    ):
        """Applies the given change to the given directory.

        The change is fetched and checked out on a new 'working' branch, a
        dedicated remote is added and set as upstream, and then the base of
        the change is recorded: the checked out commit for submitted changes,
        the remote branch tip after a rebase, or the merge-base otherwise.
        Submodules are updated last. The caller's 'with' body runs while the
        'apply' step and its context are still open.

        Args:
            ctx: Checkout context object.
            change: Change to apply. Its 'applied', 'path', 'base' and
                'base_type' attributes are updated.
            cwd: Working directory, defaults to current directory.
        """
        kwargs: dict[str, Any] = {'cwd': cwd} if cwd else {}
        change.applied = True
        change.path = self.m.path.relpath(cwd or ctx.root, ctx.root)

        apply_step: str = f'apply {change.name}'
        with self.m.context(**kwargs), self.m.step.nest(apply_step) as pres:
            pres.links['gerrit'] = change.gerrit_url
            pres.links['gitiles'] = change.gitiles_url
            if cwd:
                pres.step_summary_text = str(
                    self.m.path.relpath(cwd, ctx.root)
                )

            with self.m.context(infra_steps=True):
                # 'git fetch' fails if a submodule pin in the patch isn't
                # present in the remote (for example, if the pin is only
                # present in the uploader's workspace). Use
                # '--no-recurse-submodules' here so 'git fetch' doesn't fail
                # but instead 'git rebase' or 'git submodule update' fails
                # later (important because those are not infra steps). Also
                # don't use '--recurse-submodules' in 'git checkout' for
                # similar reasons.
                with self.m.default_timeout():
                    self.m.git.fetch(
                        change.remote,
                        change.ref,
                        recurse_submodules=False,
                        step_name='git fetch patch',
                    )
                self.m.git(
                    'git checkout patch',
                    'checkout',
                    '--force',
                    '-b',
                    'working',
                    'FETCH_HEAD',
                )

            with self.m.context(infra_steps=True):
                # Change "https://foo.googlesource.com/bar"
                #     to "https___foo_googlesource_com_bar".
                # In Android Repo Tool projects, the remote for the manifest
                # is often configured in a way that seems incorrect. Instead
                # of relying on it, create a whole new remote every time
                # that is always correct.
                remote: str = re.sub(r'[^\w]', '_', change.remote)
                remote_branch: str = '/'.join((remote, change.branch))
                self.m.git(
                    'git remote add',
                    'remote',
                    'add',
                    remote,
                    change.remote,
                )

                with self.m.default_timeout():
                    self.m.git.fetch(
                        remote,
                        f'refs/heads/{change.branch}',
                        prune=False,
                        step_name='git fetch branch',
                    )

                self.m.git(
                    'git set upstream',
                    'branch',
                    f'--set-upstream-to={remote_branch}',
                )

            if not change.submitted:
                with self.m.context(infra_steps=True):
                    self.m.git(
                        'pre-rebase log', 'log', '--oneline', '-n', '10'
                    )

            if change.submitted:
                # A submitted commit is its own base.
                change.base = self.m.git.rev_parse(
                    'HEAD',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'HEAD_' * 8,
                    ),
                )
                change.base_type = 'submitted_commit_hash'

            elif change.rebase:
                self.m.git('git rebase', 'rebase', remote_branch)

                change.base = self.m.git.rev_parse(
                    remote_branch,
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'REMOTE_BRANCH_' * 3,
                    ),
                )
                change.base_type = 'remote_branch_tip'

            else:
                change.base = self.m.git(
                    'merge-base',
                    'merge-base',
                    'HEAD',
                    remote_branch,
                    stdout=self.m.raw_io.output_text(),
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'MERGEBASE_' * 4,
                    ),
                ).stdout
                change.base_type = 'merge-base'

            # In most cases this is redundant or unnecessary, but it
            # shouldn't cause problems. It's necessary when a superproject
            # CL is updating a submodule pin and we need to sync the
            # submodule to the new revision.
            with self.m.default_timeout():
                # See b/243673776 for why we detach before updating
                # submodules.
                self.m.git('detach', 'checkout', '--detach')
                self.m.git.update_submodule(
                    recursive=True,
                    timeout=ctx.options.submodule_timeout_sec,
                )
                self.m.git('reattach', 'checkout', '-')

            yield

    def _check_unapplied_changes(self, changes: Sequence[Change]):
        """Report which changes were applied and fail if none were.

        Args:
            changes: The triggering changes.

        Returns:
            A StatusOfChanges splitting the changes into applied and not
            applied, or None when there are no changes.

        Raises:
            InfraFailure: If none of the changes were applied.
        """
        if not changes:  # pragma: no cover
            return None

        def report_unapplied(change):
            # One warning step per change, with links for easy navigation.
            with self.m.step.nest(f'failed to apply {change.name}') as pres:
                pres.status = 'WARNING'
                pres.links['gerrit'] = change.gerrit_url
                pres.links['gitiles'] = change.gitiles_url

        landed: list[Change] = [x for x in changes if x.applied]
        missed: list[Change] = [x for x in changes if not x.applied]

        with self.m.context(infra_steps=True):
            if not landed:
                with self.m.step.nest('no changes were applied') as pres:
                    pres.status = 'FAILURE'
                    for change in missed:
                        report_unapplied(change)
                    pres.properties['changes'] = [x.name for x in changes]

                raise self.m.step.InfraFailure(
                    'could not find triggering changes in checkout'
                )

            if missed:
                with self.m.step.nest('some changes were not applied') as pres:
                    pres.status = 'WARNING'
                    for change in missed:
                        report_unapplied(change)

        with self.m.step.nest('status') as pres:
            pres.step_summary_text = (
                f'applied {landed}\nnot applied {missed}'
            )

        return StatusOfChanges(
            applied=tuple(landed),
            not_applied=tuple(missed),
        )

    def _cached_checkout(
        self,
        remote: str,
        path: config_types.Path,
        ref: str,
        submodules: bool,
        included_submodules: Sequence[str] | None = None,
        excluded_submodules: Sequence[str] | None = None,
        submodule_timeout_sec: int = 10 * 60,
        cache: bool = True,
        use_packfiles: bool = True,
        **kwargs,
    ):
        """Check out a git repository, optionally seeding it from a cache.

        When cache is true, a per-remote directory under the 'git' cache is
        initialized (or reused), fetched, merged to FETCH_HEAD and has its
        submodules updated; that directory is then copied to path. The final
        'git checkout' step always runs on path afterwards.

        Args:
            remote: URL of the git repository.
            path: Destination of the checkout.
            ref: Revision handed to the final checkout.
            submodules: Whether submodules should be fetched and updated.
            included_submodules: Submodule paths to update. Mutually exclusive
                with excluded_submodules.
            excluded_submodules: Submodule paths to skip; every other path
                reported by 'git submodule status' is updated. Only consulted
                while populating the cache (cache=True).
            submodule_timeout_sec: Timeout passed to submodule updates.
            cache: Whether to go through the git cache.
            use_packfiles: Whether to set fetch.uriprotocols in the cache and
                pass use_packfiles through to the final checkout.
            **kwargs: Forwarded to the git module calls that maintain the
                cache directory.

        Raises:
            InfraFailure if both included_submodules and excluded_submodules
            are given, or if an excluded submodule is not a submodule of the
            repository.
        """
        included_submodules = included_submodules or []

        # Validate up front so a bad configuration fails before any fetching,
        # and is caught whether or not the cache is in use.
        if included_submodules and excluded_submodules:
            raise self.m.step.InfraFailure(
                'cannot specify both included_submodules and '
                'excluded_submodules'
            )

        submodule_paths = included_submodules

        if cache:
            with self.m.step.nest('cache') as pres, self.m.cache.guard('git'):
                # Build a flat, unique directory name from the remote's host
                # and path ('-' is doubled so that '/' -> '-' stays
                # unambiguous).
                parsed_remote = urllib.parse.urlparse(remote)
                cache_name = (
                    parsed_remote.hostname
                    + parsed_remote.path.replace('-', '--').replace('/', '-')
                )
                cache_path = self.m.path.cache_dir / 'git' / cache_name
                self.m.file.ensure_directory('makedirs', cache_path)

                with self.m.context(cwd=cache_path):
                    dotgit = cache_path / '.git'
                    if self.m.path.exists(dotgit):  # pragma: no cover
                        # Drop the old remote config; the URL is set again
                        # below.
                        self.m.git.config_remove_section(
                            'remote.origin', **kwargs
                        )
                        pres.step_summary_text = 'hit'
                    else:
                        self.m.git.init(bare=False, **kwargs)
                        pres.step_summary_text = 'miss'

                    self.m.git.config(
                        'remote.origin.url',
                        remote,
                        step_name='remote set-url',
                        **kwargs,
                    )

                    if use_packfiles:
                        self.m.git.config(
                            'fetch.uriprotocols',
                            'https',
                            step_name='set fetch.uriprotocols',
                            **kwargs,
                        )

                    with self.m.default_timeout():
                        try:
                            self.m.git.fetch(
                                repository='origin',
                                prune=True,
                                tags=True,
                                recurse_submodules=submodules,
                                **kwargs,
                            )

                        # If the fetch failed save the git config. It might
                        # not be helpful, but it shouldn't hurt.
                        except self.m.step.StepFailure:
                            with self.m.step.nest('git config'):
                                self.m.git.config(
                                    '--list', '--local', step_name='local'
                                )
                                self.m.git.config(
                                    '--list', '--global', step_name='global'
                                )
                            raise

                    self.m.git.merge(ref='FETCH_HEAD', **kwargs)

                    if excluded_submodules:
                        submodule_status = self.m.git(
                            'submodule status',
                            'submodule',
                            'status',
                            stdout=self.m.raw_io.output_text(),
                            step_test_data=lambda: self.test_api.m.raw_io.stream_output_text(
                                '-0000000000000000000000000000000000000000 pigweed (abc123)\n'
                                '-1111111111111111111111111111111111111111 nanopb (heads/branch)\n'
                            ),
                        ).stdout.splitlines()

                        # Each status line is '<state+hash> <path> ...'; the
                        # path is the second field. Blank lines are skipped.
                        submodule_paths = [
                            x.split()[1] for x in submodule_status if x.strip()
                        ]
                        for sub in excluded_submodules:
                            if sub not in submodule_paths:
                                raise self.m.step.InfraFailure(
                                    f'excluded submodule {sub} is not a submodule'
                                )
                            with self.m.step.nest(f'excluding submodule {sub}'):
                                pass
                            submodule_paths.remove(sub)

                    for sub in submodule_paths:
                        with self.m.step.nest(f'including submodule {sub}'):
                            pass

                    if submodules or submodule_paths:
                        self.m.git.sync_submodule(recursive=True, **kwargs)
                        with self.m.default_timeout():
                            self.m.git.update_submodule(
                                recursive=True,
                                force=True,
                                paths=submodule_paths,
                                timeout=submodule_timeout_sec,
                                **kwargs,
                            )

                    if not submodules:
                        # Even though submodules weren't requested, if the cache
                        # had any active submodules we need to update them.
                        # Otherwise we'll get weird situations in rolls where an
                        # uninvolved submodule will be rolled back.
                        with self.m.default_timeout():
                            self.m.git.update_submodule(
                                recursive=True,
                                force=True,
                                init=False,
                                timeout=submodule_timeout_sec,
                                **kwargs,
                            )

            self.m.file.copytree(
                'copy from cache', cache_path, path, symlinks=True
            )

        # Deliberately not combining contexts into one line so it's obvious to
        # both devs and Python which one is "outer" and which is "inner".
        with self.m.step.nest('git checkout'):
            with self.m.default_timeout():
                self.m.git_checkout(
                    repo=remote,
                    path=path,
                    cache=False,
                    revision=ref,
                    recursive=submodules,
                    submodules=submodules,
                    submodule_force=submodules,
                    submodule_paths=submodule_paths,
                    submodule_timeout=submodule_timeout_sec,
                    step_name="",
                    use_packfiles=use_packfiles,
                )

    def _git(self, ctx: CheckoutContext):
        """Check out a git superproject and apply triggering changes.

        The superproject is checked out through _cached_checkout. When
        ctx.options.use_trigger is set, each triggering change is applied to
        the top-level repository and/or to the submodule it matches, the
        outcome is stored in ctx.status, and the applied changes are written
        to a JSON file referenced by ctx.changes_json.

        Args:
            ctx (CheckoutContext): Context object.

        Raises:
            StepFailure if a change matches several submodules by remote and
            the branch name does not single out exactly one of them.
        """

        super_branch = self._matching_branch(ctx) or ctx.options.branch

        with self.m.context(infra_steps=True):
            self._cached_checkout(
                ctx.options.remote,
                path=ctx.root,
                ref=super_branch,
                cache=not ctx.options.do_not_cache,
                submodules=ctx.options.initialize_submodules,
                submodule_timeout_sec=ctx.options.submodule_timeout_sec,
                included_submodules=ctx.options.included_submodules,
                excluded_submodules=ctx.options.excluded_submodules,
                use_packfiles=not ctx.options.do_not_use_packfiles,
            )

        with self.m.context(cwd=ctx.root):
            got_revision = None
            got_revision_type = 'no_trigger'
            all_submodules = []

            if ctx.options.use_trigger:
                got_revision = self.m.git.rev_parse(
                    'HEAD',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'HEAD' * 10,
                    ),
                )

                # Check for CLs for the top-level repository.
                for change in ctx.changes:
                    if not ctx.remotes_equivalent(
                        ctx.options.remote, change.remote
                    ):
                        continue
                    self._apply_change(ctx, change)
                    got_revision = change.base
                    got_revision_type = change.base_type

                all_submodules = ctx.submodules(recursive=True)

                # Check for CLs for submodules. Roughly:
                # * No submodule shares the change's remote: nothing to do.
                # * Exactly one submodule shares the remote: the change goes
                #   there, whether or not the branch matches.
                # * Several submodules share the remote: the change goes to
                #   the one whose branch equals the change's branch. If zero
                #   or more than one of them match the branch, error out.
                for change in ctx.changes:
                    with self.m.step.nest(f'matching {change.name}') as pres:
                        pres.links['gerrit'] = change.gerrit_url
                        pres.links['gitiles'] = change.gitiles_url

                        candidates = []
                        for sub in all_submodules:
                            if not sub.initialized:
                                continue
                            label = f'{sub.path} ({sub.remote})'
                            if ctx.remotes_equivalent(
                                sub.remote, change.remote
                            ):
                                self.m.step.empty(f'match: {label}')
                                candidates.append(sub)
                            else:
                                self.m.step.empty(f'no match: {label}')

                        if not candidates:
                            pres.step_summary_text = 'no matching submodules'
                            continue

                        if len(candidates) > 1:
                            described = ', '.join(
                                f'{self.m.path.relpath(sub.path, ctx.root)} '
                                f'(branch {sub.branch})'
                                for sub in candidates
                            )

                            # Fall back to the branch name as a tie-breaker.
                            on_branch = [
                                sub
                                for sub in candidates
                                if sub.branch == change.branch
                            ]

                            if not on_branch:
                                pres.step_summary_text = (
                                    'zero submodules match the branch'
                                )
                                raise self.m.step.StepFailure(
                                    f'change {change.name} '
                                    f'(branch {change.branch}) matches '
                                    f'multiple submodules ({described}) '
                                    'but no branches match'
                                )

                            if len(on_branch) > 1:
                                pres.step_summary_text = (
                                    'too many submodules match the branch'
                                )
                                raise self.m.step.StepFailure(
                                    f'change {change.name} (branch '
                                    f'{change.branch}) matches multiple '
                                    f'submodules ({described}), but too '
                                    'many branches match'
                                )

                            pres.step_summary_text = 'one matching submodule'
                            candidates = on_branch

                    if len(candidates) == 1:
                        target = candidates[0]
                        if not ctx.options.initialize_submodules:
                            # Submodules were not initialized by the checkout,
                            # so update the one that receives the change.
                            with self.m.default_timeout():
                                self.m.git.update_submodule(
                                    paths=(target.path,)
                                )
                        self._apply_change(ctx, change, cwd=target.path)

                ctx.status = self._check_unapplied_changes(ctx.changes)

                # Only primitive-valued attributes of each applied change are
                # written out.
                primitive_types = (int, str, bool, type(None))
                applied_changes = [
                    {
                        key: value
                        for key, value in to_dict(entry).items()
                        if isinstance(value, primitive_types)
                    }
                    for entry in ctx.changes
                    if entry.applied
                ]
                ctx.changes_json = self.m.path.mkstemp()
                self.m.file.write_json(
                    'write changes.json',
                    ctx.changes_json,
                    applied_changes,
                )

            # Run git log for both the top-level checkout and every submodule.
            log_args = ('log', '--oneline', '-n', '10')
            with self.m.step.nest('git log'):
                self.m.git(str(ctx.root), *log_args)
                for sub in sorted(all_submodules):
                    with self.m.context(cwd=sub.path):
                        self.m.git(str(sub.path), *log_args)

            if got_revision:
                with self.m.step.nest('base') as pres:
                    pres.properties['got_revision'] = got_revision
                    # got_revision_type isn't needed by anything but helps
                    # explain why got_revision is the value it is.
                    pres.properties['got_revision_type'] = got_revision_type

    def _matching_branch(self, ctx: CheckoutContext):
        """Pick the manifest/superproject branch named by the triggering CLs.

        If a triggering change is on a non-default branch whose name is also
        present on the manifest or superproject remote, that branch is the
        one to use when checking out the project.

        Args:
            ctx (CheckoutContext): Context object.

        Raises:
            StepFailure if there are multiple matching branches.

        Returns:
            One matching branch name, or None.
        """
        opts = ctx.options
        if not (opts.match_branch and opts.use_trigger):
            with self.m.step.nest('not matching branch names'):
                return None

        kind = 'manifest' if opts.use_repo else 'superproject'

        # Default branches never cause a branch switch.
        default_branches = ('master', 'main', None)
        candidates = sorted(
            {
                change.branch
                for change in ctx.changes
                if change.branch not in default_branches
            }
        )

        if not candidates:
            with self.m.step.nest('no non-standard branch names'):
                return None

        with self.m.step.nest('branch names') as pres:
            pres.step_summary_text = str(candidates)

        found = self._matching_branches(
            opts.remote, candidates, name=f'{kind} has branch'
        )
        if not found:
            with self.m.step.nest('no branch names match'):
                return None

        if len(found) > 1:
            joined = ', '.join(found)
            with self.m.step.nest(
                f"too many matching branches ({joined})"
            ) as pres:
                pres.step_summary_text = (
                    f"Can't figure out which {kind} branch to use. Remove some "
                    '"Requires:" lines to simplify the checkout.'
                )
                raise self.m.step.StepFailure('multiple matching branches')

        chosen = found.pop()
        # Empty step that only records the branch switch in the build output.
        self.m.step(
            f'changing {kind} branch to {chosen}',
            None,
        )
        return chosen

    def _repo(self, ctx: CheckoutContext):
        """Checkout code from an Android Repo Tool manifest.

        Initializes and syncs a repo workspace in ctx.root. When
        ctx.options.use_trigger is set, triggering changes to the manifest
        repository are applied before the sync, changes to manifest projects
        are applied after it, and the outcome is stored in ctx.status.

        Results are recorded on ctx rather than returned: ctx.manifest is
        populated, ctx.top is set to the workspace root, and ctx.root is moved
        to the workspace's only top-level entry when there is exactly one
        (ignoring '.repo').

        Args:
            ctx (CheckoutContext): Context object. Its options supply the
                manifest remote, branch, manifest file, manifest groups,
                timeouts and number of attempts.
        """

        # Git makes the top-level folder, Repo requires caller to make it.
        self.m.file.ensure_directory('mkdir checkout', ctx.root)

        with self.m.context(cwd=ctx.root):
            manifest_branch = self._matching_branch(ctx) or ctx.options.branch

            with self.m.context(infra_steps=True):
                kwargs = {}
                if ctx.options.repo_init_timeout_sec:
                    kwargs['timeout'] = ctx.options.repo_init_timeout_sec
                    kwargs['attempts'] = ctx.options.number_of_attempts
                if ctx.options.manifest_groups:
                    kwargs['groups'] = ctx.options.manifest_groups

                self.m.repo.init(
                    manifest_url=ctx.options.remote,
                    manifest_branch=manifest_branch,
                    manifest_name=ctx.options.manifest_file,
                    **kwargs,
                )

            manifests_dir = ctx.root / '.repo' / 'manifests'
            # If the triggering CL is a manifest change, apply it before running
            # sync.
            if ctx.options.use_trigger:
                for change in ctx.changes:
                    if change.remote and ctx.remotes_equivalent(
                        ctx.options.remote, change.remote
                    ):
                        with self._apply_change_context(
                            ctx, change, cwd=manifests_dir
                        ):
                            # Right now the upstream of 'working' is the local
                            # 'default' branch. 'repo sync' complains if the
                            # upstream isn't remote, so it's changed to the
                            # remote branch that's identical to 'default'.
                            self.m.git(
                                'git branch',
                                'branch',
                                f'--set-upstream-to=origin/{manifest_branch}',
                            )

            ctx.manifest = self._read_manifest(
                ctx.options.remote,
                manifests_dir / ctx.options.manifest_file,
            )

            # Remotes fetched over sso:// get an insteadof rule configured.
            for _, remote_host in sorted(ctx.manifest.remotes.items()):
                if remote_host.fetch.url.startswith('sso://'):
                    self.m.sso.configure_insteadof(remote_host.fetch.url)

            with self.m.context(infra_steps=True):
                kwargs = {}
                if ctx.options.repo_sync_timeout_sec:
                    kwargs['timeout'] = ctx.options.repo_sync_timeout_sec
                    kwargs['attempts'] = ctx.options.number_of_attempts
                self.m.repo.sync(
                    force_sync=True, current_branch=True, jobs=2, **kwargs
                )
                self.m.repo.start('base')

            if ctx.options.use_trigger:
                # Apply each change to every manifest project whose URL is
                # equivalent to the change's remote.
                for change in ctx.changes:
                    for entry in ctx.manifest.projects:
                        if ctx.remotes_equivalent(entry.url, change.remote):
                            with self._apply_change_context(
                                ctx,
                                change,
                                cwd=entry.path_object(ctx.root),
                            ):
                                with self.m.step.nest(
                                    'compare branch name'
                                ) as pres:
                                    pres.step_summary_text = (
                                        f'CL branch: {change.branch}\n'
                                        f'upstream branch: {entry.upstream}'
                                    )

                ctx.status = self._check_unapplied_changes(ctx.changes)

        # Some dependent projects have everything inside one top-level folder
        # in their repo workspace. For those projects pretend that top-level
        # folder is actually the checkout root. The top member will always
        # point to the actual repo workspace root.
        ctx.top = ctx.root
        files = set(self.m.file.listdir('ls', ctx.root))
        files.discard(ctx.root / '.repo')
        if len(files) == 1:
            ctx.root = files.pop()

    def _configure_insteadof(self, ctx: CheckoutContext):
        """Add global git 'insteadof' rules for every configured rewrite.

        Does nothing when ctx.options.rewrites is empty. Otherwise each
        rewrite is added to the global git config and the resulting url.*
        settings are printed in a final step.
        """
        rewrites = ctx.options.rewrites
        if not rewrites:
            return

        with self.m.step.nest('insteadof'):
            for entry in rewrites:
                source, target = entry.original, entry.final
                self.m.git(
                    f'{source} to {target}',
                    'config',
                    '--global',
                    '--add',
                    f'url.{target}.insteadof',
                    source,
                )

            # Show the rewrite rules now in effect.
            self.m.git('rewrites', 'config', '--get-regexp', '^url.*')

    def _name(self, options: Options):
        """Turn "https://foo/bar/baz.git" into "checkout baz".

        For repo checkouts a trailing "manifest" path component is dropped
        first, so the component before it is used instead.
        """
        trimmed = options.remote.rstrip('/').removesuffix('.git')
        segments = trimmed.split('/')
        if options.use_repo and segments[-1] == 'manifest':
            del segments[-1]
        leaf = segments[-1]
        return f'checkout {leaf}'

    def __call__(
        self,
        options: Options,
        root: config_types.Path | None = None,
        name: str | None = None,
    ) -> CheckoutContext:
        """Checkout code.

        Fills in defaults on options, checks out the project with either the
        repo tool or git (depending on options.use_repo), links applied and
        unapplied changes on the enclosing step, and snapshots the checkout.

        Note that options is modified in place: unset fields receive defaults
        and a trailing '.git' is removed from options.remote.

        Args:
            options: Checkout options. options.remote must be set.
            root: Directory to check out into. Defaults to 'co' under the
                start directory.
            name: Name of the enclosing step. Defaults to a name derived from
                options.remote.

        Returns:
            The CheckoutContext describing the checkout.
        """

        checkout_name = name or self._name(options)

        assert options.remote

        # Capture the options before and after defaults for the build output.
        initial_options = repr(options)
        options.manifest_file = options.manifest_file or 'default.xml'
        options.repo_init_timeout_sec = options.repo_init_timeout_sec or 20
        options.repo_sync_timeout_sec = options.repo_sync_timeout_sec or 2 * 60
        options.number_of_attempts = options.number_of_attempts or 3
        options.submodule_timeout_sec = options.submodule_timeout_sec or 10 * 60
        final_options = repr(options)

        ctx = CheckoutContext(api=self.m)
        ctx.options = options
        ctx.changes = []
        ctx.root = root or self.m.path.start_dir / 'co'

        # Every remote in a group maps to the full (https-normalized) group.
        for remotes in options.equivalent_remotes:
            new_remotes = [self.m.sso.sso_to_https(x) for x in remotes.remotes]
            for remote in new_remotes:
                assert remote not in ctx.equivalent_remotes
                ctx.equivalent_remotes[remote] = new_remotes

        with self.m.step.nest(checkout_name) as pres:
            with self.m.step.nest('options') as options_pres:
                options_pres.step_summary_text = initial_options

            with self.m.step.nest('options with defaults') as options_pres:
                options_pres.step_summary_text = final_options

            options.remote = options.remote.removesuffix('.git')

            if options.use_trigger:
                ctx.changes = self._change_data(
                    ctx, options.remote, options.branch
                )

            self._configure_insteadof(ctx)

            if options.use_repo:
                self._repo(ctx)

            else:
                self._git(ctx)

            if ctx.status:
                for change in ctx.status.applied:
                    pres.links[f'applied {change.name_with_path}'] = (
                        change.gerrit_url
                    )

                for change in ctx.status.not_applied:
                    pres.links[f'failed to apply {change.name}'] = (
                        change.gerrit_url
                    )

            snapshot_dir = self.m.path.start_dir / 'snapshot'
            ctx.snapshot_to_dir(snapshot_dir)

            # NOTE(review): for repo checkouts this replaces the ctx.top that
            # _repo() assigned with the (possibly relocated) ctx.root.
            # Confirm that is intended.
            ctx.top = ctx.root
            if ctx.options.root_subdirectory:
                ctx.root = ctx.root / ctx.options.root_subdirectory

        return ctx

    def get_revision(
        self,
        root: config_types.Path,
        name: str = 'git log',
        test_data: str = 'HASH',
    ):
        """Like self.revision, but works for secondary checkouts.

        Runs 'git log' for a single commit in root, shows the resulting hash
        as the step's summary text and returns it.

        Args:
            root: Directory of the checkout to inspect.
            name: Name of the git step.
            test_data: Hash to report as the step's output in tests.

        Returns:
            The commit hash printed by git, stripped of whitespace.
        """

        def fake_stdout():
            return self.test_api.m.raw_io.stream_output_text(
                test_data,
            )

        with self.m.context(cwd=root):
            log_step = self.m.git(
                name,
                'log',
                '--max-count=1',
                '--pretty=format:%H',
                stdout=self.m.raw_io.output_text(),
                step_test_data=fake_stdout,
            )

        revision = log_step.stdout.strip()
        log_step.presentation.step_summary_text = revision
        return revision
