| # Copyright 2020 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Update an Android Repo Tool project.""" |
| |
| from __future__ import annotations |
| |
| import collections |
| import dataclasses |
| import json |
| import re |
| from typing import Sequence, TYPE_CHECKING |
| import urllib |
| import xml.etree.ElementTree |
| |
| from PB.recipe_modules.pigweed.checkout.options import ( |
| Options as CheckoutOptions, |
| ) |
| from recipe_engine import recipe_api |
| |
| if TYPE_CHECKING: # pragma: no cover |
| from recipe_engine import config_types |
| from RECIPE_MODULES.pigweed.checkout import api as checkout_api |
| from PB.recipe_modules.pigweed.repo_roll import Project |
| |
| |
class _TreeBuilder(xml.etree.ElementTree.TreeBuilder):
    """TreeBuilder that records XML comments as nodes of the tree.

    Each comment is added as an element whose tag is
    xml.etree.ElementTree.Comment, so the comment is written back out when the
    tree is serialized.
    """

    def comment(self, data):
        """Add a comment node whose text is `data` at the current position."""
        # NOTE(review): a comment outside the root element is pushed through
        # start()/end() exactly like one inside it -- confirm that manifests
        # never carry comments before or after the root element.
        comment_tag = xml.etree.ElementTree.Comment
        self.start(comment_tag, {})
        self.data(data)
        self.end(comment_tag)
| |
| |
@dataclasses.dataclass
class RevisionChange:
    """Result of rolling one manifest project to a new revision."""

    # Directory the rolled project was checked out into.
    dir: config_types.Path
    # Revision the manifest held for the project before the roll.
    old: str
    # Revision written to the manifest by the roll.
    new: str
    # Direction of the roll as determined through the roll_util module.
    # NOTE(review): roll_util_api is not imported anywhere in this file, not
    # even under TYPE_CHECKING; the annotation only works because annotations
    # are evaluated lazily. Confirm the intended import.
    direction: roll_util_api.Direction
