| # Copyright 2020 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Calls to checkout code. |
| |
| Usage: |
| api.checkout(remote='https://pigweed.googlesource.com/pigweed/pigweed') |
| """ |
| |
import pprint
import re
import urlparse
import xml.etree.ElementTree

import attr
from recipe_engine import recipe_api
| |
| PIGWEED_REMOTE = 'https://pigweed.googlesource.com/pigweed/pigweed' |
| |
| |
| @attr.s |
| class _Manifest(object): |
| remotes = attr.ib(default=attr.Factory(dict)) |
| projects = attr.ib(default=attr.Factory(list)) |
| |
| def dict(self): |
| return { |
| 'remotes': {k: v.dict() for k, v in self.remotes.iteritems()}, |
| 'projects': [x.dict() for x in self.projects], |
| } |
| |
| |
| class _Url(object): |
| def __init__(self, url, *args, **kwargs): |
| super(_Url, self).__init__(*args, **kwargs) |
| self.url = url |
| self.https = None |
| |
| def dict(self): |
| return self.__dict__.copy() |
| |
| |
| @attr.s |
| class _Remote(object): |
| """Remote config from manifest.""" |
| |
| name = attr.ib(type=str) |
| fetch = attr.ib(type=_Url) |
| review = attr.ib(type=str, default=None) |
| revision = attr.ib(type=str, default=None) |
| |
| def dict(self): |
| res = self.__dict__.copy() |
| res['fetch'] = res['fetch'].dict() |
| return res |
| |
| |
| @attr.s |
| class _Project(object): |
| """Key variables describing a repository/project.""" |
| |
| name = attr.ib(type=str) |
| path = attr.ib(type=str) |
| remote = attr.ib(type=str) |
| revision = attr.ib(type=str) |
| upstream = attr.ib(type=str) |
| url = attr.ib(type=str, default=None) |
| |
| def path_object(self, root): |
| return root.join(*self.path.split('/')) |
| |
| def dict(self): |
| return self.__dict__.copy() |
| |
| |
| @attr.s |
| class _Change(object): |
| """Data from buildbucket.""" |
| |
| number = attr.ib(type=int) |
| bb_input = attr.ib(repr=False) |
| remote = attr.ib(type=str) |
| ref = attr.ib(type=str) |
| rebase = attr.ib(type=bool) |
| branch = attr.ib(type=str) |
| gerrit_name = attr.ib(type=str) |
| applied = attr.ib(type=bool, default=False, repr=False) |
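    # Illustrative values (hedged example): with gerrit_name='pigweed',
    # number=1234, ref='refs/changes/34/1234/1', and
    # remote='https://pigweed.googlesource.com/pigweed/pigweed', the
    # properties below yield gerrit_url
    # 'https://pigweed-review.googlesource.com/c/1234', gitiles_url
    # '<remote>/+/<ref>', and name 'pigweed:1234'.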
| |
| @property |
| def gerrit_url(self): |
| return 'https://{}-review.googlesource.com/c/{}'.format( |
| self.gerrit_name, self.number |
| ) |
| |
| @property |
| def gitiles_url(self): |
| return '{}/+/{}'.format(self.remote, self.ref) |
| |
| @property |
| def name(self): |
| return '{}:{}'.format(self.gerrit_name, self.number) |
| |
| |
| @attr.s |
| class _Submodule(object): |
| """Submodule properties.""" |
| |
| api = attr.ib(type=recipe_api.RecipeApi, repr=False) |
| hash = attr.ib(type=str) |
| relative_path = attr.ib(type=str) |
| path = attr.ib(type=str) |
| describe = attr.ib(type=str) |
| remote = attr.ib(type=str) |
| |
| |
| class CheckoutApi(recipe_api.RecipeApi): |
| """Calls to checkout code.""" |
| |
| def __init__(self, props, *args, **kwargs): |
| super(CheckoutApi, self).__init__(*args, **kwargs) |
| self._remote = props.remote or PIGWEED_REMOTE |
| # TODO(pwbug/209) Change 'master' to 'main'. |
| self._branch = props.branch or 'master' |
| self._use_repo = props.use_repo |
| self._manifest_file = props.manifest_file or 'default.xml' |
| self._root = None |
| self._revision = None |
| self._repo_top = None |
| self._manifest = None |
| self._triggering_repo = None |
| self._changes = None |
| self._manifest_snapshot = None |
| self._repo_init_timeout_sec = props.repo_init_timeout_sec or 20 |
| self._repo_sync_timeout_sec = props.repo_sync_timeout_sec or 120 |
| self._number_of_attempts = props.number_of_attempts or 3 |
| |
| def _read_manifest(self, manifest_file): |
| """Reads manifest file to get git repo locations.""" |
| |
| with self.m.step.nest('read manifest') as read_step: |
| manifest_text = self.m.file.read_text('read file', manifest_file) |
| read_step.logs['raw'] = manifest_text |
| |
| xml_tree = xml.etree.ElementTree.fromstring(manifest_text) |
| |
| manifest = _Manifest() |
| |
| for remote in xml_tree.iter('remote'): |
| remote = _Remote(**remote.attrib) |
| remote.fetch = _Url(remote.fetch) |
| remote.fetch.https = self.m.sso.sso_to_https(remote.fetch.url) |
| manifest.remotes[remote.name] = remote |
| |
| defaults = {} |
| for default in xml_tree.iter('default'): |
| defaults.update(default.attrib) |
| |
| for project in xml_tree.iter('project'): |
| name = project.attrib['name'] |
| path = project.attrib.get('path', name) |
| |
| if 'remote' in project.attrib: |
| remote = project.attrib['remote'] |
| elif 'remote' in defaults: |
| remote = defaults['remote'] |
| else: # pragma: no cover |
| assert False, 'remote not specified for {}'.format(name) |
| |
| assert ( |
| remote in manifest.remotes |
| ), 'Remote {} does not exist'.format(remote) |
| |
| if 'revision' in project.attrib: |
| revision = project.attrib['revision'] |
| elif manifest.remotes[remote].revision: |
| revision = manifest.remotes[remote].revision |
| elif 'revision' in defaults: |
| revision = defaults['revision'] |
| else: # pragma: no cover |
| assert False, 'revision not specified for {}'.format(name) |
| |
| if 'upstream' in project.attrib: |
| upstream = project.attrib['upstream'] |
| elif 'upstream' in defaults: # pragma: no cover |
                # This is unlikely to be used and hard to test: it requires
                # a completely separate manifest definition, since otherwise
                # the 'else' branch below won't be covered. It's also simple.
| upstream = defaults['upstream'] |
| else: |
| upstream = revision |
| |
| url = urlparse.urljoin( |
| manifest.remotes[remote].fetch.https, name |
| ) |
| manifest.projects.append( |
| _Project( |
| name=name, |
| path=path, |
| remote=remote, |
| revision=revision, |
| upstream=upstream, |
| url=url, |
| ) |
| ) |
| |
| self.m.file.write_json( |
| 'manifest json', |
| self.m.path['start_dir'].join('manifest.json'), |
| manifest.dict(), |
| ) |
| |
| return manifest |
| |
| def _change_data(self, remote=None, branch=None): |
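        """Returns the triggering changes for this build as _Change objects.

        Builds a _Change for each Gerrit change in the buildbucket input or,
        failing that, for the triggering gitiles commit. `remote` and
        `branch` serve as defaults where the buildbucket input is missing
        that information.
        """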
| bb_input = self.m.buildbucket.build.input |
| results = [] |
| |
| with self.m.step.nest('change data'): |
| if bb_input.gerrit_changes: |
| with self.m.step.nest('process gerrit changes'): |
| for i, change in enumerate(bb_input.gerrit_changes): |
| with self.m.step.nest(str(i)): |
| assert change.host |
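                            # Gerrit change refs have the form
                            # refs/changes/NN/<change>/<patchset>, where NN
                            # is the last two digits of the change number;
                            # e.g. change 12345, patchset 2 maps to
                            # 'refs/changes/45/12345/2'.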
| ref = 'refs/changes/{:02}/{}/{}'.format( |
| change.change % 100, |
| change.change, |
| change.patchset, |
| ) |
| host = change.host.replace( |
| '-review.googlesource.com', '.googlesource.com' |
| ) |
| remote = 'https://{}/{}'.format( |
| host, change.project |
| ).strip('/') |
| gerrit_name = host.split('.')[0] |
| branch = self.m.gerrit.change_details( |
| 'details', |
| change_id=str(change.change), |
| host=change.host, |
| test_data=self.m.json.test_api.output( |
| {'branch': 'master'} |
| ), |
| ).json.output['branch'] |
| |
| results.append( |
| _Change( |
| number=change.change, |
| bb_input=bb_input, |
| remote=remote, |
| ref=ref, |
| rebase=True, |
| branch=branch, |
| gerrit_name=gerrit_name, |
| ) |
| ) |
| |
| elif bb_input.gitiles_commit.id: |
| with self.m.step.nest('process gitiles commit'): |
| commit = bb_input.gitiles_commit |
| assert commit.host |
| if commit.project: |
| remote = 'https://{}/{}'.format( |
| commit.host, commit.project |
| ) |
| |
| host = commit.host.replace( |
| '.googlesource.com', '-review.googlesource.com' |
| ) |
| gerrit_name = commit.host.split('.')[0] |
| |
| query_results = self.m.gerrit.change_query( |
| 'number', |
| dict(commit=str(commit.id)), |
| host=host, |
| ok_ret='any', |
| test_data=self.m.json.test_api.output( |
| [{'_number': '1234', 'branch': branch}] |
| ), |
| ).json.output |
| assert query_results and len(query_results) == 1 |
| number = query_results[0]['_number'] |
| branch = query_results[0]['branch'] |
| results.append( |
| _Change( |
| number=number, |
| bb_input=bb_input, |
| remote=remote, |
| ref=commit.id, |
| rebase=False, |
| branch=branch, |
| gerrit_name=gerrit_name, |
| ) |
| ) |
| |
| with self.m.step.nest('changes'): |
| for result in results: |
| with self.m.step.nest(result.name) as change_data_pres: |
| change_data_pres.step_summary_text = repr(result) |
| |
| return tuple(results) |
| |
| def _parse_submodule_status(self, root, line): |
| """Parse a `git submodule status` and get the remote URL.""" |
| match = re.search( |
| r'^(?P<hash>[0-9a-fA-F]{40})\s+' |
| r'(?P<path>[^()]*)\s+' |
| r'\((?P<describe>[^()]*)\)$', |
| line.strip(), |
| ) |
| if not match: |
| raise self.m.step.InfraFailure( |
| 'unrecognized submodule status line "{}"'.format(line) |
| ) |
| |
| with self.m.step.nest(match.group('path')) as pres: |
| pres.step_summary_text = 'hash={}\ndescribe={}'.format( |
| match.group('hash'), match.group('describe') |
| ) |
| path = root.join(*match.group('path').split('/')) |
| with self.m.context(cwd=path): |
| remote = self.m.git( |
| 'git origin {}'.format(path), |
| 'config', |
| '--get', |
| 'remote.origin.url', |
| stdout=self.m.raw_io.output(), |
| ).stdout.strip() |
| remote_https = self.m.sso.sso_to_https(remote) |
| if remote_https.endswith('.git'): |
| remote_https = remote_https[0:-4] |
| |
| return _Submodule( |
| api=self.m, |
| hash=match.group('hash'), |
| relative_path=match.group('path'), |
| path=path, |
| describe=match.group('describe'), |
| remote=remote_https, |
| ) |
| |
| def _matching_branches(self, repo, branches, name='has branch', **kwargs): |
| """Returns the subset of the given branches that exist on gitiles.""" |
| matches = set() |
| with self.m.step.nest(name), self.m.context(infra_steps=True): |
| for branch in branches: |
| # Gitiles returns an empty dictionary when the branch does not |
| # exist, so a simple bool conversion is all we need. |
| if self.m.gitiles.refs( |
| repo, 'refs/heads/' + branch, step_name=branch, **kwargs |
| ): |
| matches.add(branch) |
| return matches |
| |
| def _apply_change( |
| self, change, cwd=None, extra_calls=None, remote='origin' |
| ): |
| """Applies the given change to the given directory. |
| |
| Args: |
| change (_Change): Change to apply. |
| cwd (Path): Working directory, defaults to current directory. |
| extra_calls (callable): Additional steps to run within the nested |
| 'apply ...' step and, if specified, within directory cwd. |
| remote (str): Name of the remote as configured in this repository. |
| """ |
| if cwd: |
| with self.m.context(cwd=cwd): |
| return self._apply_change( |
| change=change, extra_calls=extra_calls, remote=remote |
| ) |
| |
| change.applied = True |
| with self.m.step.nest('apply {}'.format(change.name)) as pres: |
| pres.links['gerrit'] = change.gerrit_url |
| pres.links['gitiles'] = change.gitiles_url |
| |
| with self.m.context(infra_steps=True): |
| self.m.git('git fetch', 'fetch', change.remote, change.ref) |
| self.m.git( |
| 'git checkout patch', |
| 'checkout', |
| '--recurse-submodules', |
| '-b', |
| 'working', |
| 'FETCH_HEAD', |
| ) |
| if change.rebase: |
| with self.m.context(infra_steps=True): |
| self.m.git( |
| 'pre-rebase log', 'log', '--oneline', '-n', '10', |
| ) |
| self.m.git('git fetch', 'fetch', remote, change.branch) |
| self.m.git( |
| 'git rebase', |
| 'rebase', |
| '{}/{}'.format(remote, change.branch), |
| ) |
| |
| # In most cases this is redundant or unnecessary, but it shouldn't |
| # cause problems. It's necessary when a superproject CL is updating |
| # a submodule pin and we need to sync the submodule to the new |
| # revision. |
| self.m.git('git submodule', 'submodule', 'update') |
| |
| # TODO(pwbug/233) Make this function a context manager so callers |
| # can do the following: |
| # with self._apply_change(...): |
| # extra_calls() |
| if extra_calls: |
| extra_calls() |
| |
| def _check_unapplied_changes(self, changes): |
| if not changes: |
| return # pragma: no cover |
| |
| def display_unapplied_change(change): |
| with self.m.step.nest( |
| 'failed to apply {}'.format(change.name) |
| ) as pres: |
| pres.status = 'WARNING' |
| pres.links['gerrit'] = change.gerrit_url |
| pres.links['gitiles'] = change.gitiles_url |
| |
| with self.m.context(infra_steps=True): |
| if all(not x.applied for x in changes): |
| with self.m.step.nest('no changes were applied') as pres: |
| pres.status = 'FAILURE' |
| for change in changes: |
| if not change.applied: |
| display_unapplied_change(change) |
| pres.properties['changes'] = [x.name for x in changes] |
| |
| raise self.m.step.InfraFailure( |
| 'could not find triggering changes in checkout' |
| ) |
| |
| if any(not x.applied for x in changes): |
| with self.m.step.nest('some changes were not applied') as pres: |
| pres.status = 'WARNING' |
| for change in changes: |
| if not change.applied: |
| display_unapplied_change(change) |
| |
| def _git(self, remote, branch, use_trigger, root, changes): |
| """Checkout code from git. |
| |
| Args: |
| remote (str): URL of git repository. |
| branch (str): Remote branch to retrieve. |
| use_trigger (bool): Attempt to apply the triggering change to the |
| checkout. |
| root (Path): Path to checkout into. |
| changes (sequence[_Change]): List of triggering changes. |
| """ |
| |
| with self.m.context(infra_steps=True): |
| self.m.git.checkout( |
| remote, path=root, ref=branch, recursive=True, submodules=True |
| ) |
| |
| submodules = [] |
| |
| with self.m.context(cwd=root): |
| if use_trigger: |
| # Check for CLs for the top-level repository. |
| for change in changes: |
| if remote == change.remote: |
| self._apply_change(change) |
| |
            # Recursively look at all submodule paths (submodules can have
            # their own submodules) and find, among other things, each
            # submodule's remote URL.
| submodule_status_lines = self.m.git( |
| 'git submodule status', |
| 'submodule', |
| 'status', |
| '--recursive', |
| stdout=self.m.raw_io.output(), |
| step_test_data=lambda: self.m.raw_io.test_api.stream_output( |
| '' |
| ), |
| ).stdout.splitlines() |
| if submodule_status_lines: |
| with self.m.step.nest('parse_submodules'): |
| for line in submodule_status_lines: |
| submodules.append( |
| self._parse_submodule_status(root, line) |
| ) |
| |
| # Check for CLs for submodules. |
| for change in changes: |
| for submodule in submodules: |
| if submodule.remote == change.remote: |
| self._apply_change(change, cwd=submodule.path) |
| |
| self._check_unapplied_changes(changes) |
| |
| # Run git log for both the top-level checkout and every submodule. |
| with self.m.step.nest('git log'): |
| self.m.git(str(root), 'log', '--oneline', '-n', '10') |
| for submodule in submodules: |
| with self.m.context(cwd=submodule.path): |
| self.m.git( |
| str(submodule.path), 'log', '--oneline', '-n', '10', |
| ) |
| |
| def _repo(self, remote, branch, manifest_file, use_trigger, root, changes): |
| """Checkout code from an Android Repo Tool manifest. |
| |
| Args: |
| remote (str): URL of git repository. |
| branch (str): Remote branch to retrieve. |
| manifest_file (str): Name of manifest XML file. |
| use_trigger (bool): Attempt to apply the triggering change to the |
| checkout. |
| root (Path): Path to checkout into. |
| changes (sequence[_Change]): List of triggering changes. |
| """ |
| |
        # Git creates the top-level folder itself; Repo requires the caller
        # to create it first.
| self.m.file.ensure_directory('mkdir checkout', root) |
| |
| if manifest_file is None: |
| manifest_file = self._manifest_file |
| |
| with self.m.context(cwd=root): |
| remote = remote.rstrip('/') |
| |
            # If the triggering change is on a branch that also exists in the
            # manifest repository, check out the manifest from that branch
            # instead of the default.
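            # (Hedged example: a CL on branch 'release' combined with a
            # 'release' branch on the manifest remote switches the manifest
            # checkout to 'release'; 'master' and 'main' are ignored here.)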
| manifest_branch = branch |
| branch_names = set( |
| x.branch |
| for x in changes |
| if x.branch not in ('master', 'main', None) |
| ) |
| if branch_names: |
| with self.m.step.nest('branch names') as pres: |
| pres.step_summary_text = str(branch_names) |
| if changes: |
| matching_branches = self._matching_branches( |
| remote, branch_names, name='manifest has branch' |
| ) |
| if matching_branches: |
| if len(matching_branches) > 1: |
| with self.m.step.nest( |
| 'too many matching branches ({})'.format( |
| ', '.join(matching_branches) |
| ) |
| ) as pres: |
| pres.step_summary_text = ( |
| "Can't figure out which manifest branch to " |
| 'use. Remove some "Cq-Depends:" lines to ' |
| 'simplify the checkout.' |
| ) |
| raise self.m.step.StepFailure( |
| 'too many matching branches' |
| ) |
| else: |
| manifest_branch = matching_branches.pop() |
| self.m.step( |
| 'changing manifest branch to {}'.format( |
| manifest_branch |
| ), |
| None, |
| ) |
| |
| with self.m.context(infra_steps=True): |
| kwargs = {} |
| if self._repo_init_timeout_sec: |
| kwargs['timeout'] = self._repo_init_timeout_sec |
| kwargs['attempts'] = self._number_of_attempts |
| self.m.repo.init( |
| manifest_url=remote, |
| manifest_branch=manifest_branch, |
| **kwargs |
| ) |
| |
| manifests_dir = root.join('.repo', 'manifests') |
| # If the triggering CL is a manifest change, apply it before running |
| # sync. |
| if use_trigger: |
| for change in changes: |
| if change.remote and remote == change.remote: |
| |
| def update_upstream(): |
| # Right now the upstream of 'working' is the local |
| # 'default' branch. 'repo sync' complains if the |
| # upstream isn't remote, so it's changed to the |
| # remote branch that's identical to 'default'. |
| self.m.git( |
| 'git branch', |
| 'branch', |
| '--set-upstream-to=origin/{}'.format(branch), |
| ) |
| |
| self._apply_change( |
| change, |
| cwd=manifests_dir, |
| extra_calls=update_upstream, |
| ) |
| |
| self._manifest = self._read_manifest( |
| manifests_dir.join(manifest_file) |
| ) |
| |
| for _, remote_host in sorted(self._manifest.remotes.iteritems()): |
| if remote_host.fetch.url.startswith('sso://'): |
| self.m.sso.configure_insteadof(remote_host.fetch.url) |
| |
| with self.m.context(infra_steps=True): |
| kwargs = {} |
| if self._repo_sync_timeout_sec: |
| kwargs['timeout'] = self._repo_sync_timeout_sec |
| kwargs['attempts'] = self._number_of_attempts |
| self.m.repo.sync( |
| force_sync=True, current_branch=True, jobs=20, **kwargs |
| ) |
| self.m.repo.start('base') |
| |
| if use_trigger: |
| for change in changes: |
| for entry in self._manifest.projects: |
| if entry.url == change.remote: |
| |
| def compare_branch_name(): |
| with self.m.step.nest( |
| 'compare branch name' |
| ) as pres: |
| pres.step_summary_text = ( |
| 'CL branch: {}\nupstream branch: {}' |
| ).format(change.branch, entry.upstream) |
| |
| self._apply_change( |
| change, |
| cwd=entry.path_object(root), |
| extra_calls=compare_branch_name, |
| remote=entry.remote, |
| ) |
| |
| self._check_unapplied_changes(changes) |
| |
| self._manifest_snapshot = self.m.repo.manifest_snapshot() |
| |
        # Some dependent projects have everything inside one top-level folder
        # in their repo workspace. For those projects, pretend that the
        # top-level folder is the checkout root. The repo_top member always
        # points to the actual repo workspace root.
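        # (Hedged example: a workspace containing only '.repo/' and
        # 'myproject/' makes 'myproject/' the logical root, while repo_top
        # keeps pointing at the workspace itself.)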
| with self.m.step.nest('root') as pres: |
| pres.step_summary_text = 'root={}\nself._root={}\n'.format( |
| root, self._root |
| ) |
| if root == self._root: |
| self._repo_top = self._root |
| files = set(self.m.file.listdir('ls', root)) |
| dotrepo = self._root.join('.repo') |
| if dotrepo in files: |
| files.remove(dotrepo) |
| if len(files) == 1: |
| self._root = files.pop() |
| |
| def __call__( |
| self, |
| remote=None, |
| branch=None, |
| name=None, |
| use_trigger=True, |
| root=None, |
| use_repo=None, |
| manifest_file=None, |
| ): |
| """Checkout code. |
| |
| Grabs data from buildbucket. If drawing a blank, uses remote. Returns |
| path to checkout. |
| |
| Args: |
| remote (str): URL of git repository. |
| branch (str): Remote branch to retrieve. |
| name (str|None): If not None, this is used in the nesting step that |
| wraps all steps invoked by this method. |
| use_trigger (bool): Attempt to apply the triggering change to the |
| checkout. |
| root (Path|None): If not None, checkout into this path and not |
| self.root. |
| use_repo (bool|None): If True, treat the remote/branch as an Android |
| Repo Tool manifest. If False, treat as a regular Git repository. |
| If None, use the property value instead. |
| manifest_file (str|None): Path to manifest file, defaults to |
| 'default.xml'. |
| """ |
| |
| if use_repo is None: |
| use_repo = self._use_repo |
| |
| # The caller supplying the branch and not the remote is a weird |
| # situation that's probably an error. Only grab from properties if |
| # caller supplied neither. |
| if remote is None and branch is None: |
| remote = self._remote |
| branch = self._branch |
| assert remote |
| branch = branch or 'master' |
| |
| # Turn "https://foo/bar/baz.git" into "baz". |
| if name is None: |
| name = remote.rstrip('/') |
| if name.endswith('.git'): |
| name = name[0:-4] |
| parts = name.split('/') |
| if use_repo and parts[-1] == 'manifest': |
| parts.pop(-1) |
| name = parts[-1] |
| |
| if remote.endswith('.git'): |
| remote = remote[0:-4] |
| |
| with self.m.step.nest('checkout {}'.format(name)): |
| changes = self._change_data(remote, branch) |
| |
| if root is None: |
| root = self._root = self.m.path['start_dir'].join('checkout') |
| self._changes = changes |
| |
| if use_repo: |
| self._repo( |
| remote=remote, |
| branch=branch, |
| root=root, |
| use_trigger=use_trigger, |
| manifest_file=manifest_file, |
| changes=changes, |
| ) |
| else: |
| self._git( |
| remote=remote, |
| branch=branch, |
| root=root, |
| use_trigger=use_trigger, |
| changes=changes, |
| ) |
| |
| @property |
| def root(self): |
| """Returns the logical top level directory of the checkout. |
| |
| Returns: |
| For Git checkouts, returns the top-level directory. For Android Repo |
| Tool checkouts, returns the top-level directory unless there is |
| exactly one subdirectory of that top-level directory (except for |
| .repo). In that case it returns that one subdirectory. |
| """ |
| return self._root |
| |
| @property |
| def repo_top(self): |
| """Always returns the directory containing the .repo folder.""" |
| return self._repo_top |
| |
| @property |
| def manifest(self): |
| return self._manifest |
| |
| @property |
| def manifest_snapshot(self): |
| return self._manifest_snapshot |
| |
| @property |
| def remote(self): |
| return self._remote |
| |
| @property |
| def branch(self): |
| return self._branch |
| |
| @property |
| def manifest_file(self): |
| return self._manifest_file |
| |
| @property |
| def use_repo(self): |
| return self._use_repo |
| |
| @property |
| def revision(self): |
| """Returns revision of the primary checkout directory.""" |
| assert self._root, 'checkout() not yet called' |
| if self._revision: |
| return self._revision |
| |
| self._revision = self.get_revision(self._root) |
| return self._revision |
| |
| @property |
| def changes(self): |
| """Returns the changes that triggered the build. |
| |
        For CI builds this is a sequence with at most one item, the merged
        commit that triggered the build. For CQ builds it may contain
        multiple pending CLs.
| """ |
| assert self._root, 'checkout() not yet called' |
| return self._changes |
| |
| def get_revision(self, root): |
| """Like self.revision, but works for secondary checkouts.""" |
| with self.m.context(cwd=root): |
| return self.m.git( |
| 'git log', |
| 'log', |
| '--max-count=1', |
| '--pretty=format:%H', |
| stdout=self.m.raw_io.output_text(), |
| step_test_data=lambda: self.test_api.m.raw_io.stream_output( |
| 'HASH' |
| ), |
| ).stdout.strip() |
| |
| # gerrit_host and gerrit_project aren't really properties of checkout, but |
| # they make some sense here and don't make much sense anywhere else. |
| _REMOTE_REGEX = re.compile(r'^https://(?P<host>[^/]+)/(?P<project>.+)$') |
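    # Illustrative mapping (hedged example): for the remote
    # 'https://pigweed.googlesource.com/pigweed/pigweed', gerrit_host()
    # returns 'pigweed-review.googlesource.com' and gerrit_project()
    # returns 'pigweed/pigweed'.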
| |
| def gerrit_host(self, remote=None): |
| match = self._REMOTE_REGEX.match(remote or self.remote) |
| if not match: |
| return |
| |
        gerrit_review_host = match.group('host')
| if '-review' not in gerrit_review_host: |
| gerrit_review_host = gerrit_review_host.replace('.', '-review.', 1) |
| return gerrit_review_host |
| |
| def gerrit_project(self, remote=None): |
| match = self._REMOTE_REGEX.match(remote or self.remote) |
| if not match: |
| return |
| |
| return match.group('project') |