# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Calls to checkout code.

Usage:
api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed')
"""

import re
import xml.etree.ElementTree

import attr
from PB.recipe_modules.pigweed.checkout.options import Options
from recipe_engine import config_types, recipe_api
from RECIPE_MODULES.fuchsia.utils import memoize
from six.moves import urllib

# HTTPS URL of the Pigweed repository on pigweed.googlesource.com.
PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed'


@attr.s
class Manifest(object):
    """Remotes and projects collected from a repo manifest."""

    # Remote objects keyed by remote name.
    remotes = attr.ib(default=attr.Factory(dict))
    # Project objects, in the order they were appended.
    projects = attr.ib(default=attr.Factory(list))

    def dict(self):
        """Return the manifest as plain dicts/lists via each entry's dict()."""
        remotes = {
            name: remote.dict() for name, remote in self.remotes.items()
        }
        projects = [project.dict() for project in self.projects]
        return {'remotes': remotes, 'projects': projects}


class Url(object):
    """A URL as written, plus a slot for its https form."""

    def __init__(self, url, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 'https' starts out unset; callers assign it after construction.
        self.url, self.https = url, None

    def dict(self):
        """Return a shallow copy of this object's attributes."""
        return vars(self).copy()


@attr.s
class Remote(object):
    """A <remote> entry of a manifest."""

    name = attr.ib(type=str)
    fetch = attr.ib(type=Url)
    review = attr.ib(type=str, default=None)
    revision = attr.ib(type=str, default=None)
    alias = attr.ib(type=str, default=None)

    def dict(self):
        """Return the attributes as a dict, with 'fetch' expanded too."""
        return {**vars(self), 'fetch': self.fetch.dict()}


@attr.s
class Project(object):
    """The fields that identify one repository/project."""

    name = attr.ib(type=str)
    path = attr.ib(type=str)
    remote = attr.ib(type=str)
    revision = attr.ib(type=str)
    upstream = attr.ib(type=str)
    url = attr.ib(type=str, default=None)

    def path_object(self, root):
        """Return this project's path joined onto 'root'."""
        return root.join(self.path)

    def dict(self):
        """Return a shallow copy of this object's attributes."""
        return vars(self).copy()


def _str_or_none(x):
    if x is None:
        return x
    return str(x)


@attr.s
class Change(object):
    """A change or commit described by buildbucket data."""

    # TODO(pwbug/465) Remove converters after switch to Python 3.
    number = attr.ib(converter=int)
    bb_input = attr.ib(repr=False)
    remote = attr.ib(converter=_str_or_none)
    ref = attr.ib(converter=_str_or_none)
    rebase = attr.ib(type=bool)
    branch = attr.ib(converter=_str_or_none)
    gerrit_name = attr.ib(converter=_str_or_none)
    submitted = attr.ib(type=bool)
    applied = attr.ib(type=bool, default=False, repr=False)
    base = attr.ib(converter=_str_or_none, default=None)
    base_type = attr.ib(converter=_str_or_none, default=None)
    is_merge = attr.ib(type=bool, default=False)

    @property
    def gerrit_url(self):
        """Gerrit URL of the change; the gitiles URL if 'number' is falsy."""
        if self.number:
            return 'https://{}-review.googlesource.com/c/{}'.format(
                self.gerrit_name, self.number
            )
        return self.gitiles_url

    @property
    def gitiles_url(self):
        """URL of 'ref' underneath 'remote'."""
        remote, ref = self.remote, self.ref
        return '{}/+/{}'.format(remote, ref)

    @property
    def name(self):
        """Short identifier of the form '<gerrit_name>:<number>'."""
        gerrit_name, number = self.gerrit_name, self.number
        return '{}:{}'.format(gerrit_name, number)


@attr.s
class Submodule(object):
    """Submodule properties.

    CheckoutContext.submodules() builds instances as
    Submodule(api, **entry), so apart from 'api' the field names have to
    match the keys of each entry it passes in.
    """

    api = attr.ib(type=recipe_api.RecipeApi, repr=False)
    hash = attr.ib(type=str)
    relative_path = attr.ib(type=str)
    path = attr.ib(type=config_types.Path)
    name = attr.ib(type=str)
    remote = attr.ib(type=str)
    initialized = attr.ib(type=bool)
    modified = attr.ib(type=bool)
    conflict = attr.ib(type=bool)
    branch = attr.ib(type=str)
    url = attr.ib(type=str)
    update = attr.ib(type=str)
    ignore = attr.ib(type=str)
    shallow = attr.ib(type=bool)
    fetchRecurseSubmodules = attr.ib(type=bool)
    # 'describe' used to be declared twice (also right after 'name'). The
    # later declaration replaced the earlier one in the class body, so this
    # single declaration, kept in last position, leaves the generated field
    # order exactly as it was.
    describe = attr.ib(type=str)


@attr.s
class StatusOfChanges(object):
    """Outcome of applying the triggering changes.

    Both fields hold a tuple of Change objects.
    """

    # Changes that were applied.
    applied = attr.ib()
    # Changes that were not applied.
    not_applied = attr.ib()


@attr.s
class CheckoutContext(object):
    """State of a checkout, plus helpers that query it through the api."""

    _api = attr.ib(repr=False)

    # Options protobuf the checkout module was given.
    options = attr.ib(default=None)

    # Triggering changes, as a list.
    changes = attr.ib(default=None)

    # Real root of the checkout.
    top = attr.ib(default=None)

    # Logical root of the checkout. Normally the same as 'top', but now and
    # then a subdirectory of it.
    root = attr.ib(default=None)

    # Record of which triggering changes were and were not applied.
    status = attr.ib(default=None)

    # Remotes that are to be treated as interchangeable.
    equivalent_remotes = attr.ib(default=attr.Factory(dict))

    # Parsed repo manifest.
    manifest = attr.ib(default=None)

    def revision(self):
        """Return the revision of 'root'; looked up once, then reused."""
        if not hasattr(self, '_revision'):
            self._revision = self._api.checkout.get_revision(self.root)
        return self._revision

    def manifest_snapshot(self):
        """Return the pinned repo manifest, or None when repo is not in use.

        The snapshot is taken on the first call and reused afterwards.
        """
        if not self.options.use_repo:
            return None

        if not hasattr(self, '_manifest_snapshot'):
            with self._api.context(cwd=self.top):
                self._manifest_snapshot = self._api.repo.manifest_snapshot()
        return self._manifest_snapshot

    def submodule_snapshot(self):
        """Return 'git submodule status --recursive' output, or None for repo.

        Counterpart of manifest_snapshot() for non-repo checkouts, with looser
        formatting. The text is captured on the first call and reused.
        """
        if self.options.use_repo:
            return None

        if not hasattr(self, '_submodule_snapshot'):
            with self._api.context(cwd=self.root):
                # Aliased so the step_test_data line passes pylint.
                raw_io_stream_output = (
                    self._api.raw_io.test_api.stream_output_text
                )

                status_step = self._api.git(
                    'submodule-status',
                    'submodule',
                    'status',
                    '--recursive',
                    stdout=self._api.raw_io.output_text(),
                    step_test_data=lambda: raw_io_stream_output(
                        'submodule status filler text',
                    ),
                    ok_ret='any',
                )
                self._submodule_snapshot = status_step.stdout.strip() or ''
        return self._submodule_snapshot

    def snapshot_to_dir(self, directory):
        """Write the non-empty snapshots and a ten-line git log to a directory.

        Args:
            directory (Path): Destination; it is created if necessary.
        """
        self._api.file.ensure_directory('mkdir', directory)
        write_text = self._api.file.write_text

        manifest_xml = self.manifest_snapshot()
        if manifest_xml:
            write_text(
                'write manifest.xml',
                directory.join('manifest.xml'),
                manifest_xml,
            )

        submodule_text = self.submodule_snapshot()
        if submodule_text:
            write_text(
                'write submodule snapshot',
                directory.join('submodules.log'),
                submodule_text,
            )

        with self._api.context(cwd=self.root):
            log_step = self._api.git(
                'log',
                'log',
                '--oneline',
                '-n',
                '10',
                stdout=self._api.raw_io.output_text(),
                ok_ret='any',
            )
            log_text = log_step.stdout
        write_text('write git log', directory.join('git.log'), log_text)

    def submodules(self, recursive=False):
        """Return a list of Submodule objects for the checkout at 'root'.

        Args:
            recursive (bool): Pass '--recursive' to the status script.
        """
        cmd = [
            'python3',
            self._api.checkout.resource('submodule_status.py'),
            self.root,
            self._api.json.output(),
        ] + (['--recursive'] if recursive else [])

        status = self._api.step(
            'submodule status',
            cmd,
            step_test_data=lambda: self._api.json.test_api.output({}),
        ).json.output

        found = []
        for entry in status.values():
            remote = self._api.sso.sso_to_https(entry['remote'])
            if remote.endswith('.git'):
                remote = remote[:-4]
            # The arguments are evaluated before update() runs, so both uses
            # of entry['path'] below still see the original relative path.
            entry.update(
                remote=remote,
                relative_path=entry['path'],
                path=self.root.join(entry['path']),
            )
            found.append(Submodule(self._api, **entry))

        return found

    _REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$')

    def gerrit_host(self):
        """Return the '-review' form of the host in options.remote."""
        match = self._REMOTE_REGEX.match(self.options.remote)
        if not match:
            return  # pragma: no cover

        host = match.group('host')
        # Only the first '.' gets '-review' inserted in front of it.
        return host if '-review' in host else host.replace('.', '-review.', 1)

    def gerrit_project(self):
        """Return the project part (after the host) of options.remote."""
        match = self._REMOTE_REGEX.match(self.options.remote)
        if not match:
            return  # pragma: no cover

        return match.group('project')

    def remotes_equivalent(self, remote1, remote2):
        """Return whether the two remotes are equal or listed as equivalent."""
        # Either remote may be None; a None must not go through the
        # sso-to-https conversion.
        if remote1:
            remote1 = self._api.sso.sso_to_https(remote1)
        if remote2:
            remote2 = self._api.sso.sso_to_https(remote2)
        return remote1 == remote2 or (
            remote1 in self.equivalent_remotes.get(remote2, ())
        )


class CheckoutApi(recipe_api.RecipeApi):
    """Calls to checkout code."""

    def _read_manifest(self, manifest_remote, manifest_file):
        """Parse a repo manifest into a Manifest and dump it as JSON.

        Args:
            manifest_remote (str): URL of the manifest repository. Its scheme
                and host replace a leading '..' in a remote's fetch value.
            manifest_file (Path): Manifest XML file to read.

        Returns:
            Manifest holding a Remote per <remote> element and a Project per
            <project> element.
        """

        with self.m.step.nest('read manifest') as read_step:
            manifest_text = self.m.file.read_text('read file', manifest_file)
            read_step.logs['raw'] = manifest_text

            xml_tree = xml.etree.ElementTree.fromstring(manifest_text)
            manifest = Manifest()

            for element in xml_tree.iter('remote'):
                entry = Remote(**element.attrib)
                fetch = entry.fetch
                if fetch.startswith('..'):
                    parsed = urllib.parse.urlparse(manifest_remote)
                    base = '{}://{}'.format(parsed.scheme, parsed.netloc)
                    fetch = base + fetch[2:]
                entry.fetch = Url(fetch)
                entry.fetch.https = self.m.sso.sso_to_https(fetch)
                manifest.remotes[entry.name] = entry

            # Attributes of every <default> element, later ones winning.
            defaults = {}
            for element in xml_tree.iter('default'):
                defaults.update(element.attrib)

            for element in xml_tree.iter('project'):
                attrib = element.attrib
                name = attrib['name']
                path = attrib.get('path', name)

                if 'remote' in attrib:
                    remote = attrib['remote']
                elif 'remote' in defaults:
                    remote = defaults['remote']
                else:  # pragma: no cover
                    assert False, 'remote not specified for {}'.format(name)

                assert (
                    remote in manifest.remotes
                ), 'Remote {} does not exist'.format(remote)
                remote_entry = manifest.remotes[remote]

                # Precedence: the project, then its remote, then <default>.
                if 'revision' in attrib:
                    revision = attrib['revision']
                elif remote_entry.revision:
                    revision = remote_entry.revision
                elif 'revision' in defaults:
                    revision = defaults['revision']
                else:  # pragma: no cover
                    assert False, 'revision not specified for {}'.format(name)

                if 'upstream' in attrib:
                    upstream = attrib['upstream']
                elif 'upstream' in defaults:  # pragma: no cover
                    # Unlikely to be used and hard to test: it would need a
                    # wholly separate manifest definition, or the 'else'
                    # branch would go uncovered. It is also simple.
                    upstream = defaults['upstream']
                else:
                    upstream = revision

                # urllib.urljoin does not do what is wanted here, so the two
                # halves are glued together with exactly one '/'.
                url = '{}/{}'.format(
                    remote_entry.fetch.https.rstrip('/'), name.lstrip('/')
                )
                manifest.projects.append(
                    Project(
                        name=name,
                        path=path,
                        remote=remote,
                        revision=revision,
                        upstream=upstream,
                        url=url,
                    )
                )

            self.m.file.write_json(
                'manifest json',
                self.m.path['start_dir'].join('manifest.json'),
                manifest.dict(),
            )

        return manifest

    def _process_gerrit_change(self, ctx, bb_input, change):
        """Turn one LUCI GerritChange into a Change.

        Gerrit is asked for the change's details to get its target branch
        and to see whether the current revision is a merge commit. Merge
        commits are never rebased.
        """

        assert change.host
        number = change.change
        ref = 'refs/changes/{:02}/{}/{}'.format(
            number % 100, number, change.patchset,
        )
        host = change.host.replace(
            '-review.googlesource.com', '.googlesource.com'
        )
        remote = 'https://{}/{}'.format(host, change.project).strip('/')
        gerrit_name = host.split('.')[0]

        fake_details = {
            'branch': 'main',
            'current_revision': 'f' * 40,
            'revisions': {'f' * 40: {'commit': {'parents': [{}],},},},
        }
        details = self.m.gerrit.change_details(
            'details',
            change_id=str(number),
            host=change.host,
            max_attempts=5,
            query_params=['CURRENT_COMMIT', 'CURRENT_REVISION',],
            timeout=30,
            test_data=self.m.json.test_api.output(fake_details),
        ).json.output
        branch = details['branch']

        current = details['revisions'][details['current_revision']]
        is_merge = len(current['commit']['parents']) > 1
        rebase = not (is_merge or ctx.options.force_no_rebase)

        return Change(
            number=number,
            bb_input=bb_input,
            remote=remote,
            ref=ref,
            rebase=rebase,
            is_merge=is_merge,
            branch=branch,
            gerrit_name=gerrit_name,
            submitted=False,
        )

    def _process_gerrit_changes(self, ctx, bb_input):
        """Yield a Change per buildbucket gerrit change, then its CQ deps.

        Dependencies are resolved for the last buildbucket change only.
        Resolved ones are yielded as full Changes; unresolved ones as
        placeholder Changes carrying just a number and a gerrit name.
        """
        seen_names = set()
        for index, gerrit_change in enumerate(bb_input.gerrit_changes):
            with self.m.step.nest(str(index)):
                latest = self._process_gerrit_change(
                    ctx, bb_input, gerrit_change
                )
                yield latest
                seen_names.add(latest.name)

        deps, unresolved = self.m.cq_deps.resolve(
            latest.gerrit_name, latest.number,
        )
        for dep in deps:
            # A dep can only already be in seen_names when buildbucket sent
            # several gerrit_changes and a later one depends on an earlier
            # one. With several gerrit_changes cq_deps is not needed here, so
            # this merely guards against something that shouldn't happen.
            if dep.name in seen_names:  # pragma: no cover
                continue
            seen_names.add(dep.name)
            yield self._process_gerrit_change(ctx, bb_input, dep)

        for cl in unresolved:
            placeholder = Change(
                number=cl.change,
                bb_input=None,
                remote=None,
                ref=None,
                rebase=None,
                branch=None,
                gerrit_name=cl.gerrit_name,
                submitted=False,
            )
            yield placeholder

    def _number_details(self, host, commit_hash, branch='main'):
        """Return the single Gerrit change matching a commit, else None.

        Args:
            host (str): Gerrit host to query.
            commit_hash (str): Commit to search for.
            branch (str): Branch reported by the test data only.
        """
        fake_results = [{'_number': '1234', 'branch': branch}]
        try:
            results = self.m.gerrit.change_query(
                'number',
                'commit:{}'.format(commit_hash),
                host=host,
                max_attempts=5,
                timeout=30,
                test_data=self.m.json.test_api.output(fake_results),
            ).json.output
        except self.m.step.StepFailure:  # pragma: no cover
            return None

        # Anything but exactly one hit means the commit did not go through
        # Gerrit in a way that can be used, so it is skipped.
        return results[0] if results and len(results) == 1 else None

    def _change_data(self, ctx, remote=None, branch=None):
        """Return a tuple of Changes describing what triggered this build.

        Buildbucket gerrit changes are used if present, then the buildbucket
        gitiles commit, and otherwise the current head of 'branch' on
        'remote'.

        Args:
            ctx (CheckoutContext): Passed on when processing gerrit changes.
            remote (str): Fallback remote URL.
            branch (str): Fallback branch.
        """
        bb_input = self.m.buildbucket.build.input
        changes = []

        with self.m.step.nest('change data'):
            if bb_input.gerrit_changes:
                with self.m.step.nest('process gerrit changes'):
                    for change in self._process_gerrit_changes(ctx, bb_input):
                        changes.append(change)

            elif bb_input.gitiles_commit.id:
                with self.m.step.nest('process gitiles commit'):
                    commit = bb_input.gitiles_commit
                    assert commit.host
                    if commit.project:
                        remote = 'https://{}/{}'.format(
                            commit.host, commit.project
                        )

                    review_host = commit.host.replace(
                        '.googlesource.com', '-review.googlesource.com'
                    )
                    gerrit_name = commit.host.split('.')[0]

                    details = self._number_details(review_host, commit.id)
                    if details:
                        changes.append(
                            Change(
                                number=details['_number'],
                                bb_input=bb_input,
                                remote=remote,
                                ref=commit.id,
                                rebase=False,
                                branch=details['branch'],
                                gerrit_name=gerrit_name,
                                submitted=True,
                            )
                        )

            else:
                # Without a gitiles_poller trigger gitiles_commit may be
                # empty. Then the newest commit on the remote stands in as
                # the triggering commit. That holds up well except for
                # Android Repo Tool projects, unless every project is pinned
                # to a commit rather than tracking a branch. Even when it is
                # wrong, it is close enough to be useful.
                head = self.m.git.get_remote_branch_head(remote, branch)
                gerrit_name = urllib.parse.urlparse(remote).netloc.split('.')[0]
                review_host = '{}-review.googlesource.com'.format(gerrit_name)
                details = self._number_details(review_host, head)

                number = details['_number'] if details else 0
                change_branch = details['branch'] if details else branch
                changes.append(
                    Change(
                        number=number,
                        bb_input=bb_input,
                        remote=remote,
                        ref=head,
                        rebase=False,
                        branch=change_branch,
                        gerrit_name=gerrit_name,
                        submitted=True,
                    )
                )

            # Surface each change as its own step for easy inspection.
            with self.m.step.nest('changes'):
                for change in changes:
                    with self.m.step.nest(change.name) as change_data_pres:
                        change_data_pres.step_summary_text = repr(change)

            return tuple(changes)

    def _matching_branches(self, repo, branches, name='has branch', **kwargs):
        """Return, sorted, those of 'branches' that have a head on 'repo'.

        Args:
            repo (str): Remote to query.
            branches (iterable): Branch names to look for.
            name (str): Name of the enclosing step.
            **kwargs: Forwarded to git.get_remote_branch_head().
        """
        heads = []
        with self.m.step.nest(name), self.m.context(infra_steps=True):
            for branch in branches:
                head = self.m.git.get_remote_branch_head(
                    repo,
                    branch,
                    step_name='git ls-remote {}'.format(branch),
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        ''
                    ),
                    **kwargs
                )
                with self.m.step.nest('head') as pres:
                    pres.step_summary_text = repr(head)

                heads.append((branch, head))

        # The set drops duplicate names; only branches with a head are kept.
        return sorted({branch for branch, head in heads if head})

    def _apply_change(self, ctx, change, cwd=None, extra_calls=None):
        """Applies the given change to the given directory.

        The change's ref is fetched and checked out on a new 'working'
        branch. A submitted change is used as is. An unsubmitted change is
        rebased onto its target branch when change.rebase is set; otherwise
        only its merge base with that branch is recorded. Either way
        change.base and change.base_type record what the change sits on, and
        submodules are updated afterwards.

        Args:
            ctx (CheckoutContext): Supplies options.submodule_timeout_sec.
            change (Change): Change to apply. Its 'applied', 'base' and
                'base_type' attributes are set by this call.
            cwd (Path): Working directory, defaults to current directory.
            extra_calls (callable): Additional steps to run within the nested
                'apply ...' step and, if specified, within directory cwd.
        """
        kwargs = {'cwd': cwd} if cwd else {}
        change.applied = True

        apply_step = 'apply {}'.format(change.name)
        with self.m.context(**kwargs), self.m.step.nest(apply_step) as pres:
            pres.links['gerrit'] = change.gerrit_url
            pres.links['gitiles'] = change.gitiles_url

            with self.m.context(infra_steps=True):
                # 'git fetch' fails if a submodule pin in the patch isn't
                # present in the remote (for example, if the pin is only
                # present in the uploader's workspace). Use
                # '--no-recurse-submodules' here so 'git fetch' doesn't fail
                # but instead 'git rebase' or 'git submodule update' fails
                # later (important because those are not infra steps). Also
                # don't use '--recurse-submodules' in 'git checkout' for
                # similar reasons.
                self.m.git.fetch(
                    change.remote,
                    change.ref,
                    recurse_submodules=False,
                    step_name='git fetch patch',
                )
                self.m.git(
                    'git checkout patch',
                    'checkout',
                    '--force',
                    '-b',
                    'working',
                    'FETCH_HEAD',
                )

            # These remain unused if change.submitted is True.
            remote = remote_branch = None

            if not change.submitted:
                with self.m.context(infra_steps=True):
                    # Change "https://foo.googlesource.com/bar"
                    #     to "https___foo_googlesource_com_bar".
                    remote = re.sub(r'[^\w]', '_', change.remote)
                    remote_branch = '/'.join((remote, change.branch))
                    self.m.git(
                        'git remote add',
                        'remote',
                        'add',
                        remote,
                        change.remote,
                    )

                    self.m.git('pre-rebase log', 'log', '--oneline', '-n', '10')
                    self.m.git.fetch(
                        remote,
                        'refs/heads/{}'.format(change.branch),
                        prune=False,
                        step_name='git fetch branch',
                    )

                    self.m.git(
                        'git set upstream',
                        'branch',
                        '--set-upstream-to={}'.format(remote_branch),
                    )

            if change.submitted:
                change.base = self.m.git.rev_parse(
                    'HEAD',
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'HEAD_' * 8,
                    ),
                )
                change.base_type = 'submitted_commit_hash'

            elif change.rebase:
                self.m.git('git rebase', 'rebase', remote_branch)

                change.base = self.m.git.rev_parse(
                    remote_branch,
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'REMOTE_BRANCH_' * 3,
                    ),
                )
                change.base_type = 'remote_branch_tip'

            else:
                # 'git merge-base' terminates its output with a newline.
                # Strip it so 'base' holds just the commit hash.
                change.base = self.m.git(
                    'merge-base',
                    'merge-base',
                    'HEAD',
                    remote_branch,
                    stdout=self.m.raw_io.output_text(),
                    step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
                        'MERGEBASE_' * 4,
                    ),
                ).stdout.strip()
                change.base_type = 'merge-base'

            # In most cases this is redundant or unnecessary, but it shouldn't
            # cause problems. It's necessary when a superproject CL is updating
            # a submodule pin and we need to sync the submodule to the new
            # revision.
            self.m.git.update_submodule(
                recursive=True, timeout=ctx.options.submodule_timeout_sec,
            )

            # TODO(pwbug/233) Make this function a context manager so callers
            # can do the following:
            # with self._apply_change(...):
            #   extra_calls()
            if extra_calls:
                extra_calls()

    def _check_unapplied_changes(self, changes):
        """Report which changes were applied and fail if none were.

        Args:
            changes: Changes whose 'applied' flag is inspected.

        Returns:
            StatusOfChanges splitting 'changes' into applied and not applied,
            or None when 'changes' is empty.

        Raises:
            InfraFailure: If not one of the changes was applied.
        """
        if not changes:  # pragma: no cover
            return None

        applied = []
        failed_to_apply = []

        def handle_unapplied_change(change):
            # Emits a warning step with links and records the change.
            step_name = 'failed to apply {}'.format(change.name)
            with self.m.step.nest(step_name) as pres:
                pres.status = 'WARNING'
                pres.links['gerrit'] = change.gerrit_url
                pres.links['gitiles'] = change.gitiles_url
            failed_to_apply.append(change)

        with self.m.context(infra_steps=True):
            if not any(change.applied for change in changes):
                with self.m.step.nest('no changes were applied') as pres:
                    pres.status = 'FAILURE'
                    for change in changes:
                        handle_unapplied_change(change)
                    pres.properties['changes'] = [
                        change.name for change in changes
                    ]

                raise self.m.step.InfraFailure(
                    'could not find triggering changes in checkout'
                )

            elif not all(change.applied for change in changes):
                with self.m.step.nest('some changes were not applied') as pres:
                    pres.status = 'WARNING'
                    for change in changes:
                        if change.applied:
                            applied.append(change)
                        else:
                            handle_unapplied_change(change)

            else:
                applied.extend(changes)

        with self.m.step.nest('status') as pres:
            summary = 'applied {}\nnot applied {}'.format(
                applied, failed_to_apply,
            )
            pres.step_summary_text = summary

        return StatusOfChanges(
            applied=tuple(applied), not_applied=tuple(failed_to_apply),
        )

    def _cached_checkout(
        self,
        remote,
        path,
        ref,
        submodules,
        included_submodules=None,
        excluded_submodules=None,
        submodule_timeout_sec=10 * 60,
        cache=True,
        **kwargs
    ):
        """Check out 'remote' at 'ref' into 'path', optionally via a git cache.

        With 'cache' set, a per-remote directory under the 'git' cache is
        fetched and checked out at FETCH_HEAD, its submodules are updated,
        and the directory is copied to 'path'. In every case git_checkout is
        then run on 'path' with its own caching turned off.

        Args:
            remote (str): URL of the repository.
            path (Path): Destination directory.
            ref (str): Passed to git_checkout as 'revision'.
            submodules (bool): Whether to recurse into submodules when
                fetching and checking out.
            included_submodules (list): Submodule paths to update; falsy
                values are treated as an empty list.
            excluded_submodules (list): Submodule paths to leave out; all
                other submodules get updated.
            submodule_timeout_sec (int): Timeout for submodule updates.
            cache (bool): Whether to go through the git cache.
            **kwargs: Forwarded to the git calls made on the cache directory.

        Raises:
            InfraFailure: If both included_submodules and excluded_submodules
                are given, or an excluded submodule is not in the output of
                'git submodule status'. Both checks run only when 'cache' is
                set.
        """
        submodule_paths = included_submodules = included_submodules or []

        if cache:
            with self.m.step.nest('cache'), self.m.cache.guard('git'):
                parsed_remote = urllib.parse.urlparse(remote)
                # Directory name: the hostname as is, followed by the URL path
                # with every '-' doubled and every '/' turned into '-'.
                cache_name = parsed_remote.hostname + parsed_remote.path.replace(
                    '-', '--'
                ).replace(
                    '/', '-'
                )
                cache_path = self.m.path['cache'].join('git', cache_name)
                self.m.file.ensure_directory('makedirs', cache_path)

                with self.m.context(cwd=cache_path):
                    dotgit = cache_path.join('.git')
                    # An existing cache has its 'remote.origin' section
                    # removed before the URL is set again below; a new one is
                    # initialized as a non-bare repository.
                    if self.m.path.exists(dotgit):  # pragma: no cover
                        self.m.git.config_remove_section(
                            'remote.origin', **kwargs
                        )
                    else:
                        self.m.git.init(bare=False, **kwargs)

                    self.m.git.config(
                        'remote.origin.url',
                        remote,
                        step_name='remote set-url',
                        **kwargs
                    )
                    self.m.git.config(
                        'fetch.uriprotocols',
                        'https',
                        step_name='set fetch.uriprotocols',
                        **kwargs
                    )

                    self.m.git.fetch(
                        repository='origin',
                        prune=True,
                        tags=True,
                        recurse_submodules=submodules,
                        **kwargs
                    )

                    self.m.git.raw_checkout(
                        ref='FETCH_HEAD', force=True, **kwargs
                    )

                    if included_submodules and excluded_submodules:
                        raise self.m.step.InfraFailure(
                            'cannot specify both included_submodules and '
                            'excluded_submodules'
                        )

                    submodule_paths = included_submodules
                    if excluded_submodules:
                        # Start from every path 'git submodule status' lists
                        # and take the excluded ones out.
                        submodule_status = self.m.git(
                            'submodule status',
                            'submodule',
                            'status',
                            stdout=self.m.raw_io.output_text(),
                            step_test_data=lambda: self.test_api.m.raw_io.stream_output_text(
                                '-0000000000000000000000000000000000000000 pigweed\n'
                                '-1111111111111111111111111111111111111111 nanopb\n'
                            ),
                        ).stdout.splitlines()

                        # Everything after the first whitespace-separated
                        # field of each line is taken as the path.
                        submodule_paths = [
                            x.split(None, 1)[1] for x in submodule_status
                        ]
                        for sub in excluded_submodules:
                            if sub not in submodule_paths:
                                raise self.m.step.InfraFailure(
                                    'excluded submodule {} is not a '
                                    'submodule'.format(sub)
                                )
                            # Empty step, present only for its name.
                            with self.m.step.nest(
                                'excluding submodule {}'.format(sub)
                            ):
                                pass
                            submodule_paths.remove(sub)

                    for sub in submodule_paths:
                        # Empty step, present only for its name.
                        with self.m.step.nest(
                            'including submodule {}'.format(sub)
                        ):
                            pass

                    if submodules or submodule_paths:
                        self.m.git.sync_submodule(**kwargs)
                        self.m.git.update_submodule(
                            recursive=True,
                            force=True,
                            paths=submodule_paths,
                            timeout=submodule_timeout_sec,
                            **kwargs
                        )

                    if not submodules:
                        # Even though submodules weren't requested, if the cache
                        # had any active submodules we need to update them.
                        # Otherwise we'll get weird situations in rolls where an
                        # uninvolved submodule will be rolled back.
                        self.m.git.update_submodule(
                            recursive=True,
                            force=True,
                            init=False,
                            timeout=submodule_timeout_sec,
                            **kwargs
                        )

            # Copied after the 'cache' step and the cache guard have exited.
            self.m.file.copytree(
                'copy from cache', cache_path, path, symlinks=True
            )

        # NOTE(review): when 'cache' is false, excluded_submodules is never
        # looked at and submodule_paths is just included_submodules (or []).
        # Confirm that this is intended.
        self.m.git_checkout(
            repo=remote,
            path=path,
            cache=False,
            revision=ref,
            recursive=submodules,
            submodules=submodules,
            submodule_force=submodules,
            submodule_paths=submodule_paths,
            step_name="",
        )

    def _git(self, ctx):
        """Check out a git superproject and apply any triggering changes.

        The remote is fetched through _cached_checkout() using either the
        branch returned by _matching_branch() or ctx.options.branch. When
        ctx.options.use_trigger is set, changes whose remote is equivalent to
        the top-level remote or to a submodule remote are applied and
        ctx.status is updated. Afterwards a short 'git log' step is run for the
        top-level checkout and for each submodule that was enumerated, and the
        base revision (if any) is recorded in the properties of a 'base' step.

        Args:
            ctx (CheckoutContext): Context object.
        """
        opts = ctx.options

        branch = self._matching_branch(ctx)
        if not branch:
            branch = opts.branch

        with self.m.context(infra_steps=True):
            self._cached_checkout(
                opts.remote,
                path=ctx.root,
                ref=branch,
                cache=not opts.do_not_cache,
                submodules=opts.initialize_submodules,
                submodule_timeout_sec=opts.submodule_timeout_sec,
                included_submodules=opts.included_submodules,
                excluded_submodules=opts.excluded_submodules,
            )

        def head_test_data():
            return self.m.raw_io.test_api.stream_output_text('HEAD' * 10)

        # Shared arguments for every 'git log' invocation below.
        log_args = ('log', '--oneline', '-n', '10')

        with self.m.context(cwd=ctx.root):
            base = None
            base_type = 'no_trigger'

            # Only populated when the trigger is used, so without a trigger
            # just the top-level checkout gets a 'git log' step.
            found_submodules = []

            if opts.use_trigger:
                base = self.m.git.rev_parse(
                    'HEAD', step_test_data=head_test_data
                )

                # Changes against the top-level repository.
                for change in ctx.changes:
                    if not ctx.remotes_equivalent(opts.remote, change.remote):
                        continue
                    self._apply_change(ctx, change)
                    base = change.base
                    base_type = change.base_type

                found_submodules = ctx.submodules(recursive=True)

                # Changes against any of the submodules.
                for change in ctx.changes:
                    for sub in found_submodules:
                        if not ctx.remotes_equivalent(
                            sub.remote, change.remote
                        ):
                            continue
                        # Submodules that were not initialized during checkout
                        # must be updated before a change is applied in them.
                        if not opts.initialize_submodules:
                            self.m.git.update_submodule(paths=(sub.path,))
                        self._apply_change(ctx, change, cwd=sub.path)

                ctx.status = self._check_unapplied_changes(ctx.changes)

            # One log step for the superproject, then one per submodule.
            with self.m.step.nest('git log'):
                self.m.git(str(ctx.root), *log_args)
                for sub in sorted(found_submodules):
                    with self.m.context(cwd=sub.path):
                        self.m.git(str(sub.path), *log_args)

            if base:
                with self.m.step.nest('base') as pres:
                    pres.properties['got_revision'] = base
                    # The type is informational only: it explains why
                    # got_revision has the value it does.
                    pres.properties['got_revision_type'] = base_type

    def _matching_branch(self, ctx):
        """Find a branch on the checkout remote named like a triggering CL's.

        If a triggering change sits on a non-standard branch (anything other
        than 'master' or 'main') and the manifest or superproject remote has a
        branch with the same name, that branch should be used for the checkout.
        Every outcome is reported through an explanatory step.

        Args:
            ctx (CheckoutContext): Context object.

        Raises:
            StepFailure if there are multiple matching branches.

        Returns:
            One matching branch name, or None.
        """
        opts = ctx.options

        if not (opts.match_branch and opts.use_trigger):
            with self.m.step.nest('not matching branch names'):
                return None

        if opts.use_repo:
            kind = 'manifest'
        else:
            kind = 'superproject'

        standard_branches = ('master', 'main', None)
        candidates = sorted(
            {
                change.branch
                for change in ctx.changes
                if change.branch not in standard_branches
            }
        )

        if not candidates:
            with self.m.step.nest('no non-standard branch names'):
                return None

        with self.m.step.nest('branch names') as pres:
            pres.step_summary_text = str(candidates)

        found = self._matching_branches(
            opts.remote, candidates, name='{} has branch'.format(kind)
        )

        if not found:
            with self.m.step.nest('no branch names match'):
                return None

        if len(found) > 1:
            step_name = 'too many matching branches ({})'.format(
                ', '.join(found)
            )
            with self.m.step.nest(step_name) as pres:
                pres.step_summary_text = (
                    "Can't figure out which {} branch to use. Remove some "
                    '"Requires:" lines to simplify the checkout.'.format(kind)
                )
                # Raised inside the nest so the failure is attached to it.
                raise self.m.step.StepFailure('multiple matching branches')

        # Exactly one match remains at this point.
        chosen = found.pop()
        self.m.step('changing {} branch to {}'.format(kind, chosen), None)
        return chosen

    def _repo(self, ctx):
        """Checkout code from an Android Repo Tool manifest.

        Runs 'repo init' and 'repo sync' inside ctx.root. Triggering changes
        for the manifest repository are applied before the sync and changes
        for individual projects are applied after it. Results are stored on
        ctx (manifest, status, top and root); nothing is returned.

        Args:
            ctx (CheckoutContext): Context object. The members of ctx.options
                read here are remote, branch, manifest_file, use_trigger,
                repo_init_timeout_sec, repo_sync_timeout_sec and
                number_of_attempts.
        """

        # Git makes the top-level folder, Repo requires caller to make it.
        self.m.file.ensure_directory('mkdir checkout', ctx.root)

        with self.m.context(cwd=ctx.root):
            manifest_branch = self._matching_branch(ctx) or ctx.options.branch

            with self.m.context(infra_steps=True):
                kwargs = {}
                if ctx.options.repo_init_timeout_sec:
                    kwargs['timeout'] = ctx.options.repo_init_timeout_sec
                    kwargs['attempts'] = ctx.options.number_of_attempts
                self.m.repo.init(
                    manifest_url=ctx.options.remote,
                    manifest_branch=manifest_branch,
                    manifest_name=ctx.options.manifest_file,
                    **kwargs
                )

            manifests_dir = ctx.root.join('.repo', 'manifests')
            # If the triggering CL is a manifest change, apply it before running
            # sync.
            if ctx.options.use_trigger:
                for change in ctx.changes:
                    if change.remote and ctx.remotes_equivalent(
                        ctx.options.remote, change.remote
                    ):

                        def update_upstream():
                            # Right now the upstream of 'working' is the local
                            # 'default' branch. 'repo sync' complains if the
                            # upstream isn't remote, so it's changed to the
                            # remote branch that's identical to 'default'.
                            self.m.git(
                                'git branch',
                                'branch',
                                '--set-upstream-to=origin/{}'.format(
                                    manifest_branch
                                ),
                            )

                        self._apply_change(
                            ctx,
                            change,
                            cwd=manifests_dir,
                            extra_calls=update_upstream,
                        )

            # Read the manifest only after a manifest CL may have been applied
            # so that the sync and project matching below see the patched
            # version.
            ctx.manifest = self._read_manifest(
                ctx.options.remote,
                manifests_dir.join(ctx.options.manifest_file),
            )

            for _, remote_host in sorted(ctx.manifest.remotes.items()):
                if remote_host.fetch.url.startswith('sso://'):
                    self.m.sso.configure_insteadof(remote_host.fetch.url)

            with self.m.context(infra_steps=True):
                kwargs = {}
                if ctx.options.repo_sync_timeout_sec:
                    kwargs['timeout'] = ctx.options.repo_sync_timeout_sec
                    kwargs['attempts'] = ctx.options.number_of_attempts
                self.m.repo.sync(
                    force_sync=True, current_branch=True, jobs=2, **kwargs
                )
                self.m.repo.start('base')

            if ctx.options.use_trigger:
                for change in ctx.changes:
                    for entry in ctx.manifest.projects:
                        if ctx.remotes_equivalent(entry.url, change.remote):

                            def compare_branch_name():
                                with self.m.step.nest(
                                    'compare branch name'
                                ) as pres:
                                    pres.step_summary_text = (
                                        'CL branch: {}\nupstream branch: {}'
                                    ).format(change.branch, entry.upstream)

                            self._apply_change(
                                ctx,
                                change,
                                cwd=entry.path_object(ctx.root),
                                extra_calls=compare_branch_name,
                            )

                ctx.status = self._check_unapplied_changes(ctx.changes)

        # Some dependent projects have everything inside one top-level folder
        # in their repo workspace. For those projects pretend that top-level
        # folder is actually the checkout root. The top member will always
        # point to the actual repo workspace root.
        ctx.top = ctx.root
        files = set(self.m.file.listdir('ls', ctx.root))
        dotrepo = ctx.root.join('.repo')
        if dotrepo in files:
            files.remove(dotrepo)
        if len(files) == 1:
            ctx.root = files.pop()

    def _name(self, options):
        """Build the default step name for a checkout from its remote URL.

        Turns "https://foo/bar/baz.git" into "checkout baz". For repo
        checkouts a trailing "manifest" path component is dropped, so
        "https://foo/bar/manifest" becomes "checkout bar".

        Args:
            options: Object with 'remote' (str) and 'use_repo' (bool)
                attributes.

        Returns:
            str of the form 'checkout <name>'.
        """
        name = options.remote.rstrip('/')
        if name.endswith('.git'):
            name = name[:-4]
        parts = name.split('/')
        # Only drop a trailing 'manifest' when another component remains to
        # name the checkout after; otherwise parts[-1] below would raise
        # IndexError on an empty list.
        if options.use_repo and len(parts) > 1 and parts[-1] == 'manifest':
            parts.pop()
        return 'checkout {}'.format(parts[-1])

    def __call__(self, options, root=None, name=None):
        """Check out code as described by options and return the context.

        Unset timeout, attempt and manifest-file options are replaced with
        defaults on the passed-in options object, and a trailing '.git' is
        removed from options.remote. The checkout runs through either
        _repo() or _git() inside a single nested step whose links list the
        changes that were and were not applied.

        Args:
            options: Checkout options (presumably an Options proto); modified
                in place.
            root (Path): Directory to check out into. Defaults to 'checkout'
                under the start directory.
            name (str): Name of the enclosing step. Derived from
                options.remote when omitted.

        Returns:
            The CheckoutContext describing the finished checkout.
        """
        if name:
            checkout_name = name
        else:
            checkout_name = self._name(options)

        assert options.remote

        # Fill in every option that was left unset (falsy) by the caller.
        fallbacks = (
            ('manifest_file', 'default.xml'),
            ('repo_init_timeout_sec', 20),
            ('repo_sync_timeout_sec', 2 * 60),
            ('number_of_attempts', 3),
            ('submodule_timeout_sec', 10 * 60),
        )
        for field, fallback in fallbacks:
            setattr(options, field, getattr(options, field) or fallback)

        ctx = CheckoutContext(api=self.m)
        ctx.options = options
        ctx.changes = []
        if root:
            ctx.root = root
        else:
            ctx.root = self.m.path['start_dir'].join('checkout')

        # Every remote in a group maps to the full (https-normalized) group.
        for group in options.equivalent_remotes:
            https_remotes = [
                self.m.sso.sso_to_https(url) for url in group.remotes
            ]
            for url in https_remotes:
                assert url not in ctx.equivalent_remotes
                ctx.equivalent_remotes[url] = https_remotes

        with self.m.step.nest(checkout_name) as pres:
            if options.remote.endswith('.git'):
                options.remote = options.remote[:-4]

            if options.use_trigger:
                ctx.changes = self._change_data(
                    ctx, options.remote, options.branch
                )

            if options.use_repo:
                self._repo(ctx)
            else:
                self._git(ctx)

            if ctx.status:
                link_groups = (
                    ('applied {}', ctx.status.applied),
                    ('failed to apply {}', ctx.status.not_applied),
                )
                for template, changes in link_groups:
                    for change in changes:
                        pres.links[
                            template.format(change.name)
                        ] = change.gerrit_url

            ctx.snapshot_to_dir(self.m.path['start_dir'].join('snapshot'))

            # NOTE(review): _repo() also assigns ctx.top (to the repo
            # workspace root) before it may narrow ctx.root; this assignment
            # overwrites that value. Confirm which one is intended.
            ctx.top = ctx.root
            if ctx.options.root_subdirectory:
                ctx.root = ctx.root.join(ctx.options.root_subdirectory)

        return ctx

    def get_revision(self, root, name='git log', test_data='HASH'):
        """Return the hash of the most recent commit of the checkout at root.

        Like self.revision, but works for secondary checkouts. The hash is
        also shown as the summary text of the step that retrieved it.

        Args:
            root (Path): Directory of the git checkout to inspect.
            name (str): Name of the 'git log' step.
            test_data (str): Simulated stdout of the step when running tests.

        Returns:
            The stripped stdout of 'git log --max-count=1 --pretty=format:%H'.
        """

        def simulated_stdout():
            return self.test_api.m.raw_io.stream_output_text(test_data)

        with self.m.context(cwd=root):
            log_step = self.m.git(
                name,
                'log',
                '--max-count=1',
                '--pretty=format:%H',
                stdout=self.m.raw_io.output_text(),
                step_test_data=simulated_stdout,
            )

            revision = log_step.stdout.strip()
            log_step.presentation.step_summary_text = revision

        return revision
