blob: 59b1c701311d8234ef2fc267a680a6dc2ef7be49 [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Helpful commands for working with a Git repository."""
import logging
from pathlib import Path
import subprocess
from typing import Collection, Iterable, Iterator, List, NamedTuple, Optional
from typing import Pattern, Set, Tuple, Union
from import log_run, plural
_LOG = logging.getLogger(__name__)
PathOrStr = Union[Path, str]
PatternOrStr = Union[Pattern, str]
def git_stdout(*args: PathOrStr,
               repo: PathOrStr = '.') -> str:
    """Runs ``git -C <repo> <args...>`` through log_run; annotated to return str.

    Args:
      *args: arguments placed after ``git -C <repo>`` on the command line
      repo: directory handed to git's ``-C`` option; defaults to '.'

    NOTE(review): this definition is incomplete in the text under review and
    is not valid Python as shown:
      * the body reads ``show_stderr``, but no such parameter is declared in
        the visible signature;
      * the log_run(...) call is never closed, so how its result is turned
        into the returned string is not visible.
    Restore the missing lines from version control instead of guessing them.
    """
    return log_run(['git', '-C', str(repo), *args],
                   # stderr is only discarded when show_stderr is false.
                   stderr=None if show_stderr else subprocess.DEVNULL,
                   # NOTE(review): remaining keyword arguments and the end of
                   # this expression are missing here.
def _ls_files(args: Collection[PathOrStr], repo: Path) -> Iterable[Path]:
    """Yields absolute paths of the regular files listed by git ls-files.

    Args:
      args: pathspecs passed to ``git ls-files`` after the ``--`` separator
      repo: directory git is run in; every output line is joined onto the
        resolved form of this path

    Yields:
      One absolute path per listed entry that exists as a regular file.
    """
    base_dir = repo.resolve()
    listing = git_stdout('ls-files', '--', *args, repo=repo)

    for relative_name in listing.splitlines():
        candidate = base_dir / relative_name
        # A modified submodule is reported as a directory; skip anything that
        # is not a regular file.
        if not candidate.is_file():
            continue
        yield candidate
def _diff_names(commit: str, pathspecs: Collection[PathOrStr],
                repo: Path) -> Iterable[Path]:
    """Returns absolute paths of files changed since the specified commit.

    Each line of the git output is joined onto the repository root returned by
    root(repo); only entries that exist as regular files are yielded.

    NOTE(review): the git_stdout('diff', ...) call is cut off in the text under
    review. The remaining diff arguments, and how ``commit`` and ``pathspecs``
    are passed to git, are not visible. Restore them from version control.
    """
    # Paths are anchored at the repository root, not at `repo` itself.
    git_root = root(repo)
    for file in git_stdout('diff',
        # NOTE(review): the rest of the argument list and the end of the
        # `for` header are missing here.
        full_path = git_root / file
        # Modified submodules will show up as directories and should be ignored.
        if full_path.is_file():
            yield full_path
def tracking_branch(repo_path: Path = None) -> Optional[str]:
    """Returns the tracking branch of the current branch.

    Since most callers of this function can safely handle a return value of
    None, suppress exceptions and return None if there is no tracking branch.

    Args:
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      ValueError: if repo_path is not in a Git repository

    Returns:
      the remote tracking branch name or None if there is none

    NOTE(review): in the text under review the ``try:`` that pairs with the
    ``except`` below is missing and the git_stdout('rev-parse', ...) call is
    cut off after its first argument. Restore both from version control.
    """
    if repo_path is None:
        repo_path = Path.cwd()

    # repo_path is already non-None here, so the `or Path.cwd()` fallback is
    # redundant (harmless).
    if not is_repo(repo_path or Path.cwd()):
        raise ValueError(f'{repo_path} is not within a Git repository')

    # This command should only error out if there's no upstream branch set.
    # NOTE(review): missing `try:` before this statement; the remaining
    # rev-parse arguments and the closing parenthesis are also missing.
    return git_stdout('rev-parse',
    except subprocess.CalledProcessError:
        return None
def list_files(commit: Optional[str] = None,
               pathspecs: Collection[PathOrStr] = (),
               repo_path: Optional[Path] = None) -> List[Path]:
    """Lists files with git ls-files or git diff --name-only.

    Args:
      commit: commit to use as a base for git diff
      pathspecs: Git pathspecs to use in git ls-files or diff
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Returns:
      A sorted list of absolute paths

    NOTE(review): several lines of this function are missing in the text under
    review (see the inline notes). Restore them from version control.
    """
    if repo_path is None:
        repo_path = Path.cwd()

    # NOTE(review): as shown, `commit` is overwritten unconditionally, which
    # would make the parameter meaningless. A guarding condition appears to be
    # missing before this line; confirm.
    commit = tracking_branch(repo_path)

    if commit:
        # NOTE(review): the `try:` that pairs with the `except` below is
        # missing.
        return sorted(_diff_names(commit, pathspecs, repo_path))
        except subprocess.CalledProcessError:
            # NOTE(review): the call these message arguments belong to is
            # missing (presumably a logging call; confirm).
            'Error comparing with base revision %s of %s, listing all '
            'files instead of just changed files', commit, repo_path)

    # Fallback: list the files matched by the pathspecs with git ls-files.
    return sorted(_ls_files(pathspecs, repo_path))
def has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
    """Returns True if the Git repo has uncommitted changes in it.

    This does not check for untracked files.

    Args:
      repo: directory passed to git's ``-C`` option; defaults to Path.cwd()

    NOTE(review): the retry loop is incomplete in the text under review: the
    ``try:`` is missing, the update-index call is cut off, and the body of the
    ``if`` inside the ``except`` handler is missing. Restore from version
    control.
    """
    if repo is None:
        repo = Path.cwd()

    # Refresh the Git index so that the diff-index command will be accurate.
    # The `git update-index` command isn't reliable when run in parallel with
    # other processes that may touch files in the repo directory, so retry a
    # few times before giving up. The hallmark of this failure mode is the lack
    # of an error message on stderr, so if we see something there we can assume
    # it's some other issue and raise.
    retries = 6
    for i in range(retries):
        # NOTE(review): missing `try:`; the keyword arguments and closing
        # parenthesis of this call are also missing.
        log_run(['git', '-C', repo, 'update-index', '-q', '--refresh'],
        except subprocess.CalledProcessError as err:
            # True when git printed an error or this was the last attempt.
            if err.stderr or i == retries - 1:
                # NOTE(review): the body of this `if` and whatever follows the
                # handler are missing.

    # diff-index exits with 1 if there are uncommitted changes.
    return log_run(['git', '-C', repo, 'diff-index', '--quiet', 'HEAD',
                    '--']).returncode == 1
def _describe_constraints(git_root: Path, repo_path: Path,
                          commit: Optional[str],
                          pathspecs: Collection[PathOrStr],
                          exclude: Collection[Pattern[str]]) -> Iterable[str]:
    """Yields phrases describing the restrictions applied to a file listing.

    One phrase is produced for each restriction that applies: a sub-path of
    the repository, a base commit, pathspecs, and exclusion patterns.

    NOTE(review): parts of this generator are missing in the text under review
    (see the inline notes). Restore them from version control.
    """
    # Only mention the sub-path when repo_path is not the repository root.
    if not git_root.samefile(repo_path):
        yield (
            f'under the {repo_path.resolve().relative_to(git_root.resolve())} '
            # NOTE(review): the rest of this string and the closing
            # parenthesis are missing.

    # NOTE(review): as shown, `commit` is overwritten unconditionally; a
    # guarding condition appears to be missing before this line. Confirm.
    commit = tracking_branch(git_root)
    if commit is None:
        # NOTE(review): the call these message strings belong to is missing
        # (presumably a logging call; confirm).
        'Attempted to list files changed since the remote tracking '
        'branch, but the repo is not tracking a branch')

    if commit:
        yield f'that have changed since {commit}'

    if pathspecs:
        paths_str = ', '.join(str(p) for p in pathspecs)
        yield f'that match {plural(pathspecs, "pathspec")} ({paths_str})'

    if exclude:
        yield (f'that do not match {plural(exclude, "pattern")} (' +
               ', '.join(p.pattern for p in exclude) + ')')
def describe_files(git_root: Path,
                   repo_path: Path,
                   commit: Optional[str],
                   pathspecs: Collection[PathOrStr],
                   exclude: Collection[Pattern],
                   project_root: Optional[Path] = None) -> str:
    """Completes 'Doing something to ...' for a set of files in a Git repo.

    Args:
      git_root: root directory of the repository
      repo_path: path inside the repository; passed to _describe_constraints
      commit: base commit, if any; passed to _describe_constraints
      pathspecs: Git pathspecs; passed to _describe_constraints
      exclude: exclusion patterns; passed to _describe_constraints
      project_root: when given and different from git_root, the repo is named
        by git_root's path relative to this directory

    Returns:
      A phrase naming the repo, followed by the constraints: inline when there
      is exactly one, as a bulleted list when there are several.

    Raises:
      ValueError: from Path.relative_to if project_root is given, differs from
        git_root, and git_root is not located under it
    """
    constraints = list(
        _describe_constraints(git_root, repo_path, commit, pathspecs, exclude))

    # NOTE(review): the right-hand side of this assignment was missing in the
    # reviewed text, leaving a syntax error. The final path component of
    # git_root is the default the fallback below implies; confirm against
    # version control.
    name = git_root.name
    if project_root and project_root != git_root:
        name = str(git_root.relative_to(project_root))

    if not constraints:
        return f'all files in the {name} repo'

    msg = f'files in the {name} repo'
    if len(constraints) == 1:
        return f'{msg} {constraints[0]}'

    return msg + ''.join(f'\n - {line}' for line in constraints)
def root(repo_path: PathOrStr = '.', *, show_stderr: bool = True) -> Path:
    """Returns the repository root as an absolute path.

    Args:
      repo_path: a file or directory; for a non-directory, its parent is
        passed as the ``repo`` argument of the truncated call below
      show_stderr: not referenced in the visible lines (see note)

    Raises:
      FileNotFoundError: the path does not exist
      subprocess.CalledProcessError: the path is not in a Git repo

    NOTE(review): the call wrapped by ``Path(`` is missing in the text under
    review; only its ``repo=`` keyword argument survives. The use of
    ``show_stderr`` is presumably in the missing part. Restore from version
    control.
    """
    repo_path = Path(repo_path)
    if not repo_path.exists():
        raise FileNotFoundError(f'{repo_path} does not exist')

    return Path(
        # NOTE(review): the function call that opens here is missing.
        repo=repo_path if repo_path.is_dir() else repo_path.parent,
        # NOTE(review): remaining arguments and closing parentheses are
        # missing.
def within_repo(repo_path: PathOrStr = '.') -> Optional[Path]:
    """Similar to root(repo_path), returns None if the path is not in a repo.

    Args:
      repo_path: path to look up; passed to root() with show_stderr=False

    Returns:
      The value returned by root(), or None when root() raises
      subprocess.CalledProcessError.

    Only subprocess.CalledProcessError is handled; any other exception raised
    by root() propagates to the caller.
    """
    # The reviewed text had the `except` clause without its `try:`, which is a
    # syntax error; the guarded statement can only be the root() call.
    try:
        return root(repo_path, show_stderr=False)
    except subprocess.CalledProcessError:
        return None
def is_repo(repo_path: PathOrStr = '.') -> bool:
    """Reports whether within_repo() finds a Git repository for the path.

    Args:
      repo_path: path to check; defaults to '.'

    Returns:
      True when within_repo(repo_path) returns something other than None.
    """
    containing_root = within_repo(repo_path)
    return containing_root is not None
def path(repo_path: PathOrStr,
         *additional_repo_paths: PathOrStr,
         repo: PathOrStr = '.') -> Path:
    """Joins path components onto the root of a Git repository.

    Args:
      repo_path: first component to append to the repository root
      *additional_repo_paths: further components appended after repo_path
      repo: path used to locate the repository via root(); defaults to '.'

    Returns:
      root(repo) joined with the given components.
    """
    repo_root = root(repo)
    return repo_root.joinpath(repo_path, *additional_repo_paths)
class PythonPackage(NamedTuple):
    """Record describing one Python package and the files that belong to it."""

    # Top-level directory of the package. The original comment is cut off
    # after "Path to the file containing the"; confirm the intended wording.
    root: Path
    # Path to the main package directory.
    package: Path
    # All sources in the main package dir.
    packaged_files: Tuple[Path, ...]
    # Other Python files under root.
    other_files: Tuple[Path, ...]

    def all_files(self) -> Tuple[Path, ...]:
        """Returns packaged_files followed by other_files."""
        packaged, others = self.packaged_files, self.other_files
        return packaged + others
def all_python_packages(repo: PathOrStr = '.') -> Iterator[PythonPackage]:
    """Finds all Python packages in the repo.

    For each candidate directory, the first path component of every
    ``*/*.py`` file is inspected; a PythonPackage is yielded when at least one
    such file exists.

    NOTE(review): this function is heavily truncated in the text under review:
      * the element expression and closing bracket of ``root_py_dirs``;
      * the file names passed to _ls_files (only empty strings remain);
      * the call that the warning message belongs to;
      * the left operand and compared name in the ``other_files`` filter;
      * the end of the final ``yield``.
    The docstring summary also lost a word ("based on ... locations").
    Restore all of these from version control.
    """
    root_py_dirs = [
        # NOTE(review): the element expression is missing, and the pathspecs
        # below look stripped (empty strings).
        for file in _ls_files(['', '*/'], Path(repo))
    # NOTE(review): closing bracket of the list is missing.

    for py_dir in root_py_dirs:
        all_packaged_files = _ls_files([py_dir / '*' / '*.py'], repo=py_dir)
        common_dir: Optional[str] = None

        # Make sure there is only one package directory with Python files in
        # it.
        for file in all_packaged_files:
            # First path component below py_dir, i.e. the package directory.
            package_dir = file.relative_to(py_dir).parts[0]

            if common_dir is None:
                common_dir = package_dir
            elif common_dir != package_dir:
                # NOTE(review): the call these arguments belong to is missing
                # (presumably a logging call; confirm), as is anything that
                # followed it in this branch.
                'There are multiple Python package directories in %s: %s '
                'and %s. This is not supported by pw presubmit. Each '
                ' should correspond with a single Python package',
                py_dir, common_dir, package_dir)

        if common_dir is not None:
            packaged_files = tuple(_ls_files(['*/*.py'], repo=py_dir))
            other_files = tuple(
                f for f in _ls_files(['*.py'], repo=py_dir)
                # NOTE(review): the left operand and the compared string are
                # missing on the next line.
                if != '' and f not in packaged_files)

            # NOTE(review): the last argument and closing parenthesis of this
            # yield are missing.
            yield PythonPackage(py_dir, py_dir / common_dir, packaged_files,
def python_packages_containing(
        python_paths: Iterable[Path],
        repo: PathOrStr = '.') -> Tuple[List[PythonPackage], List[Path]]:
    """Finds all Python packages containing the provided Python paths.

    Args:
      python_paths: paths to look up
      repo: passed to all_python_packages() to enumerate candidate packages

    Returns:
      A tuple: ([packages], [files_not_in_packages])
    """
    all_packages = list(all_python_packages(repo))

    packages: Set[PythonPackage] = set()
    files_not_in_packages: List[Path] = []

    # NOTE(review): the body of the inner loop was missing in the reviewed
    # text (the `if` had no suite). It is rebuilt here from the two
    # accumulators and the documented return value; confirm against version
    # control.
    for python_path in python_paths:
        for package in all_packages:
            # A path belongs to a package when the package root is one of its
            # ancestor directories.
            if package.root in python_path.parents:
                packages.add(package)
                break
        else:
            # No package root is an ancestor of this path.
            files_not_in_packages.append(python_path)

    return list(packages), files_not_in_packages
def commit_message(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns the output of ``git log --format=%B -n1 <commit>``.

    Args:
      commit: revision passed to git log; defaults to 'HEAD'
      repo: directory git is run in; defaults to '.'
    """
    log_args = ('log', '--format=%B', '-n1', commit)
    return git_stdout(*log_args, repo=repo)
def commit_author(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns the output of ``git log --format=%ae -n1 <commit>``.

    Args:
      commit: revision passed to git log; defaults to 'HEAD'
      repo: directory git is run in; defaults to '.'
    """
    log_args = ('log', '--format=%ae', '-n1', commit)
    return git_stdout(*log_args, repo=repo)
def commit_hash(rev: str = 'HEAD',
                short: bool = True,
                repo: PathOrStr = '.') -> str:
    """Returns the commit hash of the revision.

    Args:
      rev: revision passed to ``git rev-parse``; defaults to 'HEAD'
      short: when true, ``--short`` is passed to ``git rev-parse``
      repo: directory git is run in; defaults to '.'
    """
    short_flag = ['--short'] if short else []
    return git_stdout('rev-parse', *short_flag, rev, repo=repo)
def discover_submodules(
    superproject_dir: Path, excluded_paths: Collection[PatternOrStr] = ()
) -> List[Path]:
    """Query git and return a list of submodules in the current project.

    Args:
      superproject_dir: Path object to directory under which we are looking
                        for submodules. This will also be included in list
                        returned unless excluded.
      excluded_paths: Pattern or string that match submodules that should not
                      be returned. All matches are done on posix style paths.

    Returns:
      List of "Path"s which were found but not excluded, this includes
      superproject_dir unless excluded.

    NOTE(review): this function is truncated in the text under review: most of
    the git_stdout('submodule', ...) arguments, the statement that followed
    the "superproject is omitted" comment, the bodies of both exclusion
    checks, and the line between the two inner loops are missing. Restore
    from version control.
    """
    discovery_report = git_stdout('submodule',
                                  # NOTE(review): arguments missing here.
                                  'echo $sm_path',
                                  # NOTE(review): remaining arguments and the
                                  # closing parenthesis are missing.
    # split() breaks on any whitespace, so a path that contains spaces would
    # be split into several entries.
    module_dirs = [Path(line) for line in discovery_report.split()]
    # The superproject is omitted in the prior scan.
    # NOTE(review): the statement this comment introduced is missing.

    for exclude in excluded_paths:
        if isinstance(exclude, Pattern):
            # Compiled patterns must match the whole posix-style path.
            for module_dir in reversed(module_dirs):
                if exclude.fullmatch(module_dir.as_posix()):
                    # NOTE(review): body missing.
        # NOTE(review): a line is missing here (presumably `else:`; confirm).
            # Plain strings are compared for equality with the posix path.
            for module_dir in reversed(module_dirs):
                if exclude == module_dir.as_posix():
                    # NOTE(review): body missing.

    return module_dirs