# blob: 96b65cefd2ef649caced2f9dfd0a6970a3d99732 (source-viewer page header)
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Performs static checks on Gerrit changes without needing a full checkout.
This recipe module is designed to be run in a CI/CQ environment to validate
Gerrit changes against a set of configurable rules. These checks primarily focus
on the metadata of a change, such as the commit message and file modifications,
rather than the code's functional correctness.
Key checks performed by this module include:
- Commit message regex: Validates the commit message against a custom regular
expression.
- "Do Not Submit" (DNS) / "Work In Progress" (WIP): Checks for markers like
"DO NOT SUBMIT" or "WIP" in the commit message and can optionally mark the
change as WIP on Gerrit.
- "Tested:" line: Verifies the presence of a "Tested:" line in the commit
message.
- Readability labels: Automatically applies a "trivial" readability label
(e.g., "Readability-Trivial") if the changes to specific file types are
below a certain line count threshold.
The behavior of these checks is controlled by the `InputProperties` defined in
the `recipes.pigweed.static_checks.InputProperties` protocol buffer message.
The module interacts with Gerrit to post review comments, set labels, or
modify the change status (e.g., marking as WIP) based on the check results.
It also handles common scenarios like ignoring checks for revert changes,
merge commits, or changes made by allowlisted accounts.
"""
from __future__ import annotations
import functools
import re
from typing import TYPE_CHECKING
from PB.recipes.pigweed.static_checks import InputProperties
from recipe_engine import post_process
if TYPE_CHECKING: # pragma: no cover
from collections.abc import Iterator
from typing import Any, Sequence
from recipe_engine import recipe_api, recipe_test_api
from RECIPE_MODULES.pigweed.util import api as util_api
# Recipe modules this recipe depends on. The code below reaches them through
# the `api` object (for example `api.gerrit`, `api.cv`, `api.step`,
# `api.util`).
DEPS = [
'fuchsia/gerrit',
'pigweed/checkout',
'pigweed/util',
'recipe_engine/buildbucket',
'recipe_engine/cv',
'recipe_engine/json',
'recipe_engine/properties',
'recipe_engine/step',
]
# Protocol buffer message describing this recipe's input properties (see the
# module docstring). RunSteps reads fields such as `require_tested` and
# `dry_run` from its `props` argument -- presumably an instance of this
# message; confirm against the recipe engine.
PROPERTIES = InputProperties
def nest_step(func):
    """Wrap a check so that it runs inside its own nested step.

    The step name is derived from the wrapped function's name: leading and
    trailing underscores are stripped and every remaining underscore becomes
    a space (``_check_dns`` -> ``check dns``).

    Args:
        func: Callable whose first positional argument is the recipe ``api``.

    Returns:
        A wrapper with the same metadata as ``func`` that enters
        ``api.step.nest(<name>)`` and returns whatever ``func`` returns.
    """

    @functools.wraps(func)
    def nested(api, *args, **kwargs):
        # Splitting on '_' and re-joining with ' ' maps each underscore to
        # exactly one space.
        step_name = ' '.join(func.__name__.strip('_').split('_'))
        with api.step.nest(step_name):
            return func(api, *args, **kwargs)

    return nested
@nest_step
def _check_regexp(
    api: recipe_api.RecipeScriptApi,
    change: util_api.ChangeWithComments,
    details: dict[str, Any],
    regexp: re.Pattern | str,
    failure_message: str,
) -> None:
    """Fail unless the commit message matches ``regexp``.

    The match is performed with ``re.match``, so it is anchored at the start
    of the commit message.

    Args:
        api: Recipe API object; only ``api.step`` is used here.
        change: Not used by this check; accepted so the call signature stays
            stable for callers.
        details: Change details; ``details['message']`` is the commit message.
        regexp: Pattern (string or compiled) the message must match.
        failure_message: Optional extra text shown in a 'more details' step
            when the message does not match. Skipped when empty.

    Raises:
        api.step.StepFailure: If the commit message does not match.
    """
    message = details['message']
    # Keep the step summary a plain string even when a compiled pattern is
    # passed in.
    pattern_text = regexp if isinstance(regexp, str) else regexp.pattern

    if re.match(regexp, message):
        with api.step.nest('matches') as pres:
            pres.step_summary_text = pattern_text
            return

    with api.step.nest('does not match') as pres:
        pres.step_summary_text = f"{message!r} does not match {regexp!r}"

    if failure_message:
        # Use a distinct name so the `details` argument is not rebound to the
        # step presentation.
        with api.step.nest('more details') as more_details:
            more_details.step_summary_text = failure_message
            more_details.status = 'FAILURE'

    raise api.step.StepFailure('commit message regexp match failed')
@nest_step
def _check_dns(api, details, *, dry_run):
    """Check that the commit message doesn't contain "do not submit".

    Looks for "do not submit/commit/merge", "WIP" and "work in progress"
    markers (case-insensitive) anywhere in ``details['message']``.

    Args:
        api: Recipe API object; uses ``api.cv``, ``api.step`` and
            ``api.gerrit``.
        details: Change details; reads ``message``, ``_number`` and the
            optional ``work_in_progress`` entry.
        dry_run: When true, only record what would have been done instead of
            marking the change as work-in-progress.

    Raises:
        api.step.StepFailure: If a marker is found during a full CV run.
    """
    regexes = [
        re.compile(r'do.not.(submit|com+it|merge)', re.IGNORECASE),
        re.compile(r'\bWIP\b', re.IGNORECASE),
        re.compile(r'\bwork.in.progres+\b', re.IGNORECASE),
    ]

    # `search` returns an `re.Match` (or None), not a pattern.
    match: re.Match | None = None
    for regex in regexes:
        if match := regex.search(details['message']):
            break
    else:
        # No marker found: nothing to do.
        return

    # We only get here if the current change has matched "work in progress",
    # "do not commit", or similar.
    if api.cv.active and api.cv.run_mode == api.cv.FULL_RUN:
        raise api.step.StepFailure(
            f'found "{match.group(0)}" in commit message'
        )

    if not details.get('work_in_progress', False):
        if dry_run:  # pragma: no cover
            api.step.empty('would mark change as wip, but in dry run')
        else:
            # Best effort: a failure to mark the change as WIP must not fail
            # the check itself.
            try:
                api.gerrit.wip('wip', details['_number'])
            except api.step.StepFailure:  # pragma: no cover
                pass
@nest_step
def _check_tested(api, details):
    """Require a line starting with "Tested:" in the commit message.

    The prefix comparison is case-insensitive. The first matching line is
    recorded as the summary of a nested step.

    Raises:
        api.step.StepFailure: If no line of ``details['message']`` starts
            with "tested:".
    """
    tested_line = next(
        (
            candidate
            for candidate in details['message'].split('\n')
            if candidate.lower().startswith('tested:')
        ),
        None,
    )

    if tested_line is None:
        raise api.step.StepFailure(
            'could not find "Tested:" line in commit message'
        )

    with api.step.nest('found tested line in commit message') as pres:
        pres.step_summary_text = tested_line
def _check_readability(api, details, comments, readability):
    """Set the trivial readability label if the CL is trivial.

    A change counts as trivial when it touches at least one file with one of
    ``readability.file_extensions`` and the inserted plus deleted lines across
    those files stay strictly below ``readability.line_number_threshold``.

    Args:
        api: Recipe API object; uses ``api.step`` and ``api.gerrit``.
        details: Change details; reads ``labels``, ``revisions``,
            ``current_revision`` and ``_number``.
        comments: Not used by this check; accepted so the call signature
            stays stable for callers.
        readability: Settings object providing ``trivial_label``,
            ``file_extensions`` and ``line_number_threshold``.
    """
    extensions = tuple(str(x) for x in readability.file_extensions)
    label = readability.trivial_label
    threshold = readability.line_number_threshold

    with api.step.nest(label):
        if label not in details['labels']:
            with api.step.nest('trivial label not present for repo'):
                return

        current_revision = details['revisions'][details['current_revision']]

        # TODO: b/240290899 - Maybe don't count deleted files?
        matching = [
            entry
            for filename, entry in current_revision['files'].items()
            if filename.endswith(extensions)
        ]
        changed_files = len(matching)
        changed_lines = sum(
            entry.get('lines_deleted', 0) + entry.get('lines_inserted', 0)
            for entry in matching
        )

        # Empty nested steps whose names record the computed numbers.
        for summary in (
            f'changed files {changed_files}',
            f'changed lines {changed_lines}',
            f'threshold {threshold}',
        ):
            with api.step.nest(summary):
                pass

        if changed_files and changed_lines < threshold:
            api.gerrit.set_review(
                'set label',
                str(details['_number']),
                labels={label: 1},
                notify='OWNER',
            )
def RunSteps(api, props):
    """Run the configured static checks against the triggering change.

    Reverts, merge commits (more than one parent) and changes owned by an
    account listed in ``props.ignored_accounts`` are skipped. Otherwise the
    "Tested:" check and the commit-message regexp check run when enabled in
    ``props``, followed by the do-not-submit check and one readability check
    per entry in ``props.readability``.

    Args:
        api: Recipe API object.
        props: Input properties; reads ``ignored_accounts``,
            ``require_tested``, ``commit_message_regexp``,
            ``commit_message_regexp_failure_message``, ``dry_run`` and
            ``readability``.
    """
    res = api.util.get_change_with_comments()
    change = res.change
    details = res.details
    comments = res.comments

    api.cv.set_do_not_retry_build()

    # Make it just a little easier to retrieve the current commit message later.
    current_revision = details['revisions'][details['current_revision']]
    details['message'] = current_revision['commit']['message']

    with api.step.nest('checking if revert') as pres:
        # Gerrit documents `revert_of` as optional, so a missing entry is
        # treated as "not a revert" instead of raising KeyError.
        revert_of = details.get('revert_of', 0)
        pres.step_summary_text = str(revert_of)
        if revert_of:
            with api.step.nest('ignored'):  # pragma: no cover
                return

    with api.step.nest('checking if merge') as pres:
        parents = [x['commit'] for x in current_revision['commit']['parents']]
        pres.step_summary_text = str(parents)
        if len(parents) > 1:
            with api.step.nest('ignored'):  # pragma: no cover
                return

    with api.step.nest('checking owner account') as pres:
        owner = details['owner']['email']
        pres.step_summary_text = owner
        if owner in props.ignored_accounts:
            with api.step.nest('ignored'):
                return

    if props.require_tested:
        _check_tested(api, details)

    if props.commit_message_regexp:
        _check_regexp(
            api,
            change,
            details,
            str(props.commit_message_regexp),
            str(props.commit_message_regexp_failure_message),
        )

    _check_dns(api, details, dry_run=props.dry_run)

    for readability in props.readability:
        _check_readability(api, details, comments, readability)
def change_details(
    api,
    msg,
    description='',
    files=(),
    owner='nobody@example.com',
    revert_of=0,
    votes=None,
    work_in_progress=None,
    num_parents=1,
):
    """Build step data for the 'change details' step used by the tests.

    Args:
        api: Recipe test API; uses ``api.step_data`` and ``api.json.output``.
        msg: Full commit message; its first line becomes the subject.
        description: Description stored on the single revision.
        files: Either a mapping of filename to line-count dict, or a
            list/tuple of filenames (each then gets 20 inserted and 20
            deleted lines).
        owner: Email of the change owner.
        revert_of: Value stored under ``revert_of``.
        votes: Optional mapping of label name to a vote name or ``None``.
        work_in_progress: When truthy, stored under ``work_in_progress``.
        num_parents: Number of parent commits to generate.

    Returns:
        Whatever ``api.step_data`` returns for the generated details.
    """
    labels = {}
    for label, vote in (votes or {}).items():
        labels[label] = {}
        if vote is not None:  # pragma: no cover
            assert vote in "approved recommended disliked rejected".split()
            labels[label][vote] = True

    if isinstance(files, (list, tuple)):
        # Each file gets its own dict so entries are independent.
        files = {
            path: {'lines_inserted': 20, 'lines_deleted': 20} for path in files
        }

    parents = [{'commit': f'PARENT{i}'} for i in range(num_parents)]
    commit = {
        'subject': msg.partition('\n')[0],
        'message': msg,
        'parents': parents,
    }
    revision = {
        '_number': 1,
        'commit': commit,
        'files': files,
        'description': description,
    }
    details = {
        'current_revision': 'HASH',
        'revisions': {'HASH': revision},
        'owner': {'email': owner},
        'revert_of': revert_of,
        'labels': labels,
        '_number': 1234,
    }

    if work_in_progress:
        details['work_in_progress'] = work_in_progress  # pragma: no cover

    return api.step_data('change details', api.json.output(details))
def must_run(api, step):
    """Return a post-process check asserting that ``step`` ran."""
    check = post_process.MustRun
    return api.post_process(check, step)
def tested_tests(api):
    """Yield test cases for the "Tested:" commit-message check.

    Covers a message with a "Tested:" line, a message without one, and a
    message without one whose owner is in ``ignored_accounts``.
    """

    def test(name, msg, *args, status='SUCCESS', require_tested=True, **kwargs):
        """Build one test case; extra ``kwargs`` become input properties."""
        return api.test(
            f'tested.{name}',
            api.properties(
                InputProperties(require_tested=require_tested, **kwargs)
            ),
            api.checkout.try_test_data(),
            change_details(api, msg),
            # Forward extra test data and post-process checks. Without this
            # the assertions passed by the cases below would be dropped.
            *args,
            status=status,
        )

    def tested_passes():
        return must_run(api, 'check tested.found tested line in commit message')

    def owner_ignored():
        return must_run(api, 'checking owner account.ignored')

    yield test(
        'tested-pass',
        'Tested: maybe',
        tested_passes(),
    )

    yield test('tested-fail', 'Test: nope', status='FAILURE')

    yield test(
        'tested-skip',
        'Test: nope',
        owner_ignored(),
        ignored_accounts=['nobody@example.com'],
    )
def readability_tests(api):
    """Yield test cases for the trivial-readability label check.

    Every case runs with a single readability configuration (label
    'Readability-Trivial', C/C++ file extensions, threshold of 10 lines) in a
    full CV run.
    """

    def test(name, *args, msg='', status='SUCCESS', **kwargs):
        """Build one test case; extra ``kwargs`` go to ``change_details``."""
        readability_config = {
            'trivial_label': 'Readability-Trivial',
            'file_extensions': (
                '.c .cc .cpp .cxx .C .h .hh .hpp .hxx .H'.split()
            ),
            'line_number_threshold': 10,
        }
        final_props = {'readability': [readability_config]}

        # By default the label exists on the change but carries no vote.
        votes = kwargs.pop('votes', {'Readability-Trivial': None})

        return api.test(
            f'readability.{name}',
            api.properties(**final_props),
            change_details(
                api,
                msg=msg,
                votes=votes,
                **kwargs,
            ),
            api.cv(api.cv.FULL_RUN),
            *args,
            status=status,
        )

    def label_step(check, suffix):
        """Apply ``check`` to a step nested under the label's step."""
        return api.post_process(check, f'Readability-Trivial.{suffix}')

    def assert_changed_files(num):
        return label_step(post_process.MustRun, f'changed files {num}')

    def assert_changed_lines(num):
        return label_step(post_process.MustRun, f'changed lines {num}')

    def assert_set():
        return label_step(post_process.MustRun, 'set label')

    def assert_not_set():
        return label_step(post_process.DoesNotRun, 'set label')

    # The label is not configured on the change at all.
    yield test(
        'no_label',
        api.checkout.try_test_data(),
        assert_not_set(),
        votes={},
    )

    # The label exists but the change touches no matching files.
    yield test(
        'no_cpp_changes',
        api.checkout.try_test_data(),
        assert_changed_files(0),
        assert_changed_lines(0),
        assert_not_set(),
    )

    # (name, expected files, expected lines, label expected?, files)
    cases = (
        (
            'inserted_nontrivial',
            1,
            15,
            False,
            {'foo.cpp': {'lines_inserted': 15}},
        ),
        (
            'inserted_trivial',
            1,
            5,
            True,
            {'foo.cpp': {'lines_inserted': 5}},
        ),
        (
            'removed_nontrivial',
            1,
            15,
            False,
            {'foo.cpp': {'lines_deleted': 15}},
        ),
        (
            'removed_trivial',
            1,
            5,
            True,
            {'foo.cpp': {'lines_deleted': 5}},
        ),
        (
            'mixed_nontrivial',
            1,
            10,
            False,
            {'foo.cpp': {'lines_inserted': 5, 'lines_deleted': 5}},
        ),
        (
            'mixed_trivial',
            1,
            9,
            True,
            {'foo.cpp': {'lines_inserted': 5, 'lines_deleted': 4}},
        ),
        (
            'multiple_nontrivial',
            5,
            10,
            False,
            {
                'foo.cpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.cc': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hh': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.c': {'lines_inserted': 1, 'lines_deleted': 1},
            },
        ),
        (
            'multiple_trivial',
            4,
            8,
            True,
            {
                'foo.cpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hpp': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.cc': {'lines_inserted': 1, 'lines_deleted': 1},
                'foo.hh': {'lines_inserted': 1, 'lines_deleted': 1},
            },
        ),
    )

    for name, num_files, num_lines, expect_label, files in cases:
        yield test(
            name,
            api.checkout.try_test_data(),
            assert_changed_files(num_files),
            assert_changed_lines(num_lines),
            assert_set() if expect_label else assert_not_set(),
            files=files,
        )
