# blob: 0592300540258afb50c62251f610a25630372be4 [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Roll submodules of a git repository."""
from __future__ import annotations
import configparser
import dataclasses
import io
import re
from typing import Generator, TYPE_CHECKING
import attrs
from PB.recipes.pigweed.submodule_roller import InputProperties, Submodule
if TYPE_CHECKING: # pragma: no cover
from recipe_engine import config_types, post_process, recipe_test_api
# Recipe modules this recipe declares as dependencies. The code below uses
# them as attributes of `api` (api.auto_roller, api.git, api.checkout,
# api.roll_util, api.context, api.file, api.properties, api.step).
DEPS = [
    'fuchsia/auto_roller',
    'fuchsia/git',
    'pigweed/checkout',
    'pigweed/roll_util',
    'recipe_engine/context',
    'recipe_engine/file',
    'recipe_engine/properties',
    'recipe_engine/step',
]

# Proto message describing this recipe's input properties. RunSteps reads its
# fields from `props`; GenTests builds instances of it for test cases.
PROPERTIES = InputProperties
@dataclasses.dataclass
class _Submodule:
    """Mutable working record for one submodule being rolled.

    RunSteps creates one instance per entry in the input properties and then
    fills in the remaining attributes as it processes the submodule.
    """

    # Submodule path from the input properties; RunSteps joins it onto
    # checkout.root to locate the submodule.
    path: str
    # Name used in the `[submodule "<name>"]` section of .gitmodules. RunSteps
    # replaces an empty name with `path`.
    name: str
    # Ref to roll to. When empty, RunSteps takes the `branch` option from
    # .gitmodules and falls back to 'main'.
    branch: str
    # Normalized remote URL; None until RunSteps sets it.
    remote: str | None = None
    # Location of the submodule inside the checkout; None until RunSteps
    # sets it.
    dir: config_types.Path | None = None
@dataclasses.dataclass
class _RevisionChange:
    """Pair of revisions returned by _update_submodule()."""

    # Revision the submodule was at before the new ref was checked out.
    old: str
    # Revision the submodule is at after the new ref was checked out.
    new: str
def _update_submodule(api, checkout, path, new_revision):
    """Check out `new_revision` in one submodule and report the change.

    Args:
        api: Recipe API object (uses api.context, api.git, api.checkout).
        checkout: Checkout context; `top` and `options.submodule_timeout_sec`
            are read from it.
        path: Location of the submodule to update.
        new_revision: Ref to fetch from `origin`; either a full 40-character
            lowercase hex hash or some other ref such as a branch name.

    Returns:
        A _RevisionChange holding the revision before and after the update.
    """
    # Update the submodule from the top of the checkout before inspecting it.
    with api.context(cwd=checkout.top):
        api.git.submodule_update(
            paths=(path,),
            timeout=checkout.options.submodule_timeout_sec,
        )

    before = api.checkout.get_revision(
        path, 'get old revision', test_data='1' * 40
    )

    with api.context(cwd=path):
        api.git('git fetch', 'fetch', 'origin', new_revision)
        api.git('git checkout', 'checkout', 'FETCH_HEAD')

    if re.search(r'^[0-9a-f]{40}$', new_revision):
        # Already a full hash, so there is nothing to resolve.
        after = new_revision
    else:
        # A branch name (or similar) was given; look up the hash it resolved
        # to now that it is checked out.
        after = api.checkout.get_revision(
            path, 'get new revision', test_data='2' * 40
        )

    return _RevisionChange(old=before, new=after)