| |
| |
class RepoRollApi(recipe_api.RecipeApi):
    """Update an Android Repo Tool project."""

    def is_branch(self, revision):
        """Return True if revision appears to be a branch name.

        A revision made up of exactly 40 hexadecimal digits is treated as a
        commit hash and one starting with 'refs/tags/' as a tag. Anything else
        is assumed to be a branch.
        """
        if re.search(r'^[0-9a-fA-F]{40}$', revision):
            return False
        return not revision.startswith('refs/tags/')

    # ElementTree orders attributes differently in Python 2 and 3, but if given
    # attributes in a specific order it preserves that order.
    def _order_attributes(self, root):
        """Sort the attributes of every element under root by name, in place."""
        for el in root.iter():
            if len(el.attrib) > 1:
                new_attrib = sorted(el.attrib.items())
                el.attrib.clear()
                el.attrib.update(new_attrib)

    def _read_manifest(self, checkout):
        """Read and parse the checkout's manifest, returning its root element.

        The manifest is parsed with _TreeBuilder so comments survive the
        round trip back to disk.
        """
        builder = _TreeBuilder()
        parser = xml.etree.ElementTree.XMLParser(target=builder)
        parser.feed(
            self.m.file.read_text('read manifest', checkout.manifest_path)
        )
        parser.close()
        return builder.close()

    def _write_manifest(self, checkout, root):
        """Serialize root back to the checkout's manifest path."""
        # Sort attributes first so the serialized output is stable.
        self._order_attributes(root)
        serialized = xml.etree.ElementTree.tostring(root).decode()
        self.m.file.write_text(
            'write manifest',
            checkout.manifest_path,
            f'<?xml version="1.0" encoding="UTF-8"?>\n{serialized}\n',
        )

    def update_project(
        self,
        checkout: checkout_api.CheckoutContext,
        project: Project,
    ) -> RevisionChange | None:
        """Point one project in the checkout's manifest at a new revision.

        The new revision is the triggering gitiles commit when the build has
        one; otherwise it is the revision obtained after checking out the
        project's branch. When the roll can proceed, the manifest at
        checkout.manifest_path is rewritten in place.

        Args:
          checkout: Checkout context; its manifest_path names the manifest to
            update and its options.remote is the base for relative fetch URLs.
          project: Roll configuration; path_to_update selects the manifest
            project by its path.

        Returns:
          A RevisionChange describing the roll, or None if roll_util reports
          that a roll in the computed direction cannot proceed.

        Raises:
          StepFailure: If path_to_update is not in the manifest, if the
            matching manifest project is not equivalent to the triggering
            repository, or if the project has no upstream and its revision is
            not a branch.
        """
        # The module-level "import urllib" alone does not guarantee that the
        # urllib.parse submodule has been loaded, so import it explicitly.
        import urllib.parse

        new_revision = None

        # Try to get new_revision from the trigger.
        bb_remote = None
        commit = self.m.buildbucket.build.input.gitiles_commit
        if commit and commit.project:
            new_revision = commit.id
            bb_remote = f'https://{commit.host}/{commit.project}'

        root = self._read_manifest(checkout)

        defaults = {}
        for default in root.findall('default'):
            defaults.update(default.attrib)

        remotes = {
            rem.attrib['name']: rem.attrib for rem in root.findall('remote')
        }

        # Search for path_to_update and check the repository it refers to
        # matches the triggering repository. This check is mostly a config check
        # and shouldn't fail in real runs unless somebody moves around project
        # paths in the manifest without updating the builder definition.
        paths_found = []
        for proj in root.findall('project'):
            # Sometimes projects don't define something in which case we need to
            # fall back on the remote specified by the project or the default
            # for the entire manifest.
            proj_attrib = dict(defaults)
            remote_name = proj.attrib.get('remote', defaults.get('remote'))
            assert remote_name
            proj_attrib.update(remotes[remote_name])
            proj_attrib.update(proj.attrib)
            proj_attrib['fetch'] = self.m.sso.sso_to_https(proj_attrib['fetch'])

            # Apparently if path is left off name is used. This is mildly
            # infuriating, but easy to work around.
            if 'path' not in proj_attrib:
                proj_attrib['path'] = proj_attrib['name']

            paths_found.append(proj_attrib['path'])
            if proj_attrib['path'] != project.path_to_update:
                continue

            fetch_host = proj_attrib['fetch'].strip('/')
            if fetch_host.startswith('..'):
                # A relative fetch URL is resolved against the manifest's own
                # remote.
                fetch_host = urllib.parse.urljoin(
                    checkout.options.remote,
                    fetch_host,
                )

            manifest_remote = (
                fetch_host.rstrip('/') + '/' + proj_attrib['name'].strip('/')
            )
            if bb_remote and not checkout.remotes_equivalent(
                manifest_remote, bb_remote
            ):
                raise self.m.step.StepFailure(
                    f"repo paths don't match: {manifest_remote!r} from "
                    f'manifest and {bb_remote!r} from buildbucket'
                )
            # proj and proj_attrib now refer to the matching project.
            break
        else:
            # The loop finished without a match (or there were no projects).
            found = ', '.join(f'"{x}"' for x in paths_found)
            raise self.m.step.StepFailure(
                f'cannot find "{project.path_to_update}" in manifest '
                f'(found {found})'
            )

        if 'upstream' in proj_attrib:
            proj_branch = proj_attrib['upstream']
        elif 'revision' in proj_attrib and self.is_branch(
            proj_attrib['revision']
        ):
            proj_branch = proj_attrib['revision']
        else:
            proj_branch = 'main'

        # If we still don't have a revision then it wasn't in the trigger.
        # (Perhaps this was manually triggered.) In this case we need to
        # determine the latest revision of this repository. The use_trigger flag
        # should have no effect but using it to be explicit. Checking out even
        # if not needed so commit messages can be collected later.
        proj_dir = self.m.path.start_dir / 'project'
        self.m.checkout(
            CheckoutOptions(
                remote=manifest_remote, branch=proj_branch, use_trigger=False
            ),
            root=proj_dir,
        )
        if new_revision is None:
            new_revision = self.m.checkout.get_revision(proj_dir)

        assert new_revision
        new_revision = str(new_revision)

        # If upstream not set we may be transitioning from tracking a branch to
        # rolling. In that case set upstream to be the revision, but only if the
        # revision appears to be a branch.
        if 'upstream' not in proj_attrib:
            if self.is_branch(proj_attrib['revision']):
                # Explicitly update both proj.attrib and proj_attrib to minimize
                # confusion if these statements are moved around later.
                proj.attrib['upstream'] = proj_attrib['revision']
                proj_attrib['upstream'] = proj.attrib['upstream']
            else:
                raise self.m.step.StepFailure(
                    'upstream not set and revision is not a branch, aborting'
                )

        old_revision = proj_attrib['revision']
        # Explicitly update both proj.attrib and proj_attrib to minimize
        # confusion.
        proj_attrib['revision'] = proj.attrib['revision'] = new_revision

        # A roll away from a branch name is always treated as forward; only a
        # pinned (non-branch) old revision is compared against the new one.
        direction = self.m.roll_util.Direction.FORWARD
        if not self.is_branch(old_revision):
            direction = self.m.roll_util.get_roll_direction(
                proj_dir, old_revision, new_revision
            )

        if not self.m.roll_util.can_roll(direction):
            self.m.roll_util.skip_roll_step(
                manifest_remote, old_revision, new_revision
            )
            return None

        self._write_manifest(checkout, root)

        return RevisionChange(
            dir=proj_dir,
            old=old_revision,
            new=new_revision,
            direction=direction,
        )