def regexp_tests(api):
    """Yield test cases for the commit-message regexp check.

    Covers a matching message, a non-matching message, and a non-matching
    message with a custom failure message.
    """
    bug_regexp = r'.*Bug: b/\d+.*'
    no_match_step = 'check regexp.does not match'

    def test(
        name,
        msg,
        *args,
        regexp,
        failure_msg='',
        status='SUCCESS',
        **kwargs,
    ):
        """Build one test case with the given regexp properties."""
        props = api.properties(
            commit_message_regexp=regexp,
            commit_message_regexp_failure_message=failure_msg,
        )
        return api.test(
            f'regexp.{name}',
            props,
            api.checkout.try_test_data(),
            change_details(api, msg),
            *args,
            status=status,
        )

    yield test(
        'pass',
        'Bug: b/99999999999999999999999999999999999999999999999999999',
        must_run(api, 'check regexp.matches'),
        regexp=bug_regexp,
    )

    yield test(
        'fail',
        'message',
        must_run(api, no_match_step),
        regexp=bug_regexp,
        status='FAILURE',
    )

    yield test(
        'fail-msg',
        'message',
        must_run(api, no_match_step),
        must_run(api, 'check regexp.more details'),
        regexp=bug_regexp,
        failure_msg='Did not reference a bug.',
        status='FAILURE',
    )
def do_not_submit_tests(api):
    """Yield test cases for the do-not-submit check.

    The same "do not submit" commit message is expected to fail a full CV
    run and to succeed in a dry run.
    """

    def test(name, cv_mode, commit_message, status, *args, **kwargs):
        """Build one test case for the given CV run mode."""
        return api.test(
            f'do-not-submit.{name}',
            api.buildbucket.try_build(),
            api.cv(cv_mode),
            change_details(api, commit_message),
            *args,
            status=status,
            **kwargs,
        )

    expectations = (
        ('full', api.cv.FULL_RUN, 'FAILURE'),
        ('dry', api.cv.DRY_RUN, 'SUCCESS'),
    )
    for name, cv_mode, expected_status in expectations:
        yield test(name, cv_mode, 'Do_NoT-sUbMiT', expected_status)
def GenTests(api) -> Iterator[recipe_test_api.TestData]:
    """Yield every test case for this recipe, one suite after another."""
    suites = (
        tested_tests,
        readability_tests,
        regexp_tests,
        do_not_submit_tests,
    )
    for suite in suites:
        yield from suite(api)