# blob: 2ce15a569cf4a987319235f29f85a7f9f15213e6 [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Helpful commands for working with a Git repository."""
import logging
from pathlib import Path
import subprocess
from typing import Collection, Iterable, Iterator, List, NamedTuple, Optional
from typing import Pattern, Set, Tuple, Union
from pw_presubmit.tools import log_run, plural
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
# Alias for arguments that may be given either as a Path or as a string.
PathOrStr = Union[Path, str]
# Git revision syntax for the upstream of the current branch. list_files()
# and _describe_constraints() replace it with the result of tracking_branch().
TRACKING_BRANCH_ALIAS = '@{upstream}'
# Every spelling of the upstream alias that this module recognizes.
_TRACKING_BRANCH_ALIASES = TRACKING_BRANCH_ALIAS, '@{u}'
def git_stdout(*args: PathOrStr,
               show_stderr=False,
               repo: PathOrStr = '.') -> str:
    """Runs a Git command in a repo and returns what it printed to stdout.

    Args:
      *args: arguments passed to git after ``-C <repo>``
      show_stderr: if False, the command's stderr is discarded; if True, it
          is left attached to this process's stderr
      repo: directory handed to ``git -C``

    Returns:
      The command's stdout, decoded and stripped of surrounding whitespace.

    Note:
      log_run is invoked with check=True; callers in this file rely on a
      failing command surfacing as subprocess.CalledProcessError.
    """
    command = ['git', '-C', repo, *args]
    stderr_target = subprocess.DEVNULL if not show_stderr else None
    completed = log_run(command,
                        stdout=subprocess.PIPE,
                        stderr=stderr_target,
                        check=True)
    return completed.stdout.decode().strip()
def _ls_files(args: Collection[PathOrStr], repo: Path) -> Iterable[Path]:
    """Yields the files reported by git ls-files as absolute paths.

    Entries are joined onto the resolved ``repo`` directory; anything that is
    not a regular file on disk is skipped.
    """
    base_dir = repo.resolve()
    listing = git_stdout('ls-files', '--', *args, repo=repo)

    for relative_name in listing.splitlines():
        candidate = base_dir / relative_name
        # Modified submodules are listed as directories; leave those out.
        if not candidate.is_file():
            continue
        yield candidate
def _diff_names(commit: str, pathspecs: Collection[PathOrStr],
                repo: Path) -> Iterable[Path]:
    """Yields absolute paths of files that differ from the given commit.

    Names printed by ``git diff --name-only --diff-filter=d`` are joined onto
    the repository root; anything that is not a regular file on disk is
    skipped.
    """
    toplevel = root(repo)
    diff_args = ('diff', '--name-only', '--diff-filter=d', commit, '--',
                 *pathspecs)
    changed = git_stdout(*diff_args, repo=repo)

    for relative_name in changed.splitlines():
        candidate = toplevel / relative_name
        # Modified submodules are listed as directories; leave those out.
        if not candidate.is_file():
            continue
        yield candidate
def tracking_branch(repo_path: Optional[Path] = None) -> Optional[str]:
    """Returns the tracking branch of the current branch.

    Since most callers of this function can safely handle a return value of
    None, suppress exceptions and return None if there is no tracking branch.

    Args:
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      ValueError: if repo_path is not in a Git repository
      FileNotFoundError: if repo_path does not exist (propagated from the
          repository check)

    Returns:
      the remote tracking branch name or None if there is none
    """
    if repo_path is None:
        repo_path = Path.cwd()

    if not is_repo(repo_path):
        raise ValueError(f'{repo_path} is not within a Git repository')

    # This command should only error out if there's no upstream branch set.
    try:
        return git_stdout('rev-parse',
                          '--abbrev-ref',
                          '--symbolic-full-name',
                          TRACKING_BRANCH_ALIAS,
                          repo=repo_path)
    except subprocess.CalledProcessError:
        return None
def list_files(commit: Optional[str] = None,
               pathspecs: Collection[PathOrStr] = (),
               repo_path: Optional[Path] = None) -> List[Path]:
    """Lists files with git ls-files or git diff --name-only.

    When a base commit is available, only files changed relative to it are
    listed. If that comparison fails, a warning is logged and every file
    matching the pathspecs is listed instead.

    Args:
      commit: commit to use as a base for git diff; a tracking-branch alias
          is replaced with the current tracking branch, if any
      pathspecs: Git pathspecs to use in git ls-files or diff
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Returns:
      A sorted list of absolute paths
    """
    if repo_path is None:
        repo_path = Path.cwd()

    if commit in _TRACKING_BRANCH_ALIASES:
        base = tracking_branch(repo_path)
    else:
        base = commit

    if base:
        try:
            changed = list(_diff_names(base, pathspecs, repo_path))
            changed.sort()
            return changed
        except subprocess.CalledProcessError:
            _LOG.warning(
                'Error comparing with base revision %s of %s, listing all '
                'files instead of just changed files', base, repo_path)

    everything = list(_ls_files(pathspecs, repo_path))
    everything.sort()
    return everything
