# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Assorted checks that do not require checking out code."""

import functools
import re
from typing import Any, Generator, Sequence

from PB.recipes.pigweed.static_checks import InputProperties
from recipe_engine import post_process, recipe_api, recipe_test_api
from RECIPE_MODULES.pigweed.util import api as util_api

DEPS = [
    'fuchsia/gerrit',
    'pigweed/checkout',
    'pigweed/util',
    'recipe_engine/buildbucket',
    'recipe_engine/cv',
    'recipe_engine/json',
    'recipe_engine/path',
    'recipe_engine/properties',
    'recipe_engine/step',
]

PROPERTIES = InputProperties


def nest_step(func):
    @functools.wraps(func)
    def wrapper(api, *args, **kwargs):
        name = func.__name__.strip('_').replace('_', ' ')
        with api.step.nest(name):
            return func(api, *args, **kwargs)

    return wrapper


@nest_step
def _check_docs(
    api: recipe_api.RecipeScriptApi,
    details: dict[str, Any],
    commit_message: str,
    comments: Sequence[str],
    doc_file_extensions: Sequence[str] = (),
) -> None:
    """Check that docs are included in the CL.

    If doc_file_extensions is set make sure at least one document change is in
    the CL. Can be overridden by adding "No-Docs-Update-Reason:" to the commit
    message, a Gerrit comment, or a patchset description.
    """
    if not doc_file_extensions:
        with api.step.nest('doc changes not required'):
            return

    if 'no-docs-update-reason:' in commit_message.lower():
        raise api.step.StepFailure(
            'Found "No-Docs-Update-Reason:" in the commit message which is no '
            'longer allowed. Please use a Gerrit comment instead (i.e., click '
            '"Reply" in the Gerrit UI).'
        )

    match: re.Match = api.util.find_matching_comment(
        re.compile(r'No-Docs-Update-Reason: \w.*'),
        comments,
    )
    if match:
        with api.step.nest('found No-Docs-Update-Reason'):
            return

    with api.step.nest('checking changed files') as pres:
        current_revision: str = details['revisions'][
            details['current_revision']
        ]
        files: tuple[str] = tuple(current_revision.get('files', ()))
        pres.step_summary_text = '\n'.join(files)
        for path in files:
            if path.endswith(tuple(str(x) for x in doc_file_extensions)):
                with api.step.nest('found') as pres:
                    pres.step_summary_text = path
                    return

        # Only check that all files are OWNERS files if there are files. This
        # makes testing easier and should have no effect in production except
        # to fail some CLs that don't modify files.
        if files and all(api.path.basename(x) == 'OWNERS' for x in files):
            with api.step.nest('no non-OWNERS files'):
                return

    error: str = (
        'No *.rst or *.md changes in CL and no exception applies. Add docs '
        "or explain why they're not needed in a Gerrit comment with "
        '"No-Docs-Update-Reason: <reason>".'
    )

    if api.cv.active and api.cv.run_mode == api.cv.NEW_PATCHSET_RUN:
        with api.step.nest('failure') as pres:
            pres.status == 'FAILURE'
            pres.step_summary_text = error
    else:
        raise api.step.StepFailure(error)


@nest_step
def _check_requires(
    api: recipe_api.RecipeScriptApi,
    change: util_api.ChangeWithComments,
    details: dict[str, Any],
    dry_run: bool = False,
    allow_requires: bool = False,
) -> None:
    """Check the format of "Requires:" lines and warn for common mistakes."""
    warnings: list[str] = []

    def _warn(warning: str) -> None:
        with api.step.nest('warning') as pres:
            pres.step_summary_text = warning
        warnings.append(warning)

    has_requires_line: bool = False
    for line in details['message'].split('\n'):
        if line.startswith('Requires:'):
            has_requires_line = True

    if not allow_requires and has_requires_line:
        _warn(
            'Found "Requires:" line {!r} but they are not allowed. See '
            'https://pigweed.dev/docs/infra/ci_cq_intro.html#dependent-changes '
            'for the new dependent CL process.'.format(line)
        )

    all_warnings: str = '\n\n'.join(warnings)

    if warnings and not dry_run:
        api.gerrit.set_review(
            'set review',
            str(change.change),
            host=change.host,
            message=all_warnings,
            revision=details['current_revision'],
            notify='OWNER',
        )

    if warnings:
        raise api.step.StepFailure(all_warnings)


