blob: f1556c290a8e85474855590e5292f347fa615feb [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utility functions for rollers."""
import collections
import re
from typing import Any, Optional, Sequence
import urllib
import attrs
import enum
from recipe_engine import config_types, recipe_api
from PB.recipe_modules.fuchsia.auto_roller.options import (
Options as AutoRollerOptions,
)
# If we're embedding the original commit message, prepend 'Original-' to lines
# which begin with these tags.
ESCAPE_TAGS: tuple[re.Pattern | str, ...] = (
'Bug:',
'Fixed:',
'Fixes:',
'Requires:',
'Reviewed-on:',
)
# If we're embedding the original commit message, remove lines which contain
# these tags.
FILTER_TAGS: tuple[re.Pattern | str, ...] = (
'API-Review:',
'Acked-by:',
re.compile(r'^\w+-?Auto-Submit:', re.IGNORECASE),
'Build-Errors:',
'CC:',
'CQ-Do-Not-Cancel-Tryjobs:',
'Cq-Include-Trybots:',
'Change-Id:',
'Commit-Queue:',
'Cq-Cl-Tag:',
re.compile(r'Git[ -]?watcher:', re.IGNORECASE),
'No-Docs-Update-Reason:',
'No-Presubmit:',
'No-Tree-Checks: true',
'No-Try: true',
'Presubmit-Verified:',
re.compile(r'^\w+-?Readability-Trivial:', re.IGNORECASE),
'Reviewed-by:',
'Roller-URL:',
'Signed-off-by:',
'Testability-Review:',
'Tested-by:',
)
def _match_tag(line: str, tag: re.Pattern | str) -> bool:
if hasattr(tag, 'match'):
return bool(tag.match(line))
return line.startswith(tag)
def _sanitize_message(message: str) -> str:
"""Sanitize lines of a commit message.
Prepend 'Original-' to lines which begin with ESCAPE_TAGS. Filter
out lines which begin with FILTER_TAGS.
"""
lines = message.splitlines()
# If the first line is really long create a truncated version of it, but
# keep the original version of the commit message around.
if len(lines[0]) > 80:
lines = [lines[0][0:50], ''] + lines
return '\n'.join(
(
"Original-" + line
if any((_match_tag(line, tag) for tag in ESCAPE_TAGS))
else line
)
for line in lines
if not any((_match_tag(line, tag) for tag in FILTER_TAGS))
)
class Direction(enum.Enum):
CURRENT = 'CURRENT'
FORWARD = 'FORWARD'
BACKWARD = 'BACKWARD'
REBASE = 'REBASE'
@attrs.define(frozen=True)
class Account:
name: str
email: str
def __lt__(self, other) -> bool:
return (self.email, self.name) < (other.email, other.name)
@attrs.define
class Commit:
hash: str
message: str
author: str
owner: str
reviewers: tuple[Account]
@attrs.define
class Roll:
_api: recipe_api.RecipeApi
project_name: str
old_revision: str
new_revision: str
proj_dir: str
direction: str = attrs.field()
commits: tuple[Commit, ...] | None = None
remote: str | None = None
_nest_steps: bool = True
@direction.validator
def check(self, _, value: str) -> None: # pragma: no cover
if value not in Direction:
raise ValueError(f'invalid direction: {value}')
if value == Direction.CURRENT:
raise ValueError('attempt to do a no-op roll')
def __attrs_post_init__(self) -> None:
self._set_remote()
with self._api.context(cwd=self.proj_dir):
if self._nest_steps:
with self._api.step.nest(self.project_name):
self._set_commits()
else:
self._set_commits() # pragma: no cover
def _set_commits(self) -> None:
log_cmd: list[str] = [
'log',
'--pretty=format:%H\n%an\n%ae\n%B',
# Separate entries with null bytes since most entries
# will contain newlines ("%B" is the full commit
# message, not just the first line.)
'-z',
]
if _is_hash(self.old_revision) and self.direction == Direction.FORWARD:
log_cmd.append(f'{self.old_revision}..{self.new_revision}')
else:
log_cmd.extend(('--max-count', '5', self.new_revision))
log_kwargs: dict[str, Any] = {'stdout': self._api.raw_io.output_text()}
commit_log: str = (
self._api.git('git log', *log_cmd, **log_kwargs)
.stdout.strip('\0')
.split('\0')
)
commits: list[Commit] = []
for i, commit in enumerate(commit_log):
commit_hash: str
name: str
email: str
message: str
commit_hash, name, email, message = commit.split('\n', 3)
author = Account(name, email)
owner: Account | None = None
reviewers = []
full_host = f'{self.gerrit_name}-review.googlesource.com'
changes = []
# If there are a lot of CLs in this roll only get owner and
# reviewer data from the first 10 so we don't make too many
# requests of Gerrit.
if i < 10:
change_query_step = self._api.gerrit.change_query(
'get change-id',
f'commit:{commit_hash}',
host=full_host,
test_data=self._api.json.test_api.output(
[{'_number': 12345}]
),
ok_ret='any',
)
if change_query_step.exc_result.retcode == 0:
changes = change_query_step.json.output
if changes and len(changes) == 1:
number = changes[0]['_number']
step = self._api.gerrit.change_details(
f'get {number}',
number,
host=full_host,
test_data=self._api.json.test_api.output(
{
'owner': {
'name': 'author',
'email': 'author@example.com',
},
'reviewers': {
'REVIEWER': [
{
'name': 'reviewer',
'email': 'reviewer@example.com',
},
{
'name': 'nobody',
'email': 'nobody@google.com',
},
{
'name': 'robot',
'email': 'robot@gserviceaccount.com',
},
],
},
}
),
ok_ret='any',
)
if step.exc_result.retcode == 0:
details = step.json.output
owner = Account(
details['owner']['name'], details['owner']['email']
)
for reviewer in details['reviewers']['REVIEWER']:
reviewers.append(
Account(
reviewer['name'],
reviewer.get('email', 'robot@example.com'),
),
)
commits.append(
Commit(
hash=commit_hash,
author=author,
owner=owner,
reviewers=tuple(reviewers),
message=message,
)
)
self.commits = tuple(commits)
def _set_remote(self) -> None:
api = self._api
with api.step.nest('remote'), api.context(cwd=self.proj_dir):
# There may be multiple remote names. Only get the first one. They
# should refer to the same URL so it doesn't matter which we use.
name = (
api.git(
'name',
'remote',
stdout=api.raw_io.output_text(),
step_test_data=lambda: api.raw_io.test_api.stream_output_text(
'origin'
),
)
.stdout.strip()
.split('\n')[0]
)
remote = api.git(
'url',
'remote',
'get-url',
name,
stdout=api.raw_io.output_text(),
step_test_data=lambda: api.raw_io.test_api.stream_output_text(
'sso://pigweed/pigweed/pigweed'
),
).stdout.strip()
self.remote = api.sso.sso_to_https(remote)
@property
def gerrit_name(self) -> str:
return urllib.parse.urlparse(self.remote).netloc.split('.')[0]
@attrs.define
class Message:
name: str
template: str
kwargs: dict[str, Any]
num_commits: int
footer: tuple = ()
def render(self, with_footer: bool = True) -> str:
result = [self.template.format(**self.kwargs)]
if with_footer:
result.extend(x for x in self.footer)
return '\n'.join(result)
def _is_hash(value: str) -> bool:
return bool(re.match(r'^[0-9a-fA-F]{40}', value))
def _pprint_dict(d: dict) -> str:
result = []
for k, v in sorted(d.items()):
result.append(f'{k!r}: {v!r}\n')
return ''.join(result)
class RollUtilApi(recipe_api.RecipeApi):
Account = Account
Roll = Roll
Direction = Direction
def __init__(self, props, *args, **kwargs):
super().__init__(*args, **kwargs)
self.labels_to_set = collections.OrderedDict()
for label in sorted(props.labels_to_set, key=lambda x: x.label):
self.labels_to_set[str(label.label)] = label.value
self.labels_to_wait_on = sorted(str(x) for x in props.labels_to_wait_on)
self.footer = []
self._commit_divider = props.commit_divider
def authors(self, *rolls: Roll) -> set[Account]:
authors = set()
for roll in rolls:
for commit in roll.commits:
if commit.author:
authors.add(commit.author)
if commit.owner:
authors.add(commit.owner)
return authors
def fake_author(self, author: Account) -> Account:
# Update the author's email address so it can be used for attribution
# without literally attributing it to the author's account in Gerrit.
# Make sure not to add it twice, and there's no need to do this for
# service accounts.
email = author.email
prefix = 'pigweed.infra.roller.'
if prefix not in email and not email.endswith('gserviceaccount.com'):
user, domain = author.email.split('@')
email = f'{user}@{prefix}{domain}'
return Account(
author.name,
email,
)
def reviewers(self, *rolls: Roll) -> set[Account]:
reviewers = set()
for roll in rolls:
for commit in roll.commits:
reviewers.update(commit.reviewers)
return reviewers
def can_cc_on_roll(self, email: str, host: str) -> bool:
# Assume all queried accounts exist on Gerrit in testing except for
# nobody@google.com.
test_data = self.m.json.test_api.output([{'_account_id': 123}])
if email == 'nobody@google.com':
test_data = self.m.json.test_api.output([])
return bool(
self.m.gerrit.account_query(
email,
f'email:{email}',
host=host,
test_data=test_data,
).json.output
)
def include_cc(
self,
account: Account,
cc_domains: Sequence[str],
host: str,
):
with self.m.step.nest(f'cc {account.email}') as pres:
domain = account.email.split('@', 1)[1]
if domain.endswith('gserviceaccount.com'):
pres.step_summary_text = 'not CCing, robot account'
return False
if cc_domains and domain not in cc_domains:
pres.step_summary_text = 'not CCing, domain excluded'
return False
if not self.can_cc_on_roll(account.email, host=host):
pres.step_summary_text = 'not CCing, no account in Gerrit'
return False
pres.step_summary_text = 'CCing'
return True
def _single_commit_roll_message(self, roll: Roll) -> str:
template = """
[roll {project_name}] {sanitized_message}
{remote}
{project_name} Rolled-Commits: {old_revision:.15}..{new_revision:.15}
""".strip()
commit = roll.commits[0]
kwargs = {
'project_name': roll.project_name,
'remote': roll.remote,
'original_message': commit.message,
'sanitized_message': _sanitize_message(commit.message),
'old_revision': roll.old_revision,
'new_revision': roll.new_revision,
}
message = Message(
name=roll.project_name,
template=template,
kwargs=kwargs,
num_commits=1,
footer=tuple(self.footer),
)
with self.m.step.nest(f'message for {roll.project_name}') as pres:
pres.logs['template'] = template
pres.logs['kwargs'] = _pprint_dict(kwargs)
pres.logs['message'] = message.render()
return message
def _multiple_commits_roll_message(self, roll: Roll) -> str:
template = """
[{project_name}] Roll {num_commits} commits
{one_liners}
{remote}
{project_name} Rolled-Commits: {old_revision:.15}..{new_revision:.15}
""".strip()
one_liners = []
for commit in roll.commits:
# Handle case where the commit message is empty. Example:
# https://github.com/google/googletest/commit/148ab827cacc7a879832f40313bda87a65b1e8a3
first_line = '(empty commit message)'
if commit.message:
first_line = commit.message.splitlines()[0]
one_liners.append(f'{commit.hash:.15} {first_line[0:50]}')
num_commits = len(roll.commits)
if not _is_hash(roll.old_revision):
num_commits = 'multiple'
one_liners.append('...')
if len(one_liners) > 100:
one_liners = one_liners[0:5] + ['...'] + one_liners[-5:]
# In case both this and the previous condition match.
if one_liners[-1] == '...':
one_liners.pop() # pragma: no cover
kwargs = {
'project_name': roll.project_name,
'remote': roll.remote,
'num_commits': num_commits,
'one_liners': '\n'.join(one_liners),
'old_revision': roll.old_revision,
'new_revision': roll.new_revision,
}
message = Message(
name=roll.project_name,
template=template,
kwargs=kwargs,
num_commits=num_commits,
footer=tuple(self.footer),
)
with self.m.step.nest('message') as pres:
pres.logs['template'] = template
pres.logs['kwargs'] = _pprint_dict(kwargs)
pres.logs['message'] = message.render()
return message
def _single_roll_message(self, roll: Roll) -> str:
if len(roll.commits) > 1:
return self._multiple_commits_roll_message(roll)
return self._single_commit_roll_message(roll)
def _multiple_rolls_message(self, *rolls: Roll):
rolls = sorted(rolls, key=lambda x: x.project_name)
messages = []
for roll in rolls:
messages.append(self._single_roll_message(roll))
texts = [
'[roll {}] Roll {} commits'.format(
', '.join(x.name for x in messages),
sum(x.num_commits for x in messages),
)
]
texts.extend(x.render(with_footer=False) for x in messages)
texts.append('\n'.join(f'{x}' for x in self.footer))
return '\n\n'.join(texts)
def create_roll(self, **kwargs) -> Roll:
"""Create a Roll. See Roll class above for details."""
return Roll(api=self.m, **kwargs)
def message(self, *rolls: Roll) -> str:
with self.m.step.nest('roll message'):
if len(rolls) > 1:
result = self._multiple_rolls_message(*rolls)
else:
result = self._single_roll_message(*rolls).render()
if self._commit_divider:
result += f'\n{self._commit_divider}'
return result
def get_roll_direction(
self,
git_dir: config_types.Path,
old: str,
new: str,
name: str = 'get roll direction',
) -> Direction:
"""Return Direction of roll."""
if old == new:
with self.m.step.nest(name) as pres:
pres.step_summary_text = 'up-to-date'
return Direction.CURRENT
with self.m.context(git_dir):
with self.m.step.nest(name) as pres:
forward = self.m.git(
'is forward',
'merge-base',
'--is-ancestor',
old,
new,
ok_ret=(0, 1),
)
backward = self.m.git(
'is backward',
'merge-base',
'--is-ancestor',
new,
old,
ok_ret=(0, 1),
)
if (
forward.exc_result.retcode == 0
and backward.exc_result.retcode != 0
):
pres.step_summary_text = 'forward'
return Direction.FORWARD
if (
forward.exc_result.retcode != 0
and backward.exc_result.retcode == 0
):
pres.step_summary_text = 'backward'
return Direction.BACKWARD
# If new and old are ancestors of each other then this is the
# same commit. We should only hit this during testing because
# the comparison at the top of the function should have caught
# this situation.
if (
forward.exc_result.retcode == 0
and backward.exc_result.retcode == 0
):
with self.m.step.nest(name) as pres:
pres.step_summary_text = 'up-to-date'
return Direction.CURRENT
# If old is not an ancestor of new and new is not an ancestor
# of old then history was rewritten in some manner but we still
# need to update the pin.
pres.step_summary_text = 'rebase'
return Direction.REBASE
def can_roll(self, direction: Direction) -> bool:
return direction in (Direction.FORWARD, Direction.REBASE)
def skip_roll_step(self, remote: str, old_revision: str, new_revision: str):
with self.m.step.nest('cancelling roll') as pres:
fmt = (
'not updating from {old} to {new} because {old} is newer '
'than {new}'
)
if old_revision == new_revision:
fmt = (
'not updating from {old} to {new} because they are '
'identical'
)
pres.step_summary_text = fmt.format(
old=old_revision[0:7], new=new_revision[0:7]
)
pres.links[old_revision] = f'{remote}/+/{old_revision}'
pres.links[new_revision] = f'{remote}/+/{new_revision}'
def normalize_remote(self, remote: str, base: str) -> str:
"""Convert relative paths to absolute paths.
Support relative paths. If the top-level project is
"https://pigweed.googlesource.com/ex/ample" then a submodule path of
"./abc" maps to "https://pigweed.googlesource.com/ex/ample/abc" and
"../abc" maps to "https://pigweed.googlesource.com/ex/abc". Minimal
error-checking because git does most of these checks for us.
Also converts sso to https.
Args:
remote (str): Submodule remote URL.
base (str): Fully-qualified superproject remote URL.
"""
if remote.startswith('.'):
remote = '/'.join((base.rstrip('/'), remote.lstrip('/')))
changes = 1
while changes:
changes = 0
remote, n = re.subn(r'/\./', '/', remote)
changes += n
remote, n = re.subn(r'/[^/]+/\.\./', '/', remote)
changes += n
return self.m.sso.sso_to_https(remote)
def merge_auto_roller_overrides(
self,
auto_roller_options: AutoRollerOptions,
override_auto_roller_options: AutoRollerOptions,
):
result = AutoRollerOptions()
result.CopyFrom(auto_roller_options)
result.MergeFrom(override_auto_roller_options)
return result