def has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
    """Returns True if the Git repo has uncommitted changes in it.

    This does not check for untracked files.

    Args:
      repo: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      subprocess.CalledProcessError: if refreshing the index fails with
          output on stderr, or keeps failing on every attempt
    """
    if repo is None:
        repo = Path.cwd()

    # Refresh the Git index so that the diff-index command will be accurate.
    # The `git update-index` command isn't reliable when run in parallel with
    # other processes that may touch files in the repo directory, so retry a
    # few times before giving up. The hallmark of this failure mode is the lack
    # of an error message on stderr, so if we see something there we can assume
    # it's some other issue and raise.
    retries = 6
    for attempt in range(retries):
        try:
            log_run(['git', '-C', repo, 'update-index', '-q', '--refresh'],
                    capture_output=True,
                    check=True)
        except subprocess.CalledProcessError as err:
            if err.stderr or attempt == retries - 1:
                raise
            continue

        # The refresh succeeded, so stop retrying. Without this, a successful
        # refresh would be repeated for every remaining attempt.
        break

    # diff-index exits with 1 if there are uncommitted changes.
    return log_run(['git', '-C', repo, 'diff-index', '--quiet', 'HEAD',
                    '--']).returncode == 1
def _describe_constraints(git_root: Path, repo_path: Path,
                          commit: Optional[str],
                          pathspecs: Collection[PathOrStr],
                          exclude: Collection[Pattern[str]]) -> Iterable[str]:
    """Yields one phrase for each restriction applied to a file listing.

    Phrases are produced, in order, for: a repo_path that is not the
    repository root, a base commit, pathspecs, and exclusion patterns. A
    warning is logged when a tracking-branch alias is requested but there is
    no tracking branch.
    """
    if not git_root.samefile(repo_path):
        subdirectory = repo_path.resolve().relative_to(git_root.resolve())
        yield f'under the {subdirectory} subdirectory'

    if commit in _TRACKING_BRANCH_ALIASES:
        commit = tracking_branch(git_root)
        if commit is None:
            _LOG.warning(
                'Attempted to list files changed since the remote tracking '
                'branch, but the repo is not tracking a branch')

    if commit:
        yield f'that have changed since {commit}'

    if pathspecs:
        joined_paths = ', '.join(map(str, pathspecs))
        yield f'that match {plural(pathspecs, "pathspec")} ({joined_paths})'

    if exclude:
        label = plural(exclude, "pattern")
        joined_patterns = ', '.join(regex.pattern for regex in exclude)
        yield f'that do not match {label} ({joined_patterns})'
def describe_files(git_root: Path,
                   repo_path: Path,
                   commit: Optional[str],
                   pathspecs: Collection[PathOrStr],
                   exclude: Collection[Pattern[str]],
                   project_root: Optional[Path] = None) -> str:
    """Completes 'Doing something to ...' for a set of files in a Git repo.

    Args:
      git_root: root directory of the Git repository
      repo_path: path within the repository that the files are listed from
      commit: base commit the files are compared against, if any
      pathspecs: Git pathspecs that restrict the files
      exclude: compiled patterns for files that are left out
      project_root: if given and different from git_root, the repo is named
          by git_root's path relative to it rather than by its directory name

    Returns:
      A phrase describing the files; when several constraints apply, each one
      is placed on its own line.

    Raises:
      ValueError: if project_root is given and git_root is not located
          under it
    """
    constraints = list(
        _describe_constraints(git_root, repo_path, commit, pathspecs, exclude))

    name = git_root.name
    if project_root and project_root != git_root:
        name = str(git_root.relative_to(project_root))

    if not constraints:
        return f'all files in the {name} repo'

    msg = f'files in the {name} repo'
    if len(constraints) == 1:
        return f'{msg} {constraints[0]}'

    return msg + ''.join(f'\n - {line}' for line in constraints)
def root(repo_path: PathOrStr = '.', *, show_stderr: bool = True) -> Path:
    """Returns the top-level directory of the repository containing a path.

    The value is whatever ``git rev-parse --show-toplevel`` prints. When
    repo_path is not a directory, the command is run from its parent.

    Args:
      repo_path: a file or directory inside the repository
      show_stderr: passed through to git_stdout

    Raises:
      FileNotFoundError: the path does not exist
      subprocess.CalledProcessError: the path is not in a Git repo
    """
    location = Path(repo_path)
    if not location.exists():
        raise FileNotFoundError(f'{location} does not exist')

    working_dir = location if location.is_dir() else location.parent
    toplevel = git_stdout('rev-parse',
                          '--show-toplevel',
                          repo=working_dir,
                          show_stderr=show_stderr)
    return Path(toplevel)
def within_repo(repo_path: PathOrStr = '.') -> Optional[Path]:
    """Like root(repo_path), but gives None when the path is not in a repo.

    Git's stderr output is suppressed. A path that does not exist still
    raises FileNotFoundError, since only CalledProcessError is handled here.
    """
    try:
        repo_root = root(repo_path, show_stderr=False)
    except subprocess.CalledProcessError:
        return None
    return repo_root
