blob: 7e0e15f4ab72ad143b2a2bbd6c26db6c0c372c3f [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Assorted checks that do not require checking out code."""
from __future__ import annotations
import functools
import re
from typing import TYPE_CHECKING
from PB.recipes.pigweed.static_checks import InputProperties
from recipe_engine import post_process
if TYPE_CHECKING: # pragma: no cover
from typing import Any, Generator, Sequence
from recipe_engine import recipe_api, recipe_test_api
from RECIPE_MODULES.pigweed.util import api as util_api
DEPS = [
'fuchsia/gerrit',
'pigweed/checkout',
'pigweed/util',
'recipe_engine/buildbucket',
'recipe_engine/cv',
'recipe_engine/json',
'recipe_engine/path',
'recipe_engine/properties',
'recipe_engine/step',
]
PROPERTIES = InputProperties
def nest_step(func):
@functools.wraps(func)
def wrapper(api, *args, **kwargs):
name = func.__name__.strip('_').replace('_', ' ')
with api.step.nest(name):
return func(api, *args, **kwargs)
return wrapper
@nest_step
def _check_docs(
api: recipe_api.RecipeScriptApi,
details: dict[str, Any],
commit_message: str,
comments: Sequence[str],
doc_patterns: Sequence[str] = (),
) -> None:
"""Check that docs are included in the CL.
If doc_patterns is set make sure at least one document change is in
the CL. Can be overridden by adding "No-Docs-Update-Reason:" to the commit
message, a Gerrit comment, or a patchset description.
"""
if not doc_patterns:
with api.step.nest('doc changes not required'):
return
if 'no-docs-update-reason:' in commit_message.lower():
raise api.step.StepFailure(
'Found "No-Docs-Update-Reason:" in the commit message which is no '
'longer allowed. Please use a Gerrit comment instead (i.e., click '
'"Reply" in the Gerrit UI).'
)
match: re.Match = api.util.find_matching_comment(
re.compile(r'No-Docs-Update-Reason: \w.*'),
comments,
)
if match:
with api.step.nest('found No-Docs-Update-Reason'):
return
with api.step.nest('checking changed files') as pres:
current_revision: str = details['revisions'][
details['current_revision']
]
files: tuple[str] = tuple(current_revision.get('files', ()))
pres.step_summary_text = '\n'.join(files)
for path in files:
if any(re.search(x, path) for x in doc_patterns):
with api.step.nest('found') as pres:
pres.step_summary_text = path
return
# Only check that all files are OWNERS files if there are files. This
# makes testing easier and should have no effect in production except
# to fail some CLs that don't modify files.
if files and all(api.path.basename(x) == 'OWNERS' for x in files):
with api.step.nest('no non-OWNERS files'):
return
error: str = (
'No *.rst or *.md changes in CL and no exception applies. Add docs '
"or explain why they're not needed in a Gerrit comment with "
'"No-Docs-Update-Reason: <reason>".'
)
raise_failure = True
if api.cv.active and api.cv.run_mode != api.cv.FULL_RUN:
if details.get("work_in_progress"):
raise_failure = False
if api.cv.run_mode == api.cv.NEW_PATCHSET_RUN:
raise_failure = False
if raise_failure:
raise api.step.StepFailure(error)
pres = api.step.empty('failure', raise_on_failure=False).presentation
pres.status == 'FAILURE'
pres.step_summary_text = error
return
@nest_step
def _check_requires(
api: recipe_api.RecipeScriptApi,
change: util_api.ChangeWithComments,
details: dict[str, Any],
dry_run: bool = False,
allow_requires: bool = False,
) -> None:
"""Check the format of "Requires:" lines and warn for common mistakes."""
warnings: list[str] = []
def _warn(warning: str) -> None:
with api.step.nest('warning') as pres:
pres.step_summary_text = warning
warnings.append(warning)
has_requires_line: bool = False
for line in details['message'].split('\n'):
if line.startswith('Requires:'):
has_requires_line = True
if not allow_requires and has_requires_line:
_warn(
'Found "Requires:" line {!r} but they are not allowed. See '
'https://pigweed.dev/docs/infra/ci_cq_intro.html#dependent-changes '
'for the new dependent CL process.'.format(line)
)
all_warnings: str = '\n\n'.join(warnings)
if warnings and not dry_run:
api.gerrit.set_review(
'set review',
str(change.change),
host=change.host,
message=all_warnings,
revision=details['current_revision'],
notify='OWNER',
)
if warnings:
raise api.step.StepFailure(all_warnings)
@nest_step
def _check_regexp(
api: recipe_api.RecipeScriptApi,
change: util_api.ChangeWithComments,
details: dict[str, Any],
regexp: re.Pattern | str,
failure_message: str,
) -> None:
if re.match(regexp, details['message']):
with api.step.nest('matches') as pres:
pres.step_summary_text = regexp
return
with api.step.nest('does not match') as pres:
pres.step_summary_text = (
f"{details['message']!r} does not match {regexp!r}"
)
if failure_message:
with api.step.nest('more details') as details:
details.step_summary_text = failure_message
details.status = 'FAILURE'
raise api.step.StepFailure('commit message regexp match failed')
@nest_step
def _check_dns(api, details):
"""Check that the commit message doesn't contain "do not submit"."""
regexes = [
re.compile(r'do.not.(submit|com+it|merge)', re.IGNORECASE),
re.compile(r'\bWIP\b', re.IGNORECASE),
]
for regex in regexes:
match = regex.search(details['message'])
if match:
raise api.step.StepFailure(
f'found "{match.group(0)}" in commit message'
)
@nest_step
def _check_tested(api, details):
"""Check for a "Tested:" line in the commit message."""
for line in details['message'].split('\n'):
if line.lower().startswith('tested:'):
with api.step.nest('found tested line in commit message') as pres:
pres.step_summary_text = line
return
raise api.step.StepFailure(
'could not find "Tested:" line in commit message'
)
def _check_readability(api, details, comments, readability):
"""Set the trivial readability label if the CL is trivial."""
extensions = tuple(str(x) for x in readability.file_extensions)
with api.step.nest(readability.trivial_label):
if readability.trivial_label not in details['labels']:
with api.step.nest('trivial label not present for repo'):
return
current_revision = details['revisions'][details['current_revision']]
changed_lines = 0
changed_files = 0
for filename, entry in current_revision['files'].items():
# TODO: b/240290899 - Maybe don't count deleted files?
if filename.endswith(extensions):
changed_files += 1
changed_lines += entry.get('lines_deleted', 0)
changed_lines += entry.get('lines_inserted', 0)
with api.step.nest(f'changed files {changed_files}'):
pass
with api.step.nest(f'changed lines {changed_lines}'):
pass
with api.step.nest(f'threshold {readability.line_number_threshold}'):
pass
if changed_files and changed_lines < readability.line_number_threshold:
api.gerrit.set_review(
f'set label',
str(details['_number']),
labels={readability.trivial_label: 1},
notify='OWNER',
)
def RunSteps(api, props):
res = api.util.get_change_with_comments()
change = res.change
details = res.details
commit_message = res.commit_message
comments = res.comments
api.cv.set_do_not_retry_build()
# Make it just a little easier to retrieve the current commit message later.
current_revision = details['revisions'][details['current_revision']]
details['message'] = current_revision['commit']['message']
with api.step.nest('checking if revert') as pres:
pres.step_summary_text = str(details['revert_of'])
if details['revert_of']:
with api.step.nest('ignored'):
return
with api.step.nest('checking if merge') as pres:
parents = [x['commit'] for x in current_revision['commit']['parents']]
pres.step_summary_text = str(parents)
if len(parents) > 1:
with api.step.nest('ignored'):
return
with api.step.nest('checking owner account') as pres:
owner = details['owner']['email']
pres.step_summary_text = owner
if owner in props.ignored_accounts:
with api.step.nest('ignored'):
return
_check_docs(api, details, commit_message, comments, props.doc_patterns)
_check_requires(
api,
change,
details,
dry_run=props.dry_run,
allow_requires=False,
)
if props.require_tested:
_check_tested(api, details)
if props.commit_message_regexp:
_check_regexp(
api,
change,
details,
str(props.commit_message_regexp),
str(props.commit_message_regexp_failure_message),
)
_check_dns(api, details)
for readability in props.readability:
_check_readability(api, details, comments, readability)
def change_details(
api,
msg,
description='',
files=(),
owner='nobody@example.com',
revert_of=0,
votes=None,
work_in_progress=None,
num_parents=1,
):
labels = {}
if votes:
for k, v in votes.items():
labels[k] = {}
if v is not None: # pragma: no cover
assert v in "approved recommended disliked rejected".split()
labels[k][v] = True
if isinstance(files, (list, tuple)):
files = {x: {'lines_inserted': 20, 'lines_deleted': 20} for x in files}
details = {
'current_revision': 'HASH',
'revisions': {
'HASH': {
'_number': 1,
'commit': {
'subject': msg.split('\n')[0],
'message': msg,
'parents': [
{'commit': f'PARENT{i}'} for i in range(num_parents)
],
},
'files': files,
'description': description,
}
},
'owner': {'email': owner},
'revert_of': revert_of,
'labels': labels,
'_number': 1234,
}
if work_in_progress:
details['work_in_progress'] = work_in_progress
return api.step_data('change details', api.json.output(details))
def must_run(api, step):
return api.post_process(post_process.MustRun, step)
def does_not_run(api, step):
return api.post_process(post_process.DoesNotRun, step)
def requires_tests(api):
def check_num_warnings(n):
if not n:
return api.post_process(
post_process.DoesNotRun, 'check requires.warning'
)
else:
# Make sure the first warning is found.
result = must_run(api, 'check requires.warning')
# Make sure the nth warning is also found.
if n > 1: # pragma: no cover
result += must_run(api, f'check requires.warning ({n})')
# Make sure the nth warning is the last one.
result += api.post_process(
post_process.DoesNotRun,
f'check requires.warning ({n + 1})',
)
return result
def requires_test(name, msg, warnings=0, votes=None, **kwargs):
return api.test(
f'requires.{name}',
api.properties(InputProperties(**kwargs)),
api.checkout.try_test_data(),
change_details(api, msg, votes=votes),
check_num_warnings(warnings),
status='FAILURE' if warnings else 'SUCCESS',
)
yield requires_test('no-requires', '')
yield requires_test(
'first-paragraph',
'Requires: foo:123\n\nChange-Id: I1',
warnings=1,
)
yield requires_test(
'middle-paragraph',
'Foo\n\nRequires: foo:123\n\nChange-Id: I1',
warnings=1,
)
def tested_tests(api):
def test(name, msg, *args, status='SUCCESS', require_tested=True, **kwargs):
return api.test(
f'tested.{name}',
api.properties(
InputProperties(require_tested=require_tested, **kwargs)
),
api.checkout.try_test_data(),
change_details(api, msg),
status=status,
)
def tested_passes():
return must_run(api, 'check tested.found tested line in commit message')
def owner_ignored():
return must_run(api, 'checking owner account.ignored')
yield test(
'tested-pass',
'Tested: maybe',
tested_passes(),
)
yield test('tested-fail', 'Test: nope', status='FAILURE')
yield test(
'tested-skip',
'Test: nope',
owner_ignored(),
ignored_accounts=['nobody@example.com'],
)
def docs_tests(api):
def test(name, *args, msg='', status='SUCCESS', properties=None, **kwargs):
final_props = {'doc_patterns': [r'\.rst$', r'\.md$']}
if properties:
final_props.update(properties)
return api.test(
f'docs.{name}',
api.properties(InputProperties(**final_props)),
change_details(api, msg=msg, **kwargs),
*args,
status=status,
)
def comment(with_exception):
message = 'Unrelated comment'
if with_exception:
message = 'No-Docs-Update-Reason: laziness'
return api.util.change_comment(message, prefix='')
yield test(
'no_doc_changes',
comment(with_exception=False),
api.cv(run_mode='DRY_RUN'),
api.checkout.try_test_data(),
status='FAILURE',
)
yield test(
'no_doc_changes_new_patchset_run',
api.cv(run_mode=api.cv.NEW_PATCHSET_RUN),
api.checkout.try_test_data(),
)
yield test(
'has_doc_changes',
api.checkout.try_test_data(),
must_run(api, 'check docs.checking changed files.found'),
files=('foo.rst',),
)
yield test(
'no_doc_changes_properties',
api.cv(run_mode='DRY_RUN'),
api.checkout.try_test_data(),
status='FAILURE',
files=('foo.rst',),
properties={'doc_patterns': [r'\.gn$']},
)
yield test(
'has_doc_changes_properties',
api.checkout.try_test_data(),
must_run(api, 'check docs.checking changed files.found'),
files=('foo.gn',),
properties={'doc_patterns': [r'\.gn$']},
)
yield test(
'commit-message-ignored',
api.checkout.try_test_data(),
status='FAILURE',
msg='No-Docs-Update-Reason: foo',
)
yield test(
'owners-files',
api.checkout.try_test_data(),
must_run(api, 'check docs.checking changed files.no non-OWNERS files'),
files=('foo/OWNERS', 'OWNERS'),
)
yield test(
'gerrit-comment',
api.checkout.try_test_data(),
comment(with_exception=True),
must_run(api, 'check docs.checking comments.found'),
)
yield test(
'owner',
api.checkout.try_test_data(),
must_run(api, 'checking owner account.ignored'),
owner='roller@example.com',
properties={'ignored_accounts': ['roller@example.com']},
)
yield test(
'revert',
api.checkout.try_test_data(),
must_run(api, 'checking if revert.ignored'),
revert_of=123456,
)
yield test(
'merge',
api.checkout.try_test_data(),
must_run(api, 'checking if merge.ignored'),
num_parents=2,
)
yield test(
'wip_dry_run',
api.cv(run_mode='DRY_RUN'),
api.checkout.try_test_data(),
must_run(api, 'check docs.failure'),
work_in_progress=True,
)
yield test(
'wip_full_run',
api.cv(run_mode='FULL_RUN'),
api.checkout.try_test_data(),
does_not_run(api, 'check_docs.failure'),
work_in_progress=True,
status='FAILURE',
)
def readability_tests(api):
def test(name, *args, msg='', status='SUCCESS', **kwargs):
final_props = {
'readability': [
{
'trivial_label': 'Readability-Trivial',
'file_extensions': (
'.c .cc .cpp .cxx .C .h .hh .hpp .hxx .H'.split()
),
'line_number_threshold': 10,
},
],
}
votes = kwargs.pop('votes', {'Readability-Trivial': None})
return api.test(
f'readability.{name}',
api.properties(**final_props),
change_details(
api,
msg=msg,
votes=votes,
**kwargs,
),
api.cv(api.cv.FULL_RUN),
*args,
status=status,
)
def assert_changed_files(num):
return api.post_process(
post_process.MustRun,
f'Readability-Trivial.changed files {num}',
)
def assert_changed_lines(num):
return api.post_process(
post_process.MustRun,
f'Readability-Trivial.changed lines {num}',
)
def assert_set():
return api.post_process(
post_process.MustRun,
'Readability-Trivial.set label',
)
def assert_not_set():
return api.post_process(
post_process.DoesNotRun,
'Readability-Trivial.set label',
)
yield test(
'no_label',
api.checkout.try_test_data(),
assert_not_set(),
votes={},
)
yield test(
'no_cpp_changes',
api.checkout.try_test_data(),
assert_changed_files(0),
assert_changed_lines(0),
assert_not_set(),
)
yield test(
'inserted_nontrivial',
api.checkout.try_test_data(),
assert_changed_files(1),
assert_changed_lines(15),
assert_not_set(),
files={'foo.cpp': {'lines_inserted': 15}},
)
yield test(
'inserted_trivial',
api.checkout.try_test_data(),
assert_changed_files(1),
assert_changed_lines(5),
assert_set(),
files={'foo.cpp': {'lines_inserted': 5}},
)
yield test(
'removed_nontrivial',
api.checkout.try_test_data(),
assert_changed_files(1),
assert_changed_lines(15),
assert_not_set(),
files={'foo.cpp': {'lines_deleted': 15}},
)
yield test(
'removed_trivial',
api.checkout.try_test_data(),
assert_changed_files(1),
assert_changed_lines(5),
assert_set(),
files={'foo.cpp': {'lines_deleted': 5}},
)
yield test(
'mixed_nontrivial',
api.checkout.try_test_data(),
assert_changed_files(1),
assert_changed_lines(10),
assert_not_set(),
files={'foo.cpp': {'lines_inserted': 5, 'lines_deleted': 5}},
)
yield test(
'mixed_trivial',
api.checkout.try_test_data(),
assert_changed_files(1),
assert_changed_lines(9),
assert_set(),
files={'foo.cpp': {'lines_inserted': 5, 'lines_deleted': 4}},
)
yield test(
'multiple_nontrivial',
api.checkout.try_test_data(),
assert_changed_files(5),
assert_changed_lines(10),
assert_not_set(),
files={
'foo.cpp': {'lines_inserted': 1, 'lines_deleted': 1},
'foo.hpp': {'lines_inserted': 1, 'lines_deleted': 1},
'foo.cc': {'lines_inserted': 1, 'lines_deleted': 1},
'foo.hh': {'lines_inserted': 1, 'lines_deleted': 1},
'foo.c': {'lines_inserted': 1, 'lines_deleted': 1},
},
)
yield test(
'multiple_trivial',
api.checkout.try_test_data(),
assert_changed_files(4),
assert_changed_lines(8),
assert_set(),
files={
'foo.cpp': {'lines_inserted': 1, 'lines_deleted': 1},
'foo.hpp': {'lines_inserted': 1, 'lines_deleted': 1},
'foo.cc': {'lines_inserted': 1, 'lines_deleted': 1},
'foo.hh': {'lines_inserted': 1, 'lines_deleted': 1},
},
)
def regexp_tests(api):
def test(
name,
msg,
*args,
regexp,
failure_msg='',
status='SUCCESS',
**kwargs,
):
return api.test(
f'regexp.{name}',
api.properties(
commit_message_regexp=regexp,
commit_message_regexp_failure_message=failure_msg,
),
api.checkout.try_test_data(),
change_details(api, msg),
*args,
status=status,
)
yield test(
'pass',
'Bug: b/99999999999999999999999999999999999999999999999999999',
must_run(api, 'check regexp.matches'),
regexp='.*Bug: b/\d+.*',
)
yield test(
'fail',
'message',
must_run(api, 'check regexp.does not match'),
regexp='.*Bug: b/\d+.*',
status='FAILURE',
)
yield test(
'fail-msg',
'message',
must_run(api, 'check regexp.does not match'),
must_run(api, 'check regexp.more details'),
regexp='.*Bug: b/\d+.*',
failure_msg='Did not reference a bug.',
status='FAILURE',
)
def GenTests(api) -> Generator[recipe_test_api.TestData, None, None]:
for test in requires_tests(api):
yield test
for test in tested_tests(api):
yield test
for test in docs_tests(api):
yield test
for test in readability_tests(api):
yield test
for test in regexp_tests(api):
yield test
yield api.test(
'do-not-submit',
api.buildbucket.try_build(),
change_details(api, 'Do_NoT-sUbMiT'),
status='FAILURE',
)