@nest_step
def _check_regexp(
    api: recipe_api.RecipeScriptApi,
    change: util_api.ChangeWithComments,
    details: dict[str, Any],
    regexp: re.Pattern | str,
    failure_message: str,
) -> None:
    if re.match(regexp, details['message']):
        with api.step.nest('matches') as pres:
            pres.step_summary_text = regexp
            return

    with api.step.nest('does not match') as pres:
        pres.step_summary_text = (
            f"{details['message']!r} does not match {regexp!r}"
        )

    if failure_message:
        with api.step.nest('more details') as details:
            details.step_summary_text = failure_message
            details.status = 'FAILURE'

    raise api.step.StepFailure('commit message regexp match failed')


@nest_step
def _check_dns(api, details):
    """Check that the commit message doesn't contain "do not submit"."""

    regexes = [
        re.compile(r'do.not.(submit|com+it|merge)', re.IGNORECASE),
        re.compile(r'\bWIP\b', re.IGNORECASE),
    ]

    for regex in regexes:
        match = regex.search(details['message'])
        if match:
            raise api.step.StepFailure(
                f'found "{match.group(0)}" in commit message'
            )


@nest_step
def _check_tested(api, details):
    """Check for a "Tested:" line in the commit message."""

    for line in details['message'].split('\n'):
        if line.lower().startswith('tested:'):
            with api.step.nest('found tested line in commit message') as pres:
                pres.step_summary_text = line
                return

    raise api.step.StepFailure(
        'could not find "Tested:" line in commit message'
    )


def _check_readability(api, details, comments, readability):
    """Set the trivial readability label if the CL is trivial."""

    extensions = tuple(str(x) for x in readability.file_extensions)

    with api.step.nest(readability.trivial_label):
        if readability.trivial_label not in details['labels']:
            with api.step.nest('trivial label not present for repo'):
                return

        current_revision = details['revisions'][details['current_revision']]
        changed_lines = 0
        changed_files = 0
        for filename, entry in current_revision['files'].items():
            # TODO: b/240290899 - Maybe don't count deleted files?
            if filename.endswith(extensions):
                changed_files += 1
                changed_lines += entry.get('lines_deleted', 0)
                changed_lines += entry.get('lines_inserted', 0)

        with api.step.nest(f'changed files {changed_files}'):
            pass

        with api.step.nest(f'changed lines {changed_lines}'):
            pass

        with api.step.nest(f'threshold {readability.line_number_threshold}'):
            pass

        if changed_files and changed_lines < readability.line_number_threshold:
            api.gerrit.set_review(
                f'set label',
                str(details['_number']),
                labels={readability.trivial_label: 1},
                notify='OWNER',
            )


def RunSteps(api, props):
    res = api.util.get_change_with_comments()
    change = res.change
    details = res.details
    commit_message = res.commit_message
    comments = res.comments

    api.cv.set_do_not_retry_build()

    # Make it just a little easier to retrieve the current commit message later.
    current_revision = details['revisions'][details['current_revision']]
    details['message'] = current_revision['commit']['message']

    with api.step.nest('checking if revert') as pres:
        pres.step_summary_text = str(details['revert_of'])
        if details['revert_of']:
            with api.step.nest('ignored'):
                return

    with api.step.nest('checking owner account') as pres:
        owner = details['owner']['email']
        pres.step_summary_text = owner
        if owner in props.ignored_accounts:
            with api.step.nest('ignored'):
                return

    _check_docs(api, details, commit_message, comments, props.doc_extensions)

    _check_requires(
        api,
        change,
        details,
        dry_run=props.dry_run,
        allow_requires=False,
    )

    if props.require_tested:
        _check_tested(api, details)

    if props.commit_message_regexp:
        _check_regexp(
            api,
            change,
            details,
            str(props.commit_message_regexp),
            str(props.commit_message_regexp_failure_message),
        )

    _check_dns(api, details)

    for readability in props.readability:
        _check_readability(api, details, comments, readability)