def RunSteps(api, props):  # pylint: disable=invalid-name
    """Update the requested submodules and attempt a single combined roll.

    For every submodule listed in `props.submodules` this checks that it is
    declared in the checkout's .gitmodules, checks out the target branch in
    it, and records a roll when api.roll_util says the resulting direction can
    be rolled. All recorded rolls are then passed to
    api.auto_roller.attempt_roll() in one call.

    Args:
        api: Recipe API object.
        props: InputProperties for this build. `checkout_options.use_trigger`
            and the CC lists in `auto_roller_options` are modified in place.

    Returns:
        None when no submodule needs rolling, otherwise the value of
        api.auto_roller.raw_result() for the attempted roll.

    Raises:
        api.step.StepFailure: A requested submodule has no matching
            `[submodule "<name>"]` section in .gitmodules.
    """
    submodules = [
        _Submodule(path=x.path, name=x.name, branch=x.branch)
        for x in props.submodules
    ]

    cc_authors_on_rolls = props.cc_authors_on_rolls
    cc_reviewers_on_rolls = props.cc_reviewers_on_rolls
    cc_domains = props.cc_domains
    always_cc = props.always_cc

    # The checkout module will try to use trigger data to pull in a specific
    # patch. Since the triggering commit is in a different repository that
    # needs to be disabled.
    props.checkout_options.use_trigger = False
    checkout = api.checkout(props.checkout_options)

    # Confirm the given path is actually a submodule.
    gitmodules = api.file.read_text(
        'read .gitmodules', checkout.root / '.gitmodules'
    )
    # Example .gitmodules file:
    # [submodule "third_party/pigweed"]
    #   path = third_party/pigweed
    #   url = https://pigweed.googlesource.com/pigweed/pigweed

    # configparser doesn't like leading whitespace on lines, despite what its
    # documentation says.
    gitmodules = re.sub(r'\n\s+', '\n', gitmodules)
    parser = configparser.RawConfigParser()
    # readfp() was removed in Python 3.12; read_file() is its replacement.
    parser.read_file(io.StringIO(gitmodules))

    # Maps submodule path -> roll object for every submodule that will roll.
    rolls = {}

    for submodule in submodules:
        if not submodule.name:
            submodule.name = submodule.path
        submodule.dir = checkout.root / submodule.path

        with api.step.nest(submodule.name) as pres:
            section = f'submodule "{submodule.name}"'
            if not parser.has_section(section):
                # Section names look like 'submodule "<name>"'; show only the
                # quoted names in the error message.
                known_submodules = sorted(
                    re.sub(r'^.*"(.*)"$', r'\1', x) for x in parser.sections()
                )
                known = ', '.join(f'"{x}"' for x in known_submodules)
                raise api.step.StepFailure(
                    f'no submodule "{submodule.name}" (submodules: {known})'
                )

            # Branch precedence: input property, then .gitmodules, then 'main'.
            if not submodule.branch:
                try:
                    submodule.branch = parser.get(section, 'branch')
                except configparser.NoOptionError:
                    submodule.branch = 'main'

            submodule.remote = api.roll_util.normalize_remote(
                parser.get(section, 'url'),
                checkout.options.remote,
            )

            change = _update_submodule(
                api, checkout, submodule.dir, submodule.branch
            )

            direction = api.roll_util.get_roll_direction(
                submodule.dir, change.old, change.new
            )

            # Record a roll only when roll_util reports the direction as
            # rollable; otherwise annotate the step and continue with the
            # remaining submodules.
            if api.roll_util.can_roll(direction):
                rolls[submodule.path] = api.roll_util.create_roll(
                    project_name=str(submodule.path),
                    old_revision=change.old,
                    new_revision=change.new,
                    proj_dir=submodule.dir,
                    direction=direction,
                    nest_steps=False,
                )
            else:
                pres.step_summary_text = 'no roll required'
                api.roll_util.skip_roll_step(
                    submodule.remote, change.old, change.new
                )

    if not rolls:
        with api.step.nest('nothing to roll, exiting'):
            return

    authors = api.roll_util.authors(*rolls.values())

    num_commits = sum(len(x.commits) for x in rolls.values())

    # Only CC people when the roll is small enough; an unset (zero) property
    # falls back to a limit of 10 commits.
    max_commits_for_ccing = props.max_commits_for_ccing or 10
    if num_commits <= max_commits_for_ccing:
        cc = set()
        if cc_authors_on_rolls:
            cc.update(authors)
        if cc_reviewers_on_rolls:
            cc.update(api.roll_util.reviewers(*rolls.values()))

        def include_cc(email):
            return api.roll_util.include_cc(
                email, cc_domains, checkout.gerrit_host()
            )

        # include_cc() writes steps, so we want things sorted before calling it.
        cc = sorted(set(cc))
        cc_emails = [x.email for x in cc if include_cc(x)]

        if always_cc:
            props.auto_roller_options.cc_emails.extend(cc_emails)
        else:
            props.auto_roller_options.cc_on_failure_emails.extend(cc_emails)

    author_override = None
    with api.step.nest('authors') as pres:
        pres.step_summary_text = repr(authors)
        # The author is only overridden when the roll has exactly one author
        # and the forge_author property is set.
        if len(authors) == 1 and props.forge_author:
            author_override = attrs.asdict(
                api.roll_util.fake_author(next(iter(authors)))
            )

    # merge auto_roller_options and override_auto_roller_options.
    complete_auto_roller_options = api.roll_util.merge_auto_roller_overrides(
        props.auto_roller_options, props.override_auto_roller_options
    )

    change = api.auto_roller.attempt_roll(
        complete_auto_roller_options,
        repo_dir=checkout.root,
        commit_message=api.roll_util.message(*rolls.values()),
        author_override=author_override,
    )

    return api.auto_roller.raw_result(change)
