blob: 3d25970d90d534ed3c419ff45638337dc0809a57 [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Roll submodules of a git repository."""
import configparser
import io
import re
import attr
from PB.recipes.pigweed.submodule_roller import InputProperties
from recipe_engine import post_process
# Recipe modules used by this recipe. The last path component of each entry is
# the attribute it is reached through on `api` (api.auto_roller, api.git, ...).
DEPS = [
    'fuchsia/auto_roller',
    'fuchsia/git',
    'fuchsia/status_check',
    'pigweed/checkout',
    'pigweed/roll_util',
    'recipe_engine/context',
    'recipe_engine/file',
    'recipe_engine/properties',
    'recipe_engine/step',
]
# Input property type for this recipe: the InputProperties class imported from
# PB.recipes.pigweed.submodule_roller. Presumably this is what RunSteps
# receives as `props` -- confirm against the recipe engine documentation.
PROPERTIES = InputProperties
@attr.s
class _Submodule:
    """Mutable record for one submodule requested in the input properties.

    RunSteps creates instances with only path, name and branch set and fills
    in the remaining fields while processing each submodule.
    """

    # Submodule path taken from the input properties; RunSteps joins it onto
    # the checkout root to compute `dir`.
    path = attr.ib(type=str)
    # Name used to find the 'submodule "<name>"' section in .gitmodules.
    # RunSteps replaces an empty name with `path`.
    name = attr.ib(type=str)
    # Ref to roll to. When empty, RunSteps takes it from .gitmodules or uses
    # 'main'.
    branch = attr.ib(type=str)
    # Result of api.roll_util.normalize_remote() on the .gitmodules url; unset
    # until RunSteps assigns it.
    remote = attr.ib(type=str, default=None)
    # Location of the submodule inside the checkout; unset until RunSteps
    # assigns it.
    dir = attr.ib(type=str, default=None)
@attr.s
class _RevisionChange:
    """Pair of revisions describing how a submodule moved."""

    # Revision the submodule was at before the update.
    old = attr.ib(type=str)
    # Revision the submodule is at after the update.
    new = attr.ib(type=str)
def _update_submodule(api, checkout, path, new_revision):
    """Check out `new_revision` in a submodule and report the revision change.

    Args:
      api: Recipe API object; uses api.context, api.git and api.checkout.
      checkout: Checkout object; `top` and `options.submodule_timeout_sec` are
        read from it.
      path: Directory of the submodule to update.
      new_revision: Ref fetched from 'origin'. Either a 40-character lowercase
        hex hash or something else, e.g. a branch name.

    Returns:
      A _RevisionChange holding the revision before and after the update.
    """
    timeout_sec = checkout.options.submodule_timeout_sec
    with api.context(cwd=checkout.top):
        api.git.update_submodule(paths=(path,), timeout=timeout_sec)

    before = api.checkout.get_revision(
        path, 'get old revision', test_data='1' * 40
    )

    with api.context(cwd=path):
        api.git('git fetch', 'fetch', 'origin', new_revision)
        api.git('git checkout', 'checkout', 'FETCH_HEAD')

    # Anything that is not already a full hash (such as a branch name) has to
    # be resolved to the hash that was actually checked out.
    looks_like_hash = re.search(r'^[0-9a-f]{40}$', new_revision)
    after = new_revision
    if not looks_like_hash:
        after = api.checkout.get_revision(
            path, 'get new revision', test_data='2' * 40
        )

    return _RevisionChange(old=before, new=after)
def RunSteps(api, props):  # pylint: disable=invalid-name
    """Update the requested submodules and attempt a roll of those that moved.

    Each entry of `props.submodules` is looked up in the checkout's
    .gitmodules file and checked out at its branch (the `branch` property,
    else the `branch` option in .gitmodules, else 'main'). It is kept for the
    roll only when `api.roll_util.can_roll()` accepts the resulting direction.

    Args:
      api: Recipe API object.
      props: Input properties. Reads `submodules`, `cc_authors_on_rolls`,
        `cc_reviewers_on_rolls`, `cc_domains`, `always_cc`, `forge_author`,
        `checkout_options` and `auto_roller_options`. Note that
        `checkout_options.use_trigger` is overwritten with False and CC
        emails are appended to `auto_roller_options`.

    Returns:
      None when no submodule can be rolled, otherwise the value of
      `api.auto_roller.raw_result()` for the attempted roll.

    Raises:
      api.step.StepFailure: A requested submodule has no section in
        .gitmodules.
    """
    requested = [
        _Submodule(path=x.path, name=x.name, branch=x.branch)
        for x in props.submodules
    ]

    cc_authors_on_rolls = props.cc_authors_on_rolls
    cc_reviewers_on_rolls = props.cc_reviewers_on_rolls
    cc_domains = props.cc_domains
    always_cc = props.always_cc

    # The checkout module will try to use trigger data to pull in a specific
    # patch. Since the triggering commit is in a different repository that
    # needs to be disabled.
    props.checkout_options.use_trigger = False
    checkout = api.checkout(props.checkout_options)

    # Confirm the given path is actually a submodule.
    gitmodules = api.file.read_text(
        'read .gitmodules', checkout.root.join('.gitmodules')
    )
    # Example .gitmodules file:
    # [submodule "third_party/pigweed"]
    #   path = third_party/pigweed
    #   url = https://pigweed.googlesource.com/pigweed/pigweed

    # configparser doesn't like leading whitespace on lines, despite what its
    # documentation says.
    gitmodules = re.sub(r'\n\s+', '\n', gitmodules)
    parser = configparser.RawConfigParser()
    # read_file() replaces readfp(), which was deprecated in Python 3.2 and
    # removed in Python 3.12.
    parser.read_file(io.StringIO(gitmodules))

    rolls = {}

    for submodule in requested:
        if not submodule.name:
            submodule.name = submodule.path
        submodule.dir = checkout.root.join(submodule.path)

        with api.step.nest(submodule.name) as pres:
            section = 'submodule "{}"'.format(submodule.name)
            if not parser.has_section(section):
                # List the submodule names that do exist so the failure is
                # actionable. Kept in its own variable so the list being
                # iterated is never rebound.
                known_names = sorted(
                    re.sub(r'^.*"(.*)"$', r'\1', x) for x in parser.sections()
                )
                raise api.step.StepFailure(
                    'no submodule "{}" (submodules: {})'.format(
                        submodule.name,
                        ', '.join('"{}"'.format(x) for x in known_names),
                    )
                )

            if not submodule.branch:
                # Fall back to 'main' when .gitmodules names no branch.
                submodule.branch = parser.get(
                    section, 'branch', fallback='main'
                )

            submodule.remote = api.roll_util.normalize_remote(
                parser.get(section, 'url'), checkout.options.remote,
            )

            change = _update_submodule(
                api, checkout, submodule.dir, submodule.branch
            )

            direction = api.roll_util.get_roll_direction(
                submodule.dir, change.old, change.new
            )

            # If the primary roll is not necessary or is backwards we can exit
            # immediately and don't need to check deps.
            if api.roll_util.can_roll(direction):
                rolls[submodule.path] = api.roll_util.Roll(
                    project_name=str(submodule.path),
                    old_revision=change.old,
                    new_revision=change.new,
                    proj_dir=submodule.dir,
                    direction=direction,
                    nest_steps=False,
                )

            else:
                pres.step_summary_text = 'no roll required'
                api.roll_util.skip_roll_step(
                    submodule.remote, change.old, change.new
                )

    if not rolls:
        # The empty nest step only exists to record why the recipe stopped.
        with api.step.nest('nothing to roll, exiting'):
            return

    cc = set()
    authors = api.roll_util.authors(*rolls.values())
    if cc_authors_on_rolls:
        cc.update(authors)
    if cc_reviewers_on_rolls:
        cc.update(api.roll_util.reviewers(*rolls.values()))

    def include_cc(email):
        return api.roll_util.include_cc(
            email, cc_domains, checkout.gerrit_host()
        )

    # include_cc() writes steps, so we want things sorted before calling it.
    cc_emails = [x.email for x in sorted(cc) if include_cc(x)]

    if always_cc:
        props.auto_roller_options.cc_emails.extend(cc_emails)
    else:
        props.auto_roller_options.cc_on_failure_emails.extend(cc_emails)

    # Only forge the author when the roll has exactly one author and the
    # forge_author property asks for it.
    author_override = None
    with api.step.nest('authors') as pres:
        pres.step_summary_text = repr(authors)
        if len(authors) == 1 and props.forge_author:
            author_override = api.roll_util.fake_author(
                next(iter(authors))
            )._asdict()

    change = api.auto_roller.attempt_roll(
        props.auto_roller_options,
        repo_dir=checkout.root,
        commit_message=api.roll_util.message(*rolls.values()),
        author_override=author_override,
    )

    return api.auto_roller.raw_result(change)
def GenTests(api):  # pylint: disable=invalid-name
    """Create tests.

    Yields four cases: 'success' (both submodules roll forward),
    'partial_noop' (one rolls, one is a no-op), 'noop' (nothing to roll) and
    'missing' (a requested submodule is absent from .gitmodules; expected to
    fail).
    """

    def _url(x):
        # Values that already look like a remote (or a relative path) are used
        # as-is; bare names are placed under a fake googlesource host.
        if x.startswith(('https://', 'sso://', '.')):
            return x
        return 'https://foo.googlesource.com/' + x

    def submodules(*subs):
        # Build the `submodules` property: a bare string is shorthand for
        # {'path': <string>}; dicts are passed through unchanged.
        res = []
        for sub in subs:
            if isinstance(sub, str):
                res.append(dict(path=sub))
            elif isinstance(sub, dict):
                res.append(sub)
            else:
                raise ValueError(repr(sub))  # pragma: no cover
        return res

    def gitmodules(**entries):
        # Build step data for the 'read .gitmodules' step. `name=url` adds a
        # submodule section; `name_branch=branch` adds a branch option to the
        # section for `name`.
        suffix = '_branch'
        branches = {}
        urls = {}
        for key, value in entries.items():
            if key.endswith(suffix):
                # Strip only the trailing suffix. str.replace() would also
                # remove '_branch' from the middle of a submodule name.
                branches[key[: -len(suffix)]] = value
            else:
                urls[key] = value

        text = []
        for name, url in urls.items():
            text.append(
                '[submodule "{0}"]\n\tpath = {0}\n\turl = {1}\n'.format(
                    name, _url(url)
                )
            )
            if name in branches:
                text.append('\tbranch = {}\n'.format(branches[name]))

        return api.step_data(
            'read .gitmodules', api.file.read_text(''.join(text))
        )

    def properties(submodules, **kwargs):
        # Start from the checkout module's git properties, add the submodule
        # list and caller overrides, and default to a dry-run roll.
        new_kwargs = api.checkout.git_properties()
        new_kwargs['submodules'] = submodules
        new_kwargs.update(kwargs)
        new_kwargs.setdefault(
            'auto_roller_options',
            {'dry_run': True, 'remote': api.checkout.pigweed_repo,},
        )
        return api.properties(**new_kwargs)

    def commit_data(name, **kwargs):
        # One fake commit for the named roll.
        return api.roll_util.commit_data(
            name,
            api.roll_util.commit('a' * 40, 'foo\nbar\n\nChange-Id: I1111'),
            **kwargs,
        )

    yield (
        api.status_check.test('success')
        + properties(
            submodules('a1', 'b2'),
            cc_authors_on_rolls=True,
            always_cc=True,
            forge_author=True,
        )
        + commit_data('a1', prefix='')
        + commit_data('b2', prefix='')
        + gitmodules(a1='sso://foo/a1', b2='sso://foo/b2')
        + api.roll_util.forward_roll('a1')
        + api.roll_util.forward_roll('b2')
        + api.auto_roller.dry_run_success()
    )

    yield (
        api.status_check.test('partial_noop')
        + properties(submodules('a1', 'b2'), cc_reviewers_on_rolls=True)
        + commit_data('a1', prefix='')
        + gitmodules(a1='sso://foo/a1', b2='sso://foo/b2')
        + api.roll_util.forward_roll('a1')
        + api.roll_util.noop_roll('b2')
        + api.auto_roller.dry_run_success()
    )

    yield (
        api.status_check.test('noop')
        + properties(submodules('a1', {'path': 'b2'}))
        + gitmodules(a1='a1', b2='b2', b2_branch='branch')
        + api.roll_util.noop_roll('a1')
        + api.roll_util.noop_roll('b2')
    )

    yield (
        api.status_check.test('missing', status='failure')
        + properties(submodules('a1', 'b2'), cc_authors_on_rolls=True)
        + gitmodules(a1='sso://foo/a1')
    )