def change_details(
    api,
    msg,
    description='',
    files=(),
    owner='nobody@example.com',
    revert_of=0,
    votes=None,
):
    labels = {}
    if votes:
        for k, v in votes.items():
            labels[k] = {}
            if v is not None:  # pragma: no cover
                assert v in "approved recommended disliked rejected".split()
                labels[k][v] = True

    if isinstance(files, (list, tuple)):
        files = {x: {'lines_inserted': 20, 'lines_deleted': 20} for x in files}

    return api.step_data(
        'change details',
        api.json.output(
            {
                'current_revision': 'HASH',
                'revisions': {
                    'HASH': {
                        '_number': 1,
                        'commit': {
                            'subject': msg.split('\n')[0],
                            'message': msg,
                            'parents': [
                                {
                                    'commit': 'PARENT',
                                }
                            ],
                        },
                        'files': files,
                        'description': description,
                    }
                },
                'owner': {'email': owner},
                'revert_of': revert_of,
                'labels': labels,
                '_number': 1234,
            },
        ),
    )


def must_run(api, step):
    return api.post_process(post_process.MustRun, step)


def requires_tests(api):
    def check_num_warnings(n):
        if not n:
            return api.post_process(
                post_process.DoesNotRun, 'check requires.warning'
            )

        else:
            # Make sure the first warning is found.
            result = must_run(api, 'check requires.warning')

            # Make sure the nth warning is also found.
            if n > 1:  # pragma: no cover
                result += must_run(api, f'check requires.warning ({n})')

            # Make sure the nth warning is the last one.
            result += api.post_process(
                post_process.DoesNotRun,
                f'check requires.warning ({n + 1})',
            )

            return result

    def requires_test(name, msg, warnings=0, votes=None, **kwargs):
        status = 'FAILURE' if warnings else 'SUCCESS'
        res = api.test(f'requires.{name}', status=status)
        res += api.properties(InputProperties(**kwargs))
        res += api.checkout.try_test_data()
        res += change_details(api, msg, votes=votes)
        res += check_num_warnings(warnings)
        return res

    yield requires_test('no-requires', '')

    yield requires_test(
        'first-paragraph',
        'Requires: foo:123\n\nChange-Id: I1',
        warnings=1,
    )
    yield requires_test(
        'middle-paragraph',
        'Foo\n\nRequires: foo:123\n\nChange-Id: I1',
        warnings=1,
    )


def tested_tests(api):
    def test(name, msg, status='SUCCESS', require_tested=True, **kwargs):
        res = api.test(f'tested.{name}', status=status)
        res += api.properties(
            InputProperties(require_tested=require_tested, **kwargs)
        )
        res += api.checkout.try_test_data()
        res += change_details(api, msg)
        return res

    def tested_passes():
        return must_run(api, 'check tested.found tested line in commit message')

    def owner_ignored():
        return must_run(api, 'checking owner account.ignored')

    yield test(
        'tested-pass',
        'Tested: maybe',
    ) + tested_passes()

    yield test('tested-fail', 'Test: nope', status='FAILURE')

    yield (
        test(
            'tested-skip',
            'Test: nope',
            ignored_accounts=['nobody@example.com'],
        )
        + owner_ignored()
    )