def GenTests(api) -> Generator[recipe_test_api.TestData, None, None]:
    """Create tests.

    Yields four test cases: 'success', 'partial_noop', 'noop' and 'missing'
    (the last one is expected to end with status FAILURE).
    """

    def _url(x):
        # Values that already look like a URL or a relative path are used
        # as-is; anything else is treated as a repo name on a fake host.
        if x.startswith(('https://', 'sso://', '.')):
            return x
        return 'https://foo.googlesource.com/' + x

    def submodules(*subs):
        # Each argument is either a bare path or a dict of Submodule fields.
        res = []
        for sub in subs:
            if isinstance(sub, str):
                res.append(Submodule(path=sub))
            elif isinstance(sub, dict):
                res.append(Submodule(**sub))
            else:
                raise ValueError(repr(sub))  # pragma: no cover
        return res

    def gitmodules(**entries):
        # Keyword `<name>=<url>` declares a submodule; keyword
        # `<name>_branch=<branch>` adds a `branch` option to submodule <name>.
        suffix = '_branch'
        # Strip only the trailing suffix. str.replace() would also remove any
        # '_branch' occurring earlier in the key.
        branches = {
            key[: -len(suffix)]: value
            for key, value in entries.items()
            if key.endswith(suffix)
        }
        urls = {
            key: value
            for key, value in entries.items()
            if not key.endswith(suffix)
        }

        text = []
        for name, url in urls.items():
            text.append(
                '[submodule "{0}"]\n\tpath = {0}\n\turl = {1}\n'.format(
                    name, _url(url)
                )
            )
            if name in branches:
                text.append(f'\tbranch = {branches[name]}\n')

        return api.step_data(
            'read .gitmodules', api.file.read_text(''.join(text))
        )

    def properties(subs, **kwargs):
        # Build InputProperties with a git checkout and a dry-run auto-roller.
        props = InputProperties(**kwargs)
        props.checkout_options.CopyFrom(api.checkout.git_options())
        props.submodules.extend(subs)
        props.auto_roller_options.dry_run = True
        props.auto_roller_options.remote = api.checkout.pigweed_repo
        return api.properties(props)

    def commit_data(name, **kwargs):
        return api.roll_util.commit_data(
            name,
            api.roll_util.commit('a' * 40, 'foo\nbar\n\nChange-Id: I1111'),
            **kwargs,
        )

    yield (
        api.test('success')
        + properties(
            submodules('a1', 'b2'),
            cc_authors_on_rolls=True,
            always_cc=True,
            forge_author=True,
        )
        + commit_data('a1', prefix='')
        + commit_data('b2', prefix='')
        + gitmodules(a1='sso://foo/a1', b2='sso://foo/b2')
        + api.roll_util.forward_roll('a1')
        + api.roll_util.forward_roll('b2')
        + api.auto_roller.dry_run_success()
    )

    yield (
        api.test('partial_noop')
        + properties(submodules('a1', 'b2'), cc_reviewers_on_rolls=True)
        + commit_data('a1', prefix='')
        + gitmodules(a1='sso://foo/a1', b2='sso://foo/b2')
        + api.roll_util.forward_roll('a1')
        + api.roll_util.noop_roll('b2')
        + api.auto_roller.dry_run_success()
    )

    yield (
        api.test('noop')
        + properties(submodules('a1', {'path': 'b2'}))
        + gitmodules(a1='a1', b2='b2', b2_branch='branch')
        + api.roll_util.noop_roll('a1')
        + api.roll_util.noop_roll('b2')
    )

    yield (
        api.test('missing', status='FAILURE')
        + properties(submodules('a1', 'b2'), cc_authors_on_rolls=True)
        + gitmodules(a1='sso://foo/a1')
    )