| # Copyright 2020 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Calls to checkout code. |
| |
| Usage: |
| api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed') |
| """ |
| |
| from __future__ import annotations |
| |
| import collections |
| import contextlib |
| import re |
| from typing import TYPE_CHECKING |
| import urllib |
| import xml.etree.ElementTree |
| |
| import attrs |
| from PB.go.chromium.org.luci.buildbucket.proto import ( |
| build as build_pb2, |
| common as common_pb2, |
| ) |
| from PB.go.chromium.org.luci.scheduler.api.scheduler.v1 import ( |
| triggers as triggers_pb2, |
| ) |
| from PB.recipe_modules.pigweed.checkout.options import Options |
| from recipe_engine import recipe_api |
| |
| if TYPE_CHECKING: # pragma: no cover |
| from typing import Any, Sequence |
| from recipe_engine import config_types |
| |
| PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed' |
| |
| |
| def to_dict(obj) -> dict[str, Any]: |
| try: |
| # Modifications to the dict returned by the built-in vars() function |
| # modify the original data structure. Always create a copy for this |
| # function to return. |
| return __builtins__['vars'](obj).copy() |
| except TypeError: |
| keys = [x for x in obj.__slots__ if not x.startswith('__')] |
| return {k: getattr(obj, k) for k in keys} |
| |
| |
| @attrs.define |
| class Manifest: |
| remotes: dict[str, 'Remote'] = attrs.Factory(dict) |
| projects: list['Project'] = attrs.Factory(list) |
| |
| def dict(self) -> dict[str, Any]: |
| return { |
| 'remotes': {k: v.dict() for k, v in self.remotes.items()}, |
| 'projects': [x.dict() for x in self.projects], |
| } |
| |
| |
| class Url: |
| def __init__(self, url: str, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
| self.url: str = url |
| self.https: str | None = None |
| |
| def dict(self) -> dict[str, Any]: |
| return to_dict(self) |
| |
| |
| @attrs.define |
| class Remote: |
| """Remote config from manifest.""" |
| |
| name: str |
| fetch: Url |
| review: str | None = None |
| revision: str | None = None |
| alias: str | None = None |
| |
| def dict(self) -> dict[str, Any]: |
| res = to_dict(self) |
| res['fetch'] = res['fetch'].dict() |
| return res |
| |
| |
| @attrs.define |
| class Project: |
| """Key variables describing a repository/project.""" |
| |
| name: str |
| path: str |
| remote: str |
| revision: str |
| upstream: str |
| url: str | None = None |
| |
| def path_object(self, root: config_types.Path) -> config_types.Path: |
| return root / self.path |
| |
| def dict(self) -> dict[str, Any]: |
| return to_dict(self) |
| |
| |
| def _str_or_none(x: Any | None) -> str | None: |
| if x is None: |
| return x |
| return str(x) |
| |
| |
| def _int_or_none(x: Any | None) -> int | None: |
| if x is None: |
| return x |
| return int(x) |
| |
| |
| @attrs.define |
| class Change: |
| """Data from buildbucket.""" |
| |
| number: int = attrs.field(converter=int) |
| remote: str | None = attrs.field(converter=_str_or_none) |
| ref: str | None = attrs.field(converter=_str_or_none) |
| rebase: bool | None = None |
| project: str | None = None |
| branch: str | None = attrs.field(converter=_str_or_none, default=None) |
| gerrit_name: str | None = attrs.field(converter=_str_or_none, default=None) |
| submitted: bool = False |
| patchset: int | None = attrs.field(converter=_int_or_none, default=None) |
| applied: bool = attrs.field(default=False, repr=False) |
| path: str | None = None |
| base: str | None = attrs.field(converter=_str_or_none, default=None) |
| base_type: str | None = attrs.field(converter=_str_or_none, default=None) |
| is_merge: bool = attrs.field(default=False) |
| commit_message: str = attrs.field(default='') |
| topic: str | None = None |
| current_revision: str | None = None |
| |
| @property |
| def gerrit_host(self) -> str: |
| return f'https://{self.gerrit_name}-review.googlesource.com' |
| |
| @property |
| def gerrit_url(self) -> str: |
| if not self.number: |
| return self.gitiles_url |
| return f'{self.gerrit_host}/c/{self.number}' |
| |
| @property |
| def gitiles_url(self) -> str: |
| return f'{self.remote}/+/{self.ref}' |
| |
| @property |
| def name(self) -> str: |
| return f'{self.gerrit_name}:{self.number}' |
| |
| @property |
| def name_with_path(self) -> str: |
| return f'{self.name} ({self.path})' |
| |
| |
| @attrs.define |
| class Submodule: |
| """Submodule properties.""" |
| |
| api: recipe_api.RecipeApi = attrs.field(repr=False) |
| hash: str |
| relative_path: str |
| path: config_types.Path |
| name: str |
| describe: str |
| remote: str |
| initialized: bool |
| modified: bool |
| conflict: bool |
| branch: str |
| url: str |
| update: str |
| ignore: str |
| shallow: bool |
| fetchRecurseSubmodules: bool |
| describe: str |
| |
| def __lt__(self, other: 'Submodule') -> bool: |
| return (self.relative_path, self.url) < (other.relative_path, other.url) |
| |
| |
| @attrs.define |
| class StatusOfChanges: |
| """Changes that were applied or not applied.""" |
| |
| applied: tuple[Change, ...] |
| not_applied: tuple[Change, ...] |
| |
| |
| @attrs.define(slots=False) |
| class CheckoutContext: |
| _api: recipe_api.RecipeApi = attrs.field(repr=False) |
| options: Options | None = None |
| changes: list[Change] | None = None # List of triggering changes. |
| top: config_types.Path = None # Actual checkout root. |
| # Logical checkout root. Usually identical to 'top', but occasionally a |
| # subdirectory instead. |
| root: config_types.Path = None |
| # Which triggering changes were applied or not applied. |
| status: StatusOfChanges | None = None |
| # Remotes that should be treated identically. |
| equivalent_remotes: dict[str, list[str]] | None = attrs.field(factory=dict) |
| manifest: Manifest | None = None # Parsed repo manifest. |
| # Path to a JSON file containing metadata about the triggering changes. |
| changes_json: config_types.Path | None = None |
| bazel_overrides: dict[str, config_types.Path] = attrs.field(factory=dict) |
| |
| # Current revision number. |
| def revision(self) -> str: |
| if hasattr(self, '_revision'): |
| return self._revision |
| |
| self._revision = self._api.checkout.get_revision(self.root) |
| return self._revision |
| |
| @property |
| def manifest_path(self) -> config_types.Path: |
| return self.root / self.options.manifest_file |
| |
| def applied_changes(self) -> list[Change]: |
| return [x for x in self.changes if x.applied] |
| |
| # Repo manifest with all projects pinned. |
| def manifest_snapshot(self): |
| if not self.options.use_repo: |
| return None |
| |
| if hasattr(self, '_manifest_snapshot'): |
| return self._manifest_snapshot |
| |
| with self._api.context(cwd=self.top): |
| self._manifest_snapshot = self._api.repo.manifest_snapshot() |
| return self._manifest_snapshot |
| |
| # Equivalent of manifest_snapshot() but not as strictly formatted. |
| def submodule_snapshot(self): |
| if self.options.use_repo: |
| return None |
| |
| if hasattr(self, '_submodule_snapshot'): |
| return self._submodule_snapshot |
| |
| with self._api.context(cwd=self.root): |
| # To get step_test_data line to pass pylint. |
| raw_io_stream_output = self._api.raw_io.test_api.stream_output_text |
| |
| self._submodule_snapshot = ( |
| self._api.git( |
| 'submodule-status', |
| 'submodule', |
| 'status', |
| '--recursive', |
| stdout=self._api.raw_io.output_text(), |
| step_test_data=lambda: raw_io_stream_output( |
| 'submodule status filler text', |
| ), |
| ok_ret='any', |
| ).stdout.strip() |
| or '' |
| ) |
| return self._submodule_snapshot |
| |
| def snapshot_to_dir(self, directory: config_types.Path) -> None: |
| self._api.file.ensure_directory('mkdir', directory) |
| if self.manifest_snapshot(): |
| self._api.file.write_text( |
| 'write manifest.xml', |
| directory / 'manifest.xml', |
| self.manifest_snapshot(), |
| ) |
| |
| if self.submodule_snapshot(): |
| self._api.file.write_text( |
| 'write submodule snapshot', |
| directory / 'submodules.log', |
| self.submodule_snapshot(), |
| ) |
| |
| with self._api.context(cwd=self.root): |
| log = self._api.git( |
| 'log', |
| 'log', |
| '--oneline', |
| '-n', |
| '10', |
| stdout=self._api.raw_io.output_text(), |
| ok_ret='any', |
| ).stdout |
| self._api.file.write_text( |
| 'write git log', |
| directory / 'git.log', |
| log, |
| ) |
| |
| def submodules(self, recursive: bool = False) -> list[Submodule]: |
| """Return data about all submodules.""" |
| |
| cmd = [ |
| 'python3', |
| self._api.checkout.resource('submodule_status.py'), |
| self.root, |
| self._api.json.output(), |
| ] |
| |
| if recursive: |
| cmd.append('--recursive') |
| |
| submodules = [] |
| submodule_status = self._api.step( |
| 'submodule status', |
| cmd, |
| step_test_data=lambda: self._api.json.test_api.output({}), |
| ).json.output |
| for sub in submodule_status.values(): |
| sub['remote'] = self._api.sso.sso_to_https(sub['remote']) |
| if sub['remote'].endswith('.git'): |
| sub['remote'] = sub['remote'][:-4] |
| sub['relative_path'] = sub['path'] |
| sub['path'] = self.root / sub['path'] |
| submodules.append(Submodule(self._api, **sub)) |
| |
| return submodules |
| |
| _REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$') |
| |
| def gerrit_host(self) -> str | None: |
| match = self._REMOTE_REGEX.match(self.options.remote) |
| if not match: |
| return # pragma: no cover |
| |
| gerrit_review_host = f"{match.group('host')}" |
| if '-review' not in gerrit_review_host: |
| gerrit_review_host = gerrit_review_host.replace('.', '-review.', 1) |
| return gerrit_review_host |
| |
| def gerrit_project(self) -> str | None: |
| match = self._REMOTE_REGEX.match(self.options.remote) |
| if not match: |
| return # pragma: no cover |
| |
| return match.group('project') |
| |
| def remotes_equivalent(self, remote1: str, remote2: str) -> bool: |
| # Sometimes remote1 or remote2 is None. In that case we shouldn't |
| # convert sso to https. |
| if remote1: |
| remote1 = self._api.sso.sso_to_https(remote1).removesuffix('.git') |
| if remote2: |
| remote2 = self._api.sso.sso_to_https(remote2).removesuffix('.git') |
| if remote1 == remote2: |
| return True |
| return remote1 in self.equivalent_remotes.get(remote2, ()) |
| |
| |
| class CheckoutApi(recipe_api.RecipeApi): |
| """Calls to checkout code.""" |
| |
| Change = Change |
| CheckoutContext = CheckoutContext |
| |
| def fake_context(self): # pragma: no cover |
| ctx = CheckoutContext(api=self.m) |
| ctx.top = ctx.root = self.m.path.start_dir / 'checkout' |
| ctx.options = Options(remote=self.test_api.pigweed_repo) |
| return ctx |
| |
| def _read_manifest( |
| self, manifest_remote: str, manifest_file: str |
| ) -> Manifest: |
| """Reads manifest file to get git repo locations.""" |
| |
| with self.m.step.nest('read manifest') as read_step: |
| manifest_text: str = self.m.file.read_text( |
| 'read file', manifest_file |
| ) |
| read_step.logs['raw'] = manifest_text |
| |
| xml_tree = xml.etree.ElementTree.fromstring(manifest_text) |
| |
| manifest = Manifest() |
| |
| for remote in xml_tree.iter('remote'): |
| with self.m.step.nest('log') as pres: |
| pres.step_summary_text = repr(remote.attrib) |
| remote = Remote(**remote.attrib) |
| if remote.fetch.startswith('..'): |
| rest = re.sub(r'^(..)(/..)*', '', remote.fetch) |
| parsed = urllib.parse.urlparse(manifest_remote) |
| remote.fetch = f'{parsed.scheme}://{parsed.netloc}' + rest |
| remote.fetch = Url(remote.fetch) |
| remote.fetch.https = self.m.sso.sso_to_https(remote.fetch.url) |
| manifest.remotes[remote.name] = remote |
| |
| defaults = {} |
| for default in xml_tree.iter('default'): |
| defaults.update(default.attrib) |
| |
| for project in xml_tree.iter('project'): |
| name = project.attrib['name'] |
| path = project.attrib.get('path', name) |
| |
| if 'remote' in project.attrib: |
| remote = project.attrib['remote'] |
| elif 'remote' in defaults: |
| remote = defaults['remote'] |
| else: # pragma: no cover |
| assert False, f'remote not specified for {name}' |
| |
| assert ( |
| remote in manifest.remotes |
| ), f'Remote {remote} does not exist' |
| |
| if 'revision' in project.attrib: |
| revision = project.attrib['revision'] |
| elif manifest.remotes[remote].revision: |
| revision = manifest.remotes[remote].revision |
| elif 'revision' in defaults: |
| revision = defaults['revision'] |
| else: # pragma: no cover |
| assert False, f'revision not specified for {name}' |
| |
| if 'upstream' in project.attrib: |
| upstream = project.attrib['upstream'] |
| elif 'upstream' in defaults: # pragma: no cover |
| # This is unlikely to be used and hard to test--it requires |
| # a completely separate manifest definition, otherwise the |
| # 'else' condition won't be covered. It's also simple. |
| upstream = defaults['upstream'] |
| else: |
| upstream = revision |
| |
| # urllib.urljoin does something different than what's desired |
| # here. |
| url = '/'.join( |
| ( |
| manifest.remotes[remote].fetch.https.rstrip('/'), |
| name.lstrip('/'), |
| ) |
| ) |
| manifest.projects.append( |
| Project( |
| name=name, |
| path=path, |
| remote=remote, |
| revision=revision, |
| upstream=upstream, |
| url=url, |
| ) |
| ) |
| |
| self.m.file.write_json( |
| 'manifest json', |
| self.m.path.start_dir / 'manifest.json', |
| manifest.dict(), |
| ) |
| |
| return manifest |
| |
| def _process_gerrit_change( |
| self, |
| ctx: CheckoutContext, |
| host: str, |
| change_id: str | int, |
| project: str = 'pigweed/pigweed', |
| patchset: int | None = None, |
| ) -> Change: |
| """Process a LUCI GerritChange and return a Change object.""" |
| |
| host = self.m.gerrit.normalize_host(host) |
| gitiles_host = host.replace( |
| '-review.googlesource.com', '.googlesource.com' |
| ) |
| gerrit_name = gitiles_host.split('.')[0] |
| details = self.m.gerrit.change_details( |
| 'details', |
| change_id=str(change_id), |
| host=host, |
| max_attempts=5, |
| query_params=[ |
| 'CURRENT_COMMIT', |
| 'CURRENT_REVISION', |
| ], |
| timeout=30, |
| test_data=self.m.json.test_api.output( |
| { |
| 'branch': 'main', |
| 'current_revision': 'f' * 40, |
| 'revisions': { |
| 'f' |
| * 40: { |
| '_number': 3, |
| 'commit': { |
| 'parents': [{}], |
| 'message': '', |
| }, |
| } |
| }, |
| 'project': project, |
| } |
| ), |
| ).json.output |
| branch = details['branch'] |
| |
| remote = f'https://{gitiles_host}/{details["project"]}'.strip('/') |
| rebase = not ctx.options.force_no_rebase |
| |
| current_revision = details['revisions'][details['current_revision']] |
| is_merge = len(current_revision['commit']['parents']) > 1 |
| if is_merge: |
| rebase = False |
| |
| if not patchset: |
| patchset = current_revision['_number'] |
| |
| ref = f'refs/changes/{change_id % 100:02}/{change_id}/{patchset}' |
| |
| return Change( |
| number=int(change_id), |
| patchset=patchset, |
| remote=remote, |
| ref=ref, |
| rebase=rebase, |
| is_merge=is_merge, |
| branch=branch, |
| gerrit_name=gerrit_name, |
| submitted=False, |
| commit_message=current_revision['commit']['message'], |
| project=details['project'], |
| topic=details.get('topic', None) or None, |
| current_revision=details['current_revision'], |
| ) |
| |
| def _process_gerrit_changes( |
| self, |
| ctx: CheckoutContext, |
| bb_input: build_pb2.Build.Input, |
| ) -> None: |
| seen = set() |
| for i, change in enumerate(bb_input.gerrit_changes): |
| with self.m.step.nest(str(i)): |
| result = self._process_gerrit_change( |
| ctx=ctx, |
| host=change.host, |
| project=change.project, |
| change_id=change.change, |
| patchset=change.patchset, |
| ) |
| yield result |
| seen.add(result.name) |
| |
| cq_deps_result = self.m.cq_deps.resolve( |
| result.gerrit_name, |
| result.number, |
| result.topic, |
| ) |
| for dep in cq_deps_result.resolved: |
| # dep.name should only appear in seen if there are multiple |
| # gerrit_changes from buildbucket and a later one depends on an |
| # earlier one. If buildbucket has multiple gerrit_changes the |
| # cq_deps module is not needed here, so this is just double-checking |
| # something that shouldn't happen. |
| if dep.name in seen: # pragma: no cover |
| continue |
| seen.add(dep.name) |
| yield self._process_gerrit_change( |
| ctx=ctx, |
| host=dep.host, |
| project=dep.project, |
| change_id=dep.change, |
| ) |
| |
| for cl in cq_deps_result.unresolved: |
| yield Change( |
| number=cl.change, |
| remote=None, |
| ref=None, |
| rebase=None, |
| project=None, |
| branch=None, |
| gerrit_name=cl.gerrit_name, |
| submitted=False, |
| ) |
| |
| def _number_details( |
| self, |
| host: str, |
| commit_hash: str, |
| branch: str = 'main', |
| ) -> dict[str, Any]: |
| if 'github.com' in host or 'github-review' in host: |
| return None # pragma: no cover |
| |
| try: |
| results = self.m.gerrit.change_query( |
| 'number', |
| f'commit:{commit_hash}', |
| host=host, |
| max_attempts=5, |
| timeout=30, |
| test_data=self.m.json.test_api.output( |
| [ |
| { |
| '_number': '1234', |
| 'branch': branch, |
| 'project': 'pigweed', |
| } |
| ] |
| ), |
| ).json.output |
| # Skip this change if it didn't go through Gerrit. |
| if results and len(results) == 1: |
| return results[0] |
| except self.m.step.StepFailure: # pragma: no cover |
| pass |
| |
| return None |
| |
| def _change_data( |
| self, |
| ctx: CheckoutContext, |
| remote: str = None, |
| branch: str = None, |
| ) -> tuple[Change, ...]: |
| bb_input: build_pb2.Build.Input = self.m.buildbucket.build.input |
| results: list[Change] = [] |
| |
| triggers: dict[str, dict[str, triggers_pb2.Trigger]] = ( |
| collections.defaultdict(dict) |
| ) |
| for trigger in self.m.scheduler.triggers: |
| gitiles: triggers_pb2.GitilesTrigger = trigger.gitiles |
| if gitiles: |
| triggers[gitiles.repo][gitiles.revision] = trigger |
| |
| with self.m.step.nest('change data'): |
| if bb_input.gerrit_changes: |
| with self.m.step.nest('process gerrit changes'): |
| results.extend(self._process_gerrit_changes(ctx, bb_input)) |
| |
| elif bb_input.gitiles_commit.id: |
| with self.m.step.nest('process gitiles commit'): |
| commit: common_pb2.GitilesCommit = bb_input.gitiles_commit |
| assert commit.host |
| if commit.project: |
| remote: str = f'https://{commit.host}/{commit.project}' |
| |
| host: str = commit.host.replace( |
| '.googlesource.com', '-review.googlesource.com' |
| ) |
| gerrit_name: str = commit.host.split('.')[0] |
| |
| result: dict[str, Any] = self._number_details( |
| host, commit.id |
| ) |
| |
| if result: |
| branch: str = result['branch'] |
| if commit.id in triggers[remote]: |
| branch = triggers[remote][commit.id].gitiles.ref |
| branch = branch.removeprefix('refs/heads/') |
| |
| results.append( |
| Change( |
| number=result['_number'], |
| remote=remote, |
| ref=commit.id, |
| rebase=False, |
| branch=branch, |
| gerrit_name=gerrit_name, |
| submitted=True, |
| project=result['project'], |
| current_revision=commit.id, |
| ) |
| ) |
| |
| if not results: |
| # If not triggered by a gitiles_poller gitiles_commit may be |
| # empty. In that case treat the most recent commit on the |
| # remote as the triggering commit. This is a good assumption |
| # except for Android Repo Tool projects, unless all projects |
| # are pinned to commits instead of tracking branches. However, |
| # even if this is wrong it's close enough to have utility. |
| head: str = self.m.git.get_remote_branch_head(remote, branch) |
| gerrit_name: str = urllib.parse.urlparse(remote).netloc.split( |
| '.' |
| )[0] |
| host: str = f'{gerrit_name}-review.googlesource.com' |
| result: dict[str, Any] = self._number_details(host, head) |
| |
| results.append( |
| Change( |
| number=result['_number'] if result else 0, |
| remote=remote, |
| ref=head, |
| rebase=False, |
| branch=result['branch'] if result else branch, |
| gerrit_name=gerrit_name, |
| project=None, |
| submitted=True, |
| ) |
| ) |
| |
| with self.m.step.nest('changes'): |
| for result in results: |
| with self.m.step.nest(result.name) as change_data_pres: |
| change_data_pres.step_summary_text = repr(result) |
| |
| return tuple(results) |
| |
| def _matching_branches( |
| self, |
| repo: str, |
| branches: Sequence[str], |
| name: str = 'has branch', |
| **kwargs, |
| ): |
| """Returns the subset of the given branches that exist on gitiles.""" |
| matches: set[str] = set() |
| with self.m.step.nest(name), self.m.context(infra_steps=True): |
| for branch in branches: |
| head: str = self.m.git.get_remote_branch_head( |
| repo, |
| branch, |
| step_name=f'git ls-remote {branch}', |
| step_test_data=lambda: self.m.raw_io.test_api.stream_output_text( |
| '' |
| ), |
| **kwargs, |
| ) |
| with self.m.step.nest('head') as pres: |
| pres.step_summary_text = repr(head) |
| |
| if head: |
| matches.add(branch) |
| return sorted(matches) |
| |
| def _apply_change( |
| self, |
| ctx: CheckoutContext, |
| change: Change, |
| cwd: config_types.Path = None, |
| ): |
| """Applies the given change to the given directory. |
| |
| Args: |
| ctx: Checkout context object. |
| change: Change to apply. |
| cwd: Working directory, defaults to current directory. |
| """ |
| with self._apply_change_context(ctx=ctx, change=change, cwd=cwd): |
| pass |
| |
| @contextlib.contextmanager |
| def _apply_change_context( |
| self, |
| ctx: CheckoutContext, |
| change: Change, |
| cwd: config_types.Path = None, |
| ): |
| """Applies the given change to the given directory. |
| |
| Args: |
| ctx: Checkout context object. |
| change: Change to apply. |
| cwd: Working directory, defaults to current directory. |
| """ |
| kwargs: dict[str, Any] = {'cwd': cwd} if cwd else {} |
| change.applied = True |
| change.path = self.m.path.relpath(cwd or ctx.root, ctx.root) |
| |
| try: |
| |
| apply_step: str = f'apply {change.name}' |
| with self.m.context(**kwargs), self.m.step.nest(apply_step) as pres: |
| pres.links['gerrit'] = change.gerrit_url |
| pres.links['gitiles'] = change.gitiles_url |
| if cwd: |
| pres.step_summary_text = str( |
| self.m.path.relpath(cwd, ctx.root) |
| ) |
| |
| with self.m.context(infra_steps=True): |
| # 'git fetch' fails if a submodule pin in the patch isn't |
| # present in the remote (for example, if the pin is only |
| # present in the uploader's workspace). Use |
| # '--no-recurse-submodules' here so 'git fetch' doesn't fail |
| # but instead 'git rebase' or 'git submodule update' fails |
| # later (important because those are not infra steps). Also |
| # don't use '--recurse-submodules' in 'git checkout' for |
| # similar reasons. |
| with self.m.default_timeout(): |
| self.m.git.fetch( |
| change.remote, |
| change.ref, |
| recurse_submodules=False, |
| step_name='git fetch patch', |
| ) |
| self.m.git( |
| 'git checkout patch', |
| 'checkout', |
| '--force', |
| '-b', |
| 'working', |
| 'FETCH_HEAD', |
| ) |
| |
| # These remain unused if change.submitted is False. |
| remote: str | None = None |
| remote_branch: str | None = None |
| |
| with self.m.context(infra_steps=True): |
| # Change "https://foo.googlesource.com/bar" |
| # to "https___foo_googlesource_com_bar". |
| # In Android Repo Tool projects, the remote for the manifest |
| # is often configured in a way that seems incorrect. Instead |
| # of relying on it, create a whole new remote every time |
| # that is always correct. |
| remote = re.sub(r'[^\w]', '_', change.remote) |
| remote_branch = '/'.join((remote, change.branch)) |
| self.m.git( |
| 'git remote add', |
| 'remote', |
| 'add', |
| remote, |
| change.remote, |
| ) |
| |
| with self.m.default_timeout(): |
| self.m.git.fetch( |
| remote, |
| f'refs/heads/{change.branch}', |
| prune=False, |
| step_name='git fetch branch', |
| ) |
| |
| self.m.git( |
| 'git set upstream', |
| 'branch', |
| f'--set-upstream-to={remote_branch}', |
| ) |
| |
| if not change.submitted: |
| with self.m.context(infra_steps=True): |
| self.m.git( |
| 'pre-rebase log', 'log', '--oneline', '-n', '10' |
| ) |
| |
| if change.submitted: |
| change.base = self.m.git.rev_parse( |
| 'HEAD', |
| step_test_data=lambda: self.m.raw_io.test_api.stream_output_text( |
| 'HEAD_' * 8, |
| ), |
| ) |
| change.base_type = 'submitted_commit_hash' |
| |
| elif change.rebase: |
| self.m.git('git rebase', 'rebase', remote_branch) |
| |
| change.base = self.m.git.rev_parse( |
| remote_branch, |
| step_test_data=lambda: self.m.raw_io.test_api.stream_output_text( |
| 'REMOTE_BRANCH_' * 3, |
| ), |
| ) |
| change.base_type = 'remote_branch_tip' |
| |
| else: |
| change.base = self.m.git( |
| 'merge-base', |
| 'merge-base', |
| 'HEAD', |
| remote_branch, |
| stdout=self.m.raw_io.output_text(), |
| step_test_data=lambda: self.m.raw_io.test_api.stream_output_text( |
| 'MERGEBASE_' * 4, |
| ), |
| ).stdout |
| change.base_type = 'merge-base' |
| |
| with self.m.context(infra_steps=True): |
| self.m.git( |
| 'post-rebase log', 'log', '--oneline', '-n', '10' |
| ) |
| |
| # In most cases this is redundant or unnecessary, but it |
| # shouldn't cause problems. It's necessary when a superproject |
| # CL is updating a submodule pin and we need to sync the |
| # submodule to the new revision. |
| with self.m.default_timeout(): |
| # See b/243673776 for why we detach before updating |
| # submodules. |
| self.m.git('detach', 'checkout', '--detach') |
| self.m.git.submodule_update( |
| recursive=True, |
| timeout=ctx.options.submodule_timeout_sec, |
| ) |
| self.m.git('reattach', 'checkout', '-') |
| |
| yield |
| |
| finally: |
| pass |
| |
| def _check_unapplied_changes(self, changes: Sequence[Change]): |
| applied: list[Change] = [] |
| failed_to_apply: list[Change] = [] |
| if not changes: # pragma: no cover |
| return None |
| |
| def handle_unapplied_change(change): |
| with self.m.step.nest(f'failed to apply {change.name}') as pres: |
| pres.status = 'WARNING' |
| pres.links['gerrit'] = change.gerrit_url |
| pres.links['gitiles'] = change.gitiles_url |
| failed_to_apply.append(change) |
| |
| with self.m.context(infra_steps=True): |
| if all(not x.applied for x in changes): |
| with self.m.step.nest('no changes were applied') as pres: |
| pres.status = 'FAILURE' |
| for change in changes: |
| handle_unapplied_change(change) |
| pres.properties['changes'] = [x.name for x in changes] |
| |
| raise self.m.step.InfraFailure( |
| 'could not find triggering changes in checkout' |
| ) |
| |
| elif any(not x.applied for x in changes): |
| with self.m.step.nest('some changes were not applied') as pres: |
| pres.status = 'WARNING' |
| for change in changes: |
| if change.applied: |
| applied.append(change) |
| else: |
| handle_unapplied_change(change) |
| |
| else: |
| applied.extend(changes) |
| |
| with self.m.step.nest('status') as pres: |
| pres.step_summary_text = ( |
| f'applied {applied}\nnot applied {failed_to_apply}' |
| ) |
| |
| return StatusOfChanges( |
| applied=tuple(applied), |
| not_applied=tuple(failed_to_apply), |
| ) |
| |
| def _cached_checkout( |
| self, |
| remote: str, |
| path: config_types.Path, |
| ref: str, |
| submodules: bool, |
| included_submodules: Sequence[str] | None = None, |
| excluded_submodules: Sequence[str] | None = None, |
| submodule_timeout_sec: int = 10 * 60, |
| cache: bool = True, |
| use_packfiles: bool = False, |
| **kwargs, |
| ): |
| submodule_paths = included_submodules = included_submodules or [] |
| |
| if cache: |
| with self.m.step.nest('cache') as pres, self.m.cache.guard('git'): |
| parsed_remote = urllib.parse.urlparse(remote) |
| cache_name = ( |
| parsed_remote.hostname |
| + parsed_remote.path.replace('-', '--').replace('/', '-') |
| ) |
| cache_path = self.m.path.cache_dir / 'git' / cache_name |
| self.m.file.ensure_directory('makedirs', cache_path) |
| |
| with self.m.context(cwd=cache_path): |
| dotgit = cache_path / '.git' |
| if self.m.path.exists(dotgit): # pragma: no cover |
| self.m.git.config_remove_section( |
| 'remote.origin', **kwargs |
| ) |
| pres.step_summary_text = 'hit' |
| else: |
| self.m.git.init(bare=False, **kwargs) |
| pres.step_summary_text = 'miss' |
| |
| self.m.git.config( |
| 'remote.origin.url', |
| remote, |
| step_name='remote set-url', |
| **kwargs, |
| ) |
| |
| if use_packfiles: |
| self.m.git.config( |
| 'fetch.uriprotocols', |
| 'https', |
| step_name='enable packfiles', |
| **kwargs, |
| ) |
| else: |
| self.m.git.config( |
| 'fetch.uriprotocols', |
| '', |
| step_name='disable packfiles', |
| **kwargs, |
| ) |
| |
| with self.m.default_timeout(): |
| try: |
| self.m.git.fetch( |
| repository='origin', |
| prune=True, |
| tags=True, |
| recurse_submodules=submodules, |
| **kwargs, |
| ) |
| |
| # If the checkout failed save the git config. It might |
| # not be helpful, but it shouldn't hurt. |
| except self.m.step.StepFailure as exc: |
| with self.m.step.nest('git config'): |
| self.m.git.config( |
| '--list', '--local', step_name='local' |
| ) |
| self.m.git.config( |
| '--list', '--global', step_name='global' |
| ) |
| raise |
| |
| self.m.git.merge(ref='FETCH_HEAD', **kwargs) |
| |
| if included_submodules and excluded_submodules: |
| raise self.m.step.InfraFailure( |
| 'cannot specify both included_submodules and ' |
| 'excluded_submodules' |
| ) |
| |
| submodule_paths = included_submodules |
| if excluded_submodules: |
| submodule_status = self.m.git( |
| 'submodule status', |
| 'submodule', |
| 'status', |
| stdout=self.m.raw_io.output_text(), |
| step_test_data=lambda: self.test_api.m.raw_io.stream_output_text( |
| '-0000000000000000000000000000000000000000 pigweed (abc123)\n' |
| '-1111111111111111111111111111111111111111 nanopb (heads/branch)\n' |
| ), |
| ).stdout.splitlines() |
| |
| submodule_paths = [ |
| x.split()[1] for x in submodule_status |
| ] |
| for sub in excluded_submodules: |
| if sub not in submodule_paths: |
| raise self.m.step.InfraFailure( |
| f'excluded submodule {sub} is not a submodule' |
| ) |
| with self.m.step.nest(f'excluding submodule {sub}'): |
| pass |
| submodule_paths.remove(sub) |
| |
| for sub in submodule_paths: |
| with self.m.step.nest(f'including submodule {sub}'): |
| pass |
| |
| if submodules or submodule_paths: |
| self.m.git.submodule_sync(recursive=True, **kwargs) |
| with self.m.default_timeout(): |
| self.m.git.submodule_update( |
| recursive=True, |
| force=True, |
| paths=submodule_paths, |
| timeout=submodule_timeout_sec, |
| **kwargs, |
| ) |
| |
| if not submodules: |
| # Even though submodules weren't requested, if the cache |
| # had any active submodules we need to update them. |
| # Otherwise we'll get weird situations in rolls where an |
| # uninvolved submodule will be rolled back. |
| with self.m.default_timeout(): |
| self.m.git.submodule_update( |
| recursive=True, |
| force=True, |
| init=False, |
| timeout=submodule_timeout_sec, |
| **kwargs, |
| ) |
| |
| self.m.file.copytree( |
| 'copy from cache', cache_path, path, symlinks=True |
| ) |
| |
| # Deliberately not combining contexts into one line so it's obvious to |
| # both devs and Python which one is "outer" and which is "inner". |
| with self.m.step.nest('git checkout'): |
| with self.m.default_timeout(): |
| self.m.git_checkout( |
| repo=remote, |
| path=path, |
| cache=False, |
| revision=ref, |
| recursive=submodules, |
| submodules=submodules, |
| submodule_force=submodules, |
| submodule_paths=submodule_paths, |
| submodule_timeout=submodule_timeout_sec, |
| step_name="", |
| use_packfiles=use_packfiles, |
| ) |
| |
| def _git(self, ctx: CheckoutContext): |
| """Checkout code from git.""" |
| |
| super_branch = self._matching_branch(ctx) or ctx.options.branch |
| |
| with self.m.context(infra_steps=True): |
| self._cached_checkout( |
| ctx.options.remote, |
| path=ctx.root, |
| ref=super_branch, |
| cache=not ctx.options.do_not_cache, |
| submodules=ctx.options.initialize_submodules, |
| submodule_timeout_sec=ctx.options.submodule_timeout_sec, |
| included_submodules=ctx.options.included_submodules, |
| excluded_submodules=ctx.options.excluded_submodules, |
| use_packfiles=not ctx.options.do_not_use_packfiles, |
| ) |
| |
| with self.m.context(cwd=ctx.root): |
| got_revision = None |
| got_revision_type = 'no_trigger' |
| |
| submodules = [] |
| |
| if ctx.options.use_trigger: |
| got_revision = self.m.git.rev_parse( |
| 'HEAD', |
| step_test_data=lambda: self.m.raw_io.test_api.stream_output_text( |
| 'HEAD' * 10, |
| ), |
| ) |
| |
| # Check for CLs for the top-level repository. |
| for change in ctx.changes: |
| if ctx.remotes_equivalent( |
| ctx.options.remote, change.remote |
| ): |
| self._apply_change(ctx, change) |
| got_revision = change.base |
| got_revision_type = change.base_type |
| |
| submodules = ctx.submodules(recursive=True) |
| |
| # Check for CLs for submodules. |
| # There are three rough cases: |
| # 1. Zero submodules have matching remotes. In this case we do |
| # nothing. |
| # 2. Exactly one submodule has a matching remote. In this case, |
| # the change is applied to that submodule, even if the branch |
| # does not match. |
| # 3. Multiple submodules have matching remotes. |
| # 1. Exactly one of them matches the triggering change's |
| # branch. In this case, the change is applied to this |
| # submodule. |
| # 2. Zero or multiple submodules match both the remote and |
| # the branch of the triggering change. In these cases we |
| # error out. |
| for change in ctx.changes: |
| with self.m.step.nest(f'matching {change.name}') as pres: |
| pres.links['gerrit'] = change.gerrit_url |
| pres.links['gitiles'] = change.gitiles_url |
| |
| matching_submodules = [] |
| for submodule in submodules: |
| if submodule.initialized: |
| if ctx.remotes_equivalent( |
| submodule.remote, change.remote |
| ): |
| self.m.step.empty( |
| f'match: {submodule.path} ' |
| f'({submodule.remote})' |
| ) |
| matching_submodules.append(submodule) |
| else: |
| self.m.step.empty( |
| f'no match: {submodule.path} ' |
| f'({submodule.remote})' |
| ) |
| |
| if not matching_submodules: |
| pres.step_summary_text = 'no matching submodules' |
| continue |
| |
| if len(matching_submodules) > 1: |
| submodule_info = ', '.join( |
| f'{self.m.path.relpath(sub.path, ctx.root)} ' |
| f'(branch {sub.branch})' |
| for sub in matching_submodules |
| ) |
| |
| matching_with_branch = [] |
| for submodule in matching_submodules: |
| if submodule.branch == change.branch: |
| matching_with_branch.append(submodule) |
| |
| if len(matching_with_branch) == 1: |
| pres.step_summary_text = ( |
| 'one matching submodule' |
| ) |
| matching_submodules = matching_with_branch |
| |
| elif len(matching_with_branch) > 1: |
| pres.step_summary_text = ( |
| 'too many submodules match the branch' |
| ) |
| raise self.m.step.StepFailure( |
| f'change {change.name} (branch ' |
| f'{change.branch}) matches multiple ' |
| f'submodules ({submodule_info}), but too ' |
| 'many branches match' |
| ) |
| |
| else: |
| pres.step_summary_text = ( |
| 'zero submodules match the branch' |
| ) |
| raise self.m.step.StepFailure( |
| f'change {change.name} ' |
| f'(branch {change.branch}) matches ' |
| f'multiple submodules ({submodule_info}) ' |
| 'but no branches match' |
| ) |
| |
| if len(matching_submodules) == 1: |
| submodule = matching_submodules[0] |
| if not ctx.options.initialize_submodules: |
| with self.m.default_timeout(): |
| self.m.git.submodule_update( |
| paths=(submodule.path,) |
| ) |
| self._apply_change(ctx, change, cwd=submodule.path) |
| |
| def _vars_primitive_only(x): |
| return { |
| k: v |
| for k, v in to_dict(x).items() |
| if isinstance(v, (int, str, bool, type(None))) |
| } |
| |
| applied_changes = [ |
| _vars_primitive_only(x) for x in ctx.changes if x.applied |
| ] |
| ctx.changes_json = self.m.path.mkstemp() |
| self.m.file.write_json( |
| 'write changes.json', |
| ctx.changes_json, |
| applied_changes, |
| ) |
| |
| # Run git log for both the top-level checkout and every submodule. |
| with self.m.step.nest('git log'): |
| self.m.git(str(ctx.root), 'log', '--oneline', '-n', '10') |
| for submodule in sorted(submodules): |
| with self.m.context(cwd=submodule.path): |
| self.m.git( |
| str(submodule.path), |
| 'log', |
| '--oneline', |
| '-n', |
| '10', |
| ) |
| |
| if got_revision: |
| with self.m.step.nest('base') as pres: |
| pres.properties['got_revision'] = got_revision |
| # got_revision_type isn't needed by anything but helps |
| # explain why got_revision is the value it is. |
| pres.properties['got_revision_type'] = got_revision_type |
| |
| def _matching_branch(self, ctx: CheckoutContext): |
| """Return if there are manifest branches that match the triggering CLs. |
| |
| If the triggering change is on a branch name that is also present in the |
| manifest or superproject remote, use that branch when checking out the |
| project. |
| |
| Args: |
| ctx (CheckoutContext): Context object. |
| |
| Raises: |
| StepFailure if there are multiple matching branches. |
| |
| Returns: |
| One matching branch name, or None. |
| """ |
| if not ctx.options.match_branch or not ctx.options.use_trigger: |
| with self.m.step.nest('not matching branch names'): |
| return |
| |
| kind = 'manifest' if ctx.options.use_repo else 'superproject' |
| |
| manifest_branch = None |
| branch_names = sorted( |
| set( |
| x.branch |
| for x in ctx.changes |
| if x.branch not in ('master', 'main', None) |
| ) |
| ) |
| |
| if not branch_names: |
| with self.m.step.nest('no non-standard branch names'): |
| return |
| |
| with self.m.step.nest('branch names') as pres: |
| pres.step_summary_text = str(branch_names) |
| |
| matching_branches = self._matching_branches( |
| ctx.options.remote, branch_names, name=f'{kind} has branch' |
| ) |
| if not matching_branches: |
| with self.m.step.nest('no branch names match'): |
| return |
| |
| if len(matching_branches) > 1: |
| with self.m.step.nest( |
| f"too many matching branches ({', '.join(matching_branches)})" |
| ) as pres: |
| pres.step_summary_text = ( |
| "Can't figure out which {} branch to use. Remove some " |
| '"Requires:" lines to simplify the checkout.'.format(kind) |
| ) |
| raise self.m.step.StepFailure('multiple matching branches') |
| |
| manifest_branch = matching_branches.pop() |
| self.m.step( |
| f'changing {kind} branch to {manifest_branch}', |
| None, |
| ) |
| return manifest_branch |
| |
| def _repo(self, ctx: CheckoutContext): |
| """Checkout code from an Android Repo Tool manifest.""" |
| |
| # Git makes the top-level folder, Repo requires caller to make it. |
| self.m.file.ensure_directory('mkdir checkout', ctx.root) |
| |
| with self.m.context(cwd=ctx.root): |
| manifest_branch = self._matching_branch(ctx) or ctx.options.branch |
| |
| with self.m.context(infra_steps=True): |
| kwargs = {} |
| if ctx.options.repo_init_timeout_sec: |
| kwargs['timeout'] = ctx.options.repo_init_timeout_sec |
| kwargs['attempts'] = ctx.options.number_of_attempts |
| if ctx.options.manifest_groups: |
| kwargs['groups'] = ctx.options.manifest_groups |
| |
| self.m.repo.init( |
| manifest_url=ctx.options.remote, |
| manifest_branch=manifest_branch, |
| manifest_name=ctx.options.manifest_file, |
| clone_bundle=not ctx.options.repo_no_clone_bundle, |
| **kwargs, |
| ) |
| |
| manifests_dir = ctx.root / '.repo' / 'manifests' |
| # If the triggering CL is a manifest change, apply it before running |
| # sync. |
| if ctx.options.use_trigger: |
| for change in ctx.changes: |
| if change.remote and ctx.remotes_equivalent( |
| ctx.options.remote, change.remote |
| ): |
| with self._apply_change_context( |
| ctx, change, cwd=manifests_dir |
| ): |
| # Right now the upstream of 'working' is the local |
| # 'default' branch. 'repo sync' complains if the |
| # upstream isn't remote, so it's changed to the |
| # remote branch that's identical to 'default'. |
| self.m.git( |
| 'git branch', |
| 'branch', |
| f'--set-upstream-to=origin/{manifest_branch}', |
| ) |
| |
| ctx.manifest = self._read_manifest( |
| ctx.options.remote, |
| manifests_dir / ctx.options.manifest_file, |
| ) |
| |
| for _, remote_host in sorted(ctx.manifest.remotes.items()): |
| if remote_host.fetch.url.startswith('sso://'): |
| self.m.sso.configure_insteadof(remote_host.fetch.url) |
| |
| with self.m.context(infra_steps=True): |
| kwargs = {} |
| if ctx.options.repo_sync_timeout_sec: |
| kwargs['timeout'] = ctx.options.repo_sync_timeout_sec |
| kwargs['attempts'] = ctx.options.number_of_attempts |
| self.m.repo.sync( |
| force_sync=True, current_branch=True, jobs=2, **kwargs |
| ) |
| self.m.repo.start('base') |
| |
| if ctx.options.use_trigger: |
| for change in ctx.changes: |
| for entry in ctx.manifest.projects: |
| if ctx.remotes_equivalent(entry.url, change.remote): |
| with self._apply_change_context( |
| ctx, |
| change, |
| cwd=entry.path_object(ctx.root), |
| ): |
| with self.m.step.nest( |
| 'compare branch name' |
| ) as pres: |
| pres.step_summary_text = ( |
| 'CL branch: {}\nupstream branch: {}' |
| ).format(change.branch, entry.upstream) |
| |
| # Some dependent projects have everything inside one top-level folder |
| # in their repo workspace. For those projects pretend that top-level |
| # folder is actually the checkout root. The top member will always |
| # point to the actual repo workspace root. |
| ctx.top = ctx.root |
| files = set(self.m.file.listdir('ls', ctx.root)) |
| dotrepo = ctx.root / '.repo' |
| if dotrepo in files: |
| files.remove(dotrepo) |
| orig_root = ctx.root |
| if len(files) == 1: |
| ctx.root = files.pop() |
| |
| def _workspace(self, ctx: CheckoutContext): |
| if not ctx.options.eligible_workspace_paths or all( |
| x.applied for x in ctx.changes |
| ): |
| return |
| |
| repos_dir = self.m.path.start_dir / 'bazel_repos' |
| self.m.file.ensure_directory(f'mkdir {repos_dir}', repos_dir) |
| |
| with self.m.step.nest('workspace'): |
| workspace_changed = False |
| |
| for change in ctx.changes: |
| if change.applied: |
| continue # pragma: no cover |
| |
| with self.m.step.nest(change.name) as pres: |
| pres.links['gerrit'] = change.gerrit_url |
| pres.links['gitiles'] = change.gitiles_url |
| |
| for workspace in ctx.options.eligible_workspace_paths: |
| workspace_path = ctx.root / workspace |
| self.m.path.mock_add_file(workspace_path) |
| if not self.m.path.isfile(workspace_path): |
| continue # pragma: no cover |
| |
| with self.m.step.nest(workspace): |
| |
| repos = self.m.bazel_roll.retrieve_git_repository_attributes( |
| checkout=ctx, |
| project_remote=change.remote, |
| path=workspace_path, |
| ) |
| |
| if not repos: |
| continue |
| |
| # Things will be much simpler if we assume all |
| # entries with a url matching the change have |
| # identical remote URLs. |
| remotes = list(set(x['remote'] for x in repos)) |
| assert len(remotes) == 1 |
| remote = remotes[0] |
| |
| name = remote |
| name = name.replace('http://', '') |
| name = name.replace('https://', '') |
| name = name.replace('sso://', '') |
| name = name.replace('.git.corp.google.com', '') |
| name = name.replace('.googlesource.com', '') |
| name = name.replace('/', '_') |
| path = repos_dir / name |
| |
| self.m.git_checkout( |
| remote, |
| path=path, |
| step_name=f'checkout {name}', |
| ignore_build_input=True, |
| cache=False, |
| ) |
| |
| self._apply_change(ctx, change, path) |
| |
| for repo in repos: |
| if 'strip_prefix' in repo: |
| path = path / repo['strip_prefix'] |
| ctx.bazel_overrides[repo['name']] = path |
| |
| def _configure_insteadof(self, ctx: CheckoutContext): |
| """Configure git to use some urls in place of others.""" |
| if not ctx.options.rewrites: |
| return |
| |
| with self.m.step.nest('insteadof'): |
| for rewrite in ctx.options.rewrites: |
| self.m.git( |
| f"{rewrite.original} to {rewrite.final}", |
| "config", |
| "--global", |
| "--add", |
| f"url.{rewrite.final}.insteadof", |
| rewrite.original, |
| ) |
| |
| self.m.git("rewrites", "config", "--get-regexp", "^url.*") |
| |
| def _name(self, options: Options): |
| """Turn "https://foo/bar/baz.git" into "baz".""" |
| name = options.remote.rstrip('/').removesuffix('.git') |
| parts = name.split('/') |
| if options.use_repo and parts[-1] == 'manifest': |
| parts.pop(-1) |
| return f'checkout {parts[-1]}' |
| |
| def __call__( |
| self, |
| options: Options, |
| root: config_types.Path | None = None, |
| name: str = None, |
| ): |
| """Checkout code.""" |
| |
| checkout_name = name or self._name(options) |
| |
| assert options.remote |
| |
| initial_options = repr(options) |
| options.manifest_file = options.manifest_file or 'default.xml' |
| options.repo_init_timeout_sec = options.repo_init_timeout_sec or 20 |
| options.repo_sync_timeout_sec = options.repo_sync_timeout_sec or 2 * 60 |
| options.number_of_attempts = options.number_of_attempts or 3 |
| options.submodule_timeout_sec = options.submodule_timeout_sec or 10 * 60 |
| final_options = repr(options) |
| |
| ctx = CheckoutContext(api=self.m) |
| ctx.options = options |
| ctx.changes = [] |
| ctx.root = root or self.m.path.start_dir / 'co' |
| |
| for remotes in options.equivalent_remotes: |
| new_remotes = [self.m.sso.sso_to_https(x) for x in remotes.remotes] |
| for remote in new_remotes: |
| assert remote not in ctx.equivalent_remotes |
| ctx.equivalent_remotes[remote] = new_remotes |
| |
| with self.m.step.nest(checkout_name) as pres: |
| with self.m.step.nest('options') as options_pres: |
| options_pres.step_summary_text = initial_options |
| |
| with self.m.step.nest('options with defaults') as options_pres: |
| options_pres.step_summary_text = final_options |
| |
| if options.remote.endswith('.git'): |
| options.remote = options.remote[:-4] |
| |
| if options.use_trigger: |
| ctx.changes = self._change_data( |
| ctx, options.remote, options.branch |
| ) |
| |
| self._configure_insteadof(ctx) |
| |
| if options.use_repo: |
| self._repo(ctx) |
| |
| else: |
| self._git(ctx) |
| |
| self._workspace(ctx) |
| |
| if not options.use_repo: |
| with self.m.context(cwd=ctx.root): |
| self.m.git.clean(force=2, recursive=True) |
| try: |
| self.m.git.status() |
| except self.m.step.StepFailure: # pragma: no cover |
| pass |
| |
| ctx.status = self._check_unapplied_changes(ctx.changes) |
| |
| if ctx.status: |
| for change in ctx.status.applied: |
| pres.links[f'applied {change.name_with_path}'] = ( |
| change.gerrit_url |
| ) |
| |
| for change in ctx.status.not_applied: |
| pres.links[f'failed to apply {change.name}'] = ( |
| change.gerrit_url |
| ) |
| |
| snapshot_dir = self.m.path.start_dir / 'snapshot' |
| ctx.snapshot_to_dir(snapshot_dir) |
| |
| ctx.top = ctx.root |
| if ctx.options.root_subdirectory: |
| ctx.root = ctx.root / ctx.options.root_subdirectory |
| |
| return ctx |
| |
| def get_revision( |
| self, |
| root: config_types.Path, |
| name: str = 'git log', |
| test_data: str = 'HASH', |
| ): |
| """Like self.revision, but works for secondary checkouts.""" |
| with self.m.context(cwd=root): |
| step = self.m.git( |
| name, |
| 'log', |
| '--max-count=1', |
| '--pretty=format:%H', |
| stdout=self.m.raw_io.output_text(), |
| step_test_data=lambda: self.test_api.m.raw_io.stream_output_text( |
| test_data, |
| ), |
| ) |
| |
| # Sometimes test data has additional information, following a |
| # newline. Only keep the part before the first newline. |
| result = step.stdout.strip().split('\n')[0] |
| step.presentation.step_summary_text = result |
| return result |