def docs_tests(api):
    def test(name, msg='', status='SUCCESS', properties=None, **kwargs):
        final_props = {'doc_extensions': ['.rst', '.md']}
        if properties:
            final_props.update(properties)
        return (
            api.test(f'docs.{name}', status=status)
            + api.properties(InputProperties(**final_props))
            + change_details(api, msg=msg, **kwargs)
        )

    def comment(with_exception):
        message = 'Unrelated comment'
        if with_exception:
            message = 'No-Docs-Update-Reason: laziness'

        return api.util.change_comment(message, prefix='')

    yield (
        test('no_doc_changes', status='FAILURE')
        + comment(with_exception=False)
        + api.cv(run_mode='DRY_RUN')
        + api.checkout.try_test_data()
    )

    yield (
        test('no_doc_changes_new_patchset_run')
        + api.cv(run_mode=api.cv.NEW_PATCHSET_RUN)
        + api.checkout.try_test_data()
    )

    yield (
        test('has_doc_changes', files=('foo.rst',))
        + api.checkout.try_test_data()
        + must_run(api, 'check docs.checking changed files.found')
    )

    yield (
        test(
            'no_doc_changes_properties',
            status='FAILURE',
            files=('foo.rst',),
            properties={'doc_extensions': ['.gn']},
        )
        + api.cv(run_mode='DRY_RUN')
        + api.checkout.try_test_data()
    )

    yield (
        test(
            'has_doc_changes_properties',
            files=('foo.gn',),
            properties={'doc_extensions': ['.gn']},
        )
        + api.checkout.try_test_data()
        + must_run(api, 'check docs.checking changed files.found')
    )

    yield (
        test(
            'commit-message-ignored',
            status='FAILURE',
            msg='No-Docs-Update-Reason: foo',
        )
        + api.checkout.try_test_data()
    )

    yield (
        test('owners-files', files=('foo/OWNERS', 'OWNERS'))
        + api.checkout.try_test_data()
        + must_run(api, 'check docs.checking changed files.no non-OWNERS files')
    )

    yield (
        test('gerrit-comment')
        + api.checkout.try_test_data()
        + comment(with_exception=True)
        + must_run(api, 'check docs.checking comments.found')
    )

    yield (
        test(
            'owner',
            owner='roller@example.com',
            properties={'ignored_accounts': ['roller@example.com']},
        )
        + api.checkout.try_test_data()
        + must_run(api, 'checking owner account.ignored')
    )

    yield (
        test('revert', revert_of=123456)
        + api.checkout.try_test_data()
        + must_run(api, 'checking if revert.ignored')
    )