def is_repo(repo_path: PathOrStr = '.') -> bool:
    """Tells whether the path lies inside a Git repository."""
    containing_repo = within_repo(repo_path)
    return containing_repo is not None
def path(repo_path: PathOrStr,
         *additional_repo_paths: PathOrStr,
         repo: PathOrStr = '.') -> Path:
    """Joins the given path segments onto the root of a Git repository.

    Args:
      repo_path: first segment to append to the repository root
      *additional_repo_paths: further segments to append
      repo: a path inside the repository whose root is used
    """
    repo_root = root(repo)
    return repo_root.joinpath(repo_path, *additional_repo_paths)
class PythonPackage(NamedTuple):
    """The files that make up one Python package found via its setup.py."""
    # Directory that contains the package's setup.py
    root: Path
    # Path to the main package directory
    package: Path
    # All sources in the main package dir
    packaged_files: Tuple[Path, ...]
    # Other Python files under root
    other_files: Tuple[Path, ...]

    def all_files(self) -> Tuple[Path, ...]:
        """Returns the packaged files followed by the other files."""
        packaged, others = self.packaged_files, self.other_files
        return packaged + others
def all_python_packages(repo: PathOrStr = '.') -> Iterator[PythonPackage]:
    """Finds all Python packages in the repo based on setup.py locations.

    Every directory with a tracked setup.py is treated as a package root. A
    root is reported only if at least one Python file is found in a
    subdirectory of it. If Python files are found under more than one
    subdirectory, a warning is logged and the first subdirectory encountered
    is used as the package directory.

    Args:
      repo: path of the repository to search

    Yields:
      A PythonPackage for each package root that has packaged Python files.
    """
    root_py_dirs = [
        file.parent
        for file in _ls_files(['setup.py', '*/setup.py'], Path(repo))
    ]

    for py_dir in root_py_dirs:
        all_packaged_files = _ls_files([py_dir / '*' / '*.py'], repo=py_dir)
        common_dir: Optional[str] = None

        # Make sure there is only one package directory with Python files
        # in it.
        for file in all_packaged_files:
            package_dir = file.relative_to(py_dir).parts[0]

            if common_dir is None:
                common_dir = package_dir
            elif common_dir != package_dir:
                _LOG.warning(
                    'There are multiple Python package directories in %s: %s '
                    'and %s. This is not supported by pw presubmit. Each '
                    'setup.py should correspond with a single Python package',
                    py_dir, common_dir, package_dir)
                break

        if common_dir is not None:
            packaged_files = tuple(_ls_files(['*/*.py'], repo=py_dir))

            # Look files up in a set so that separating out the other files
            # stays linear in the number of files.
            packaged_lookup = frozenset(packaged_files)
            other_files = tuple(
                f for f in _ls_files(['*.py'], repo=py_dir)
                if f.name != 'setup.py' and f not in packaged_lookup)

            yield PythonPackage(py_dir, py_dir / common_dir, packaged_files,
                                other_files)
def python_packages_containing(
        python_paths: Iterable[Path],
        repo: PathOrStr = '.') -> Tuple[List[PythonPackage], List[Path]]:
    """Finds all Python packages containing the provided Python paths.

    A path belongs to the first package whose root directory is one of the
    path's parents. Packages are returned without duplicates, in the order in
    which they were first matched, so the result is the same from run to run.

    Args:
      python_paths: paths of the Python files to look up
      repo: path of the repository to search for packages

    Returns:
      ([packages], [files_not_in_packages])
    """
    all_packages = list(all_python_packages(repo))

    packages: List[PythonPackage] = []
    seen: Set[PythonPackage] = set()
    files_not_in_packages: List[Path] = []

    for python_path in python_paths:
        for package in all_packages:
            if package.root in python_path.parents:
                # Record each package once, keeping first-match order.
                if package not in seen:
                    seen.add(package)
                    packages.append(package)
                break
        else:
            files_not_in_packages.append(python_path)

    return packages, files_not_in_packages
def commit_message(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns the output of ``git log --format=%B -n1`` for the commit."""
    log_args = ('log', '--format=%B', '-n1', commit)
    return git_stdout(*log_args, repo=repo)
def commit_author(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns the output of ``git log --format=%ae -n1`` for the commit."""
    log_args = ('log', '--format=%ae', '-n1', commit)
    return git_stdout(*log_args, repo=repo)
def commit_hash(rev: str = 'HEAD',
                short: bool = True,
                repo: PathOrStr = '.') -> str:
    """Returns the commit hash of the revision.

    Args:
      rev: revision to resolve with git rev-parse
      short: if True, pass --short to git rev-parse
      repo: path of the repository to run the command in
    """
    if short:
        rev_parse_args = ['rev-parse', '--short', rev]
    else:
        rev_parse_args = ['rev-parse', rev]
    return git_stdout(*rev_parse_args, repo=repo)