| # Copyright 2021 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Roll submodules of a git repository.""" |
| |
| from __future__ import annotations |
| |
| import configparser |
| import dataclasses |
| import io |
| import re |
| from typing import Generator, TYPE_CHECKING |
| |
| import attrs |
| from PB.recipes.pigweed.submodule_roller import InputProperties, Submodule |
| |
| if TYPE_CHECKING: # pragma: no cover |
| from recipe_engine import config_types, post_process, recipe_test_api |
| |
| DEPS = [ |
| 'fuchsia/auto_roller', |
| 'fuchsia/git', |
| 'pigweed/checkout', |
| 'pigweed/roll_util', |
| 'recipe_engine/context', |
| 'recipe_engine/file', |
| 'recipe_engine/properties', |
| 'recipe_engine/step', |
| ] |
| |
| PROPERTIES = InputProperties |
| |
| |
| @dataclasses.dataclass |
| class _Submodule: |
| path: str |
| name: str |
| branch: str |
| remote: str = dataclasses.field(default=None) |
| dir: config_types.Path = dataclasses.field(default=None) |
| |
| |
| @dataclasses.dataclass |
| class _RevisionChange: |
| old: str |
| new: str |
| |
| |
| def _update_submodule(api, checkout, path, new_revision): |
| with api.context(cwd=checkout.top): |
| api.git.submodule_update( |
| paths=(path,), |
| timeout=checkout.options.submodule_timeout_sec, |
| ) |
| |
| old_revision = api.checkout.get_revision( |
| path, 'get old revision', test_data='1' * 40 |
| ) |
| |
| with api.context(cwd=path): |
| api.git('git fetch', 'fetch', 'origin', new_revision) |
| api.git('git checkout', 'checkout', 'FETCH_HEAD') |
| |
| # In case new_revision is a branch name we need to retrieve the hash it |
| # resolved to. |
| if not re.search(r'^[0-9a-f]{40}$', new_revision): |
| new_revision = api.checkout.get_revision( |
| path, 'get new revision', test_data='2' * 40 |
| ) |
| |
| return _RevisionChange(old=old_revision, new=new_revision) |
| |
| |
| def RunSteps(api, props): # pylint: disable=invalid-name |
| submodules = [ |
| _Submodule(path=x.path, name=x.name, branch=x.branch) |
| for x in props.submodules |
| ] |
| cc_authors_on_rolls = props.cc_authors_on_rolls |
| cc_reviewers_on_rolls = props.cc_reviewers_on_rolls |
| cc_domains = props.cc_domains |
| always_cc = props.always_cc |
| |
| # The checkout module will try to use trigger data to pull in a specific |
| # patch. Since the triggering commit is in a different repository that |
| # needs to be disabled. |
| props.checkout_options.use_trigger = False |
| checkout = api.checkout(props.checkout_options) |
| |
| # Confirm the given path is actually a submodule. |
| gitmodules = api.file.read_text( |
| 'read .gitmodules', checkout.root / '.gitmodules' |
| ) |
| # Example .gitmodules file: |
| # [submodule "third_party/pigweed"] |
| # path = third_party/pigweed |
| # url = https://pigweed.googlesource.com/pigweed/pigweed |
| |
| # configparser doesn't like leading whitespace on lines, despite what its |
| # documentation says. |
| gitmodules = re.sub(r'\n\s+', '\n', gitmodules) |
| parser = configparser.RawConfigParser() |
| parser.readfp(io.StringIO(gitmodules)) |
| |
| rolls = {} |
| |
| for submodule in submodules: |
| if not submodule.name: |
| submodule.name = submodule.path |
| submodule.dir = checkout.root / submodule.path |
| |
| with api.step.nest(submodule.name) as pres: |
| section = f'submodule "{submodule.name}"' |
| if not parser.has_section(section): |
| sections = parser.sections() |
| submodules = sorted( |
| re.sub(r'^.*"(.*)"$', r'\1', x) for x in sections |
| ) |
| raise api.step.StepFailure( |
| 'no submodule "{}" (submodules: {})'.format( |
| submodule.name, |
| ', '.join('"{}"'.format(x) for x in submodules), |
| ) |
| ) |
| |
| if not submodule.branch: |
| try: |
| submodule.branch = parser.get(section, 'branch') |
| except configparser.NoOptionError: |
| submodule.branch = 'main' |
| |
| submodule.remote = api.roll_util.normalize_remote( |
| parser.get(section, 'url'), |
| checkout.options.remote, |
| ) |
| |
| change = _update_submodule( |
| api, checkout, submodule.dir, submodule.branch |
| ) |
| |
| direction = api.roll_util.get_roll_direction( |
| submodule.dir, change.old, change.new |
| ) |
| |
| # If the primary roll is not necessary or is backwards we can exit |
| # immediately and don't need to check deps. |
| if api.roll_util.can_roll(direction): |
| rolls[submodule.path] = api.roll_util.create_roll( |
| project_name=str(submodule.path), |
| old_revision=change.old, |
| new_revision=change.new, |
| proj_dir=submodule.dir, |
| direction=direction, |
| nest_steps=False, |
| ) |
| |
| else: |
| pres.step_summary_text = 'no roll required' |
| api.roll_util.skip_roll_step( |
| submodule.remote, change.old, change.new |
| ) |
| |
| if not rolls: |
| with api.step.nest('nothing to roll, exiting'): |
| return |
| |
| authors = api.roll_util.authors(*rolls.values()) |
| num_commits = sum(len(x.commits) for x in rolls.values()) |
| |
| max_commits_for_ccing = props.max_commits_for_ccing or 10 |
| if num_commits <= max_commits_for_ccing: |
| cc = set() |
| if cc_authors_on_rolls: |
| cc.update(authors) |
| if cc_reviewers_on_rolls: |
| cc.update(api.roll_util.reviewers(*rolls.values())) |
| |
| def include_cc(email): |
| return api.roll_util.include_cc( |
| email, cc_domains, checkout.gerrit_host() |
| ) |
| |
| # include_cc() writes steps, so we want things sorted before calling it. |
| cc = sorted(set(cc)) |
| cc_emails = [x.email for x in cc if include_cc(x)] |
| |
| if always_cc: |
| props.auto_roller_options.cc_emails.extend(cc_emails) |
| else: |
| props.auto_roller_options.cc_on_failure_emails.extend(cc_emails) |
| |
| author_override = None |
| with api.step.nest('authors') as pres: |
| pres.step_summary_text = repr(authors) |
| if len(authors) == 1 and props.forge_author: |
| author_override = attrs.asdict( |
| api.roll_util.fake_author(next(iter(authors))) |
| ) |
| |
| # merge auto_roller_options and override_auto_roller_options. |
| complete_auto_roller_options = api.roll_util.merge_auto_roller_overrides( |
| props.auto_roller_options, props.override_auto_roller_options |
| ) |
| |
| change = api.auto_roller.attempt_roll( |
| complete_auto_roller_options, |
| repo_dir=checkout.root, |
| commit_message=api.roll_util.message(*rolls.values()), |
| author_override=author_override, |
| ) |
| |
| return api.auto_roller.raw_result(change) |
| |
| |
| def GenTests(api) -> Generator[recipe_test_api.TestData, None, None]: |
| """Create tests.""" |
| |
| def _url(x): |
| if x.startswith(('https://', 'sso://', '.')): |
| return x |
| return 'https://foo.googlesource.com/' + x |
| |
| def submodules(*subs): |
| res = [] |
| for sub in subs: |
| if isinstance(sub, str): |
| res.append(Submodule(path=sub)) |
| elif isinstance(sub, dict): |
| res.append(Submodule(**sub)) |
| else: |
| raise ValueError(repr(sub)) # pragma: no cover |
| return res |
| |
| def gitmodules(**submodules): |
| branches = {} |
| for k, v in submodules.items(): |
| if k.endswith('_branch'): |
| branches[k.replace('_branch', '')] = v |
| |
| for x in branches: |
| del submodules[f'{x}_branch'] |
| |
| text = [] |
| for k, v in submodules.items(): |
| text.append( |
| '[submodule "{0}"]\n\tpath = {0}\n\turl = {1}\n'.format( |
| k, _url(v) |
| ) |
| ) |
| if k in branches: |
| text.append(f'\tbranch = {branches[k]}\n') |
| |
| return api.step_data( |
| 'read .gitmodules', api.file.read_text(''.join(text)) |
| ) |
| |
| def properties(submodules, **kwargs): |
| props = InputProperties(**kwargs) |
| props.checkout_options.CopyFrom(api.checkout.git_options()) |
| props.submodules.extend(submodules) |
| props.auto_roller_options.dry_run = True |
| props.auto_roller_options.remote = api.checkout.pigweed_repo |
| return api.properties(props) |
| |
| def commit_data(name, **kwargs): |
| return api.roll_util.commit_data( |
| name, |
| api.roll_util.commit('a' * 40, 'foo\nbar\n\nChange-Id: I1111'), |
| **kwargs, |
| ) |
| |
| yield ( |
| api.test('success') |
| + properties( |
| submodules('a1', 'b2'), |
| cc_authors_on_rolls=True, |
| always_cc=True, |
| forge_author=True, |
| ) |
| + commit_data('a1', prefix='') |
| + commit_data('b2', prefix='') |
| + gitmodules(a1='sso://foo/a1', b2='sso://foo/b2') |
| + api.roll_util.forward_roll('a1') |
| + api.roll_util.forward_roll('b2') |
| + api.auto_roller.dry_run_success() |
| ) |
| |
| yield ( |
| api.test('partial_noop') |
| + properties(submodules('a1', 'b2'), cc_reviewers_on_rolls=True) |
| + commit_data('a1', prefix='') |
| + gitmodules(a1='sso://foo/a1', b2='sso://foo/b2') |
| + api.roll_util.forward_roll('a1') |
| + api.roll_util.noop_roll('b2') |
| + api.auto_roller.dry_run_success() |
| ) |
| |
| yield ( |
| api.test('noop') |
| + properties(submodules('a1', {'path': 'b2'})) |
| + gitmodules(a1='a1', b2='b2', b2_branch='branch') |
| + api.roll_util.noop_roll('a1') |
| + api.roll_util.noop_roll('b2') |
| ) |
| |
| yield ( |
| api.test('missing', status='FAILURE') |
| + properties(submodules('a1', 'b2'), cc_authors_on_rolls=True) |
| + gitmodules(a1='sso://foo/a1') |
| ) |