def readability_tests(api):
    def test(name, msg='', status='SUCCESS', **kwargs):
        final_props = {
            'readability': [
                {
                    'trivial_label': 'Readability-Trivial',
                    'file_extensions': (
                        '.c .cc .cpp .cxx .C .h .hh .hpp .hxx .H'.split()
                    ),
                    'line_number_threshold': 10,
                },
            ],
        }

        votes = kwargs.pop('votes', {'Readability-Trivial': None})

        return (
            api.test(f'readability.{name}', status=status)
            + api.properties(**final_props)
            + change_details(
                api,
                msg=msg,
                votes=votes,
                **kwargs,
            )
            + api.cv(api.cv.FULL_RUN)
        )

    def assert_changed_files(num):
        return api.post_process(
            post_process.MustRun,
            f'Readability-Trivial.changed files {num}',
        )

    def assert_changed_lines(num):
        return api.post_process(
            post_process.MustRun,
            f'Readability-Trivial.changed lines {num}',
        )

    def assert_set():
        return api.post_process(
            post_process.MustRun,
            'Readability-Trivial.set label',
        )

    def assert_not_set():
        return api.post_process(
            post_process.DoesNotRun,
            'Readability-Trivial.set label',
        )

    yield (
        test('no_label', votes={})
        + api.checkout.try_test_data()
        + assert_not_set()
    )

    yield (
        test('no_cpp_changes')
        + api.checkout.try_test_data()
        + assert_changed_files(0)
        + assert_changed_lines(0)
        + assert_not_set()
    )

    yield (
        test(
            'inserted_nontrivial',
            files={'foo.cpp': {'lines_inserted': 15}},
        )
        + api.checkout.try_test_data()
        + assert_changed_files(1)
        + assert_changed_lines(15)
        + assert_not_set()
    )

    yield (
        test(
            'inserted_trivial',
            files={'foo.cpp': {'lines_inserted': 5}},
        )
        + api.checkout.try_test_data()
        + assert_changed_files(1)
        + assert_changed_lines(5)
        + assert_set()
    )

    yield (
        test('removed_nontrivial', files={'foo.cpp': {'lines_deleted': 15}})
        + api.checkout.try_test_data()
        + assert_changed_files(1)
        + assert_changed_lines(15)
        + assert_not_set()
    )

    yield (
        test('removed_trivial', files={'foo.cpp': {'lines_deleted': 5}})
        + api.checkout.try_test_data()
        + assert_changed_files(1)
        + assert_changed_lines(5)
        + assert_set()
    )

    yield (
        test(
            'mixed_nontrivial',
            files={'foo.cpp': {'lines_inserted': 5, 'lines_deleted': 5}},
        )
        + api.checkout.try_test_data()
        + assert_changed_files(1)
        + assert_changed_lines(10)
        + assert_not_set()
    )

    yield (
        test(
            'mixed_trivial',
            files={'foo.cpp': {'lines_inserted': 5, 'lines_deleted': 4}},
        )
        + api.checkout.try_test_data()
        + assert_changed_files(1)
        + assert_changed_lines(9)
        + assert_set()
    )

    yield (
        test(
            'multiple_nontrivial',
            files={
                'foo.cpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.cc': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hh': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.c': {'lines_inserted': 1, 'lines_deleted': 1},
            },
        )
        + api.checkout.try_test_data()
        + assert_changed_files(5)
        + assert_changed_lines(10)
        + assert_not_set()
    )

    yield (
        test(
            'multiple_trivial',
            files={
                'foo.cpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.cc': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hh': {'lines_inserted': 1, 'lines_deleted': 1},
            },
        )
        + api.checkout.try_test_data()
        + assert_changed_files(4)
        + assert_changed_lines(8)
        + assert_set()
    )


def regexp_tests(api):
    def test(name, msg, regexp, failure_msg='', status='SUCCESS', **kwargs):
        res = api.test(f'regexp.{name}', status=status)
        res += api.properties(
            commit_message_regexp=regexp,
            commit_message_regexp_failure_message=failure_msg,
        )
        res += api.checkout.try_test_data()
        res += change_details(api, msg)
        return res

    yield (
        test(
            'pass',
            'Bug: b/99999999999999999999999999999999999999999999999999999',
            regexp='.*Bug: b/\d+.*',
        )
        + must_run(api, 'check regexp.matches')
    )

    yield (
        test(
            'fail',
            'message',
            regexp='.*Bug: b/\d+.*',
            status='FAILURE',
        )
        + must_run(api, 'check regexp.does not match')
    )

    yield (
        test(
            'fail-msg',
            'message',
            regexp='.*Bug: b/\d+.*',
            failure_msg='Did not reference a bug.',
            status='FAILURE',
        )
        + must_run(api, 'check regexp.does not match')
        + must_run(api, 'check regexp.more details')
    )


def GenTests(api) -> Generator[recipe_test_api.TestData, None, None]:
    for test in requires_tests(api):
        yield test

    for test in tested_tests(api):
        yield test

    for test in docs_tests(api):
        yield test

    for test in readability_tests(api):
        yield test

    for test in regexp_tests(api):
        yield test

    yield (
        api.test('do-not-submit', status='FAILURE')
        + api.buildbucket.try_build()
        + change_details(api, 'Do_NoT-sUbMiT')
    )
