blob: fcab208c597cfa9d9327c2ba204b9ee68b6debef [file] [edit]
#!/usr/bin/env python3
# Copyright 2026 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Mirror the BCR to a local repository.
!!! WARNING !!!
THIS SCRIPT WILL NOT WORK UNTIL THE FOLLOWING BAZEL ISSUE IS FIXED:
https://github.com/bazelbuild/bazel/issues/22857
The above issue prevents `git_repository` Bazel registry types from having
`patches` or `overlay` entries, which means that many Bazel modules are
effectively broken.
!!! WARNING !!!
This script scans the BCR (a local checkout, or the latest HEAD) and creates
equivalent git_repository mirrors. This script assumes that you have already
mirrored required Bazel module dependencies to your own host in a way such that
they can be referenced as:
[arbitrary prefix]/[GitHub org or owner]/[GitHub repository]
Example usage
=============
Create a new mirror
-------------------
Create mirrors for the modules used by ~/projects/pigweed
$ ./tools/mirror_bazel_registry.py \\
--source=https://pigweed.googlesource.com/third_party/github/bazelbuild/bazel-central-registry.git \\
--out=. \\
--config=./config.json \\
create ~/projects/pigweed \\
--mirror-prefix=https://pigweed.googlesource.com/third_party/github
Update an existing mirror
-------------------------
$ ./tools/mirror_bazel_registry.py \\
--source=https://pigweed.googlesource.com/third_party/github/bazelbuild/bazel-central-registry.git \\
--out=. \\
--config=./config.json \\
update
Add/allowlist a new module
--------------------------
$ ./tools/mirror_bazel_registry.py \\
--source=https://pigweed.googlesource.com/third_party/github/bazelbuild/bazel-central-registry.git \\
--out=. \\
--config=./config.json \\
add rules_fuzzing
Config format overview
======================
* mirror_prefix: The main git host to use when transforming module sources
of truth.
* modules: Allowlisted modules. The key is the module name, value is a
dictionary of options.
* manual_versions: A list of versions that are hand-crafted and should be
largely ignored during integrity checks. This is particularly useful for
Bazel modules that have nonstandard source URLs.
* banned_versions: Similar to yanked versions, prohibits specific releases
from being mirrored. This will cause version resolution issues if any
project using this registry references these versions.
Example config
--------------
{
"mirror_prefix": "https://pigweed.googlesource.com/third_party/github",
"modules": {
"nanopb": {
"manual_versions": [
"0.3.9.10",
"0.4.9"
]
},
"pico-sdk": {},
"protobuf": {},
"re2": {
"banned_versions": [
"2025-06-26b"
]
}
}
}
"""
import argparse
import logging
from pathlib import Path
import re
import sys
import tempfile
import json
import subprocess
import shutil
if sys.version_info < (3, 10):
print(
'Python 3.10 or greater is required to run this script.',
file=sys.stderr,
)
print(
'You are using Python {}'.format(sys.version),
file=sys.stderr,
)
sys.exit(1)
# NOTE: This is used in CI in an isolated environment. Do NOT introduce
# dependencies (whether 1p or 3p) to this script.
_LOG = logging.getLogger(__name__)
def _parse_args() -> argparse.Namespace:
    """Builds the command-line parser and parses `sys.argv`.

    Every subcommand registers its handler as the `func` default, so the
    returned namespace carries the shared options (`source`, `config`, `out`),
    the handler to call (`func`), and any subcommand-specific arguments.

    When the script is invoked with no arguments at all, the full help text is
    printed (argparse then exits) instead of an error about missing arguments.

    Returns:
        The parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '--source',
        help=(
            'The source registry that declares original Bazel module sources '
            'of truth'
        ),
    )
    parser.add_argument(
        '--config',
        type=Path,
        required=True,
        help='The configuration file used to guide this.',
    )
    parser.add_argument(
        '--out',
        type=Path,
        required=True,
        help='The destination directory to write the mirrored registry to.',
    )

    subparsers = parser.add_subparsers(required=True)

    update_cmd_parser = subparsers.add_parser(
        name='update',
        help=(
            'Scan a Bazel registry for new versions, and add them to a mirror.'
        ),
    )
    update_cmd_parser.set_defaults(func=update)

    create_cmd_parser = subparsers.add_parser(
        name='create',
        help='Interactively creates a new mirror to the specified directory.',
    )
    create_cmd_parser.set_defaults(func=create)
    # NOTE: `help` must be a plain string. A trailing comma inside the
    # parentheses would turn the value into a one-element tuple, which
    # argparse cannot format when rendering help.
    create_cmd_parser.add_argument(
        'project',
        type=Path,
        help=(
            'Path to the project to create a mirror for. This magically '
            'identifies the module dependencies and then adds entries for each '
            'identified module.'
        ),
    )
    create_cmd_parser.add_argument(
        '--mirror-prefix',
        help=(
            'The default prefix to use when remapping Bazel module git '
            'repositories'
        ),
    )

    add_cmd_parser = subparsers.add_parser(
        name='add',
        help='Add a new bazel module to an existing project.',
    )
    add_cmd_parser.set_defaults(func=add)
    add_cmd_parser.add_argument(
        'module_name',
        type=str,
        help='The name of the Bazel module to add.',
    )

    if len(sys.argv) == 1:
        return parser.parse_args(['--help'])
    return parser.parse_args()
def _fetch_remote_registry(tempdir: Path, source: str) -> Path:
    """Shallow-clones the git repository at `source` into `tempdir`.

    Args:
        tempdir: Existing directory that will hold the checkout.
        source: URL of the git repository to clone.

    Returns:
        The path of the checkout, `tempdir / 'bcr'`.

    Raises:
        subprocess.CalledProcessError: If `git clone` exits non-zero. The
            captured stderr is logged before the error is re-raised.
    """
    _LOG.debug('Cloning %s...', source)
    checkout = tempdir / 'bcr'
    # Only the latest commit is needed, so keep the clone shallow.
    clone_cmd = ['git', 'clone', '--depth=1', source, str(checkout)]
    try:
        subprocess.run(clone_cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as err:
        _LOG.error("Failed to clone '%s':\n%s", source, err.stderr)
        raise err
    return checkout
def _get_tag_from_url(url: str) -> str:
"""Extracts a git tag or commit from a GitHub archive URL."""
# Tries to match common GitHub URL patterns for release archives.
# The patterns are ordered from most specific (40-char hash) to most
# general.
patterns = [
# https://github.com/user/repo/archive/abcdef123456.zip
r"github\.com/[^/]+/[^/]+/archive/([a-f0-9]{40})\.(?:zip|tar\.gz)",
# https://github.com/user/repo/archive/refs/tags/v1.0.0.tar.gz
r"github\.com/[^/]+/[^/]+/archive/refs/tags/([^/]+?)\.(?:zip|tar\.gz)",
# https://github.com/user/repo/archive/v1.0.0.tar.gz
r"github\.com/[^/]+/[^/]+/archive/([^/]+?)\.(?:zip|tar\.gz)",
# https://github.com/user/repo/releases/download/v1.0.0/archive.zip
r"github\.com/[^/]+/[^/]+/releases/download/([^/]+)/",
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
raise ValueError(f"Could not extract tag from URL: {url}")
def _get_commit_for_tag(repo_url: str, tag: str) -> str:
    """Queries a git remote to find the commit hash for a given tag.

    Args:
        repo_url: The git remote to query with `git ls-remote`.
        tag: A tag name, or a full 40-character commit hash.

    Returns:
        `tag` itself when it already looks like a full commit hash, otherwise
        the hash reported by the remote. For annotated tags the peeled
        (`^{}`) entry is preferred so the result is the commit rather than
        the tag object.

    Raises:
        subprocess.CalledProcessError: If the final `git ls-remote` attempt
            fails.
        ValueError: If the remote answers but reports no matching ref.
    """
    _LOG.debug(" Querying %s for tag '%s'", repo_url, tag)

    # If tag is already a full commit hash, just return it. The build will
    # fail later if it's invalid.
    if re.fullmatch(r'[a-f0-9]{40}', tag):
        _LOG.debug(" '%s' appears to be a commit hash, using verbatim.", tag)
        return tag

    # Try the fully-qualified `refs/tags/` name first, then the bare name.
    # Each attempt also asks for the peeled `^{}` entry, which is what points
    # at the commit for an annotated tag.
    attempts = (
        (f'refs/tags/{tag}', f'refs/tags/{tag}^{{}}'),
        (tag, f'{tag}^{{}}'),
    )

    last_error = None
    for ref_patterns in attempts:
        cmd = ['git', 'ls-remote', repo_url, *ref_patterns]
        try:
            result = subprocess.run(
                cmd, check=True, capture_output=True, text=True
            )
        except subprocess.CalledProcessError as e:
            last_error = e
            continue
        last_error = None

        # Without `--exit-code`, `git ls-remote` exits 0 even when nothing
        # matched, so an empty listing (not only a failed command) has to
        # trigger the next attempt.
        commit_hash = None
        for line in result.stdout.splitlines():
            fields = line.split()
            if not fields:
                continue
            if commit_hash is None:
                commit_hash = fields[0]
            if len(fields) > 1 and fields[1].endswith('^{}'):
                commit_hash = fields[0]
                break

        if commit_hash:
            _LOG.debug(" Found commit: %s", commit_hash)
            return commit_hash

    if last_error is not None:
        _LOG.error(
            "Failed to find tag '%s' in '%s':\n%s",
            tag,
            repo_url,
            last_error.stderr,
        )
        raise last_error
    raise ValueError(f"Tag '{tag}' not found in remote '{repo_url}'")
def _mirror_version(
    src_version_dir: Path,
    dest_version_dir: Path,
    mirrored_repo_url: str,
    version: str,
) -> bool:
    """Mirrors a single module version into the destination registry.

    Translates the upstream `source.json` into a `git_repository` entry that
    points at `mirrored_repo_url`, then copies the files that accompany it.

    Args:
        src_version_dir: The version directory in the source registry.
        dest_version_dir: The version directory to (re)create in the mirror.
        mirrored_repo_url: The git remote the mirrored entry should reference.
        version: The version string, used for log messages only.

    Returns:
        True if the version was written, False if it was skipped after
        logging an error.
    """
    src_source_path = src_version_dir / 'source.json'
    if not src_source_path.is_file():
        _LOG.error(" Missing source.json for %s", version)
        return False
    with open(src_source_path, 'r') as f:
        src_source_info = json.load(f)

    # A missing `type` is treated as `http_archive`.
    source_type = src_source_info.get('type', 'http_archive')
    if source_type == 'http_archive':
        url = src_source_info['url']
        try:
            tag = _get_tag_from_url(url)
            commit = _get_commit_for_tag(mirrored_repo_url, tag)
        except (ValueError, subprocess.CalledProcessError) as e:
            _LOG.error(
                " Failed to process http_archive for %s: %s",
                version,
                e,
            )
            return False
    elif source_type == 'git_repository':
        commit = src_source_info['commit']
    else:
        _LOG.error(
            " Unsupported source type '%s' for %s",
            source_type,
            version,
        )
        return False

    dest_source_info = {
        'type': 'git_repository',
        'remote': mirrored_repo_url,
        'commit': commit,
    }
    copied_fields = (
        'overlay',
        # For now, just copy this over verbatim. Ideally, the SHAs are
        # recalculated so we're 100% sure they're valid.
        'patches',
        'strip',
        # Copied patches need their strip level to apply; the Bazel registry
        # format names that field `patch_strip`.
        'patch_strip',
    )
    for field in copied_fields:
        if field in src_source_info:
            dest_source_info[field] = src_source_info[field]

    # Only touch the output tree once the translation has succeeded, so a
    # failed version does not leave an empty directory behind.
    if dest_version_dir.exists():
        shutil.rmtree(dest_version_dir)
    dest_version_dir.mkdir(parents=True, exist_ok=True)

    with open(dest_version_dir / 'source.json', 'w') as f:
        json.dump(dest_source_info, f, indent=2)

    # A registry entry needs the module's MODULE.bazel next to source.json.
    src_module_file = src_version_dir / 'MODULE.bazel'
    if src_module_file.is_file():
        shutil.copy2(src_module_file, dest_version_dir / 'MODULE.bazel')
    else:
        _LOG.error(" Missing MODULE.bazel for %s", version)

    for subdir in ('patches', 'overlay'):
        src_subdir = src_version_dir / subdir
        if src_subdir.is_dir():
            shutil.copytree(src_subdir, dest_version_dir / subdir)
    return True


def update(source: Path, config_path: Path, out: Path) -> int:
    """Adds new releases for the allowlisted modules to the mirror.

    For every module listed under `modules` in the config, compares the
    `versions` of the source registry's metadata.json with the mirror's and
    mirrors each version the mirror lacks, skipping versions listed in the
    module's `manual_versions` or `banned_versions`. Versions that were
    mirrored successfully are appended to the mirror's metadata.json.

    Args:
        source: Path to a local checkout of the source registry.
        config_path: Path to the JSON config file.
        out: Root directory of the mirrored registry.

    Returns:
        1 if `source` has no `bazel_registry.json`, otherwise 0. Problems with
        individual modules or versions are logged and skipped.
    """
    with open(config_path, 'r') as f:
        config = json.load(f)
    allowlisted_modules = config.get('modules', {})

    if not (source / 'bazel_registry.json').exists():
        _LOG.error("Source '%s' is not a valid Bazel Central Registry.", source)
        return 1

    for module_name, module_config in allowlisted_modules.items():
        _LOG.info('Processing module: %s', module_name)
        src_module_dir = source / 'modules' / module_name
        dest_module_dir = out / 'modules' / module_name
        src_metadata_path = src_module_dir / 'metadata.json'
        dest_metadata_path = dest_module_dir / 'metadata.json'

        if not src_metadata_path.exists():
            _LOG.error("Source metadata not found for '%s'", module_name)
            continue
        if not dest_metadata_path.exists():
            _LOG.error(
                "Destination metadata not found for '%s'. Run 'add' first.",
                module_name,
            )
            continue

        with open(src_metadata_path, 'r') as f:
            src_metadata = json.load(f)
        with open(dest_metadata_path, 'r') as f:
            dest_metadata = json.load(f)

        src_versions = set(src_metadata.get('versions', []))
        dest_versions = set(dest_metadata.get('versions', []))
        new_versions = sorted(src_versions - dest_versions)
        if not new_versions:
            _LOG.info(" No new versions for %s.", module_name)
            continue
        _LOG.info(" Found new versions: %s", ', '.join(new_versions))

        manual_versions = set(module_config.get('manual_versions', []))
        banned_versions = set(module_config.get('banned_versions', []))

        # The first `repository` entry of the mirror's metadata is the git
        # remote every mirrored version points at.
        repositories = dest_metadata.get('repository') or []
        if not repositories:
            _LOG.error(
                "No mirrored repository recorded for '%s'.", module_name
            )
            continue
        mirrored_repo_url = repositories[0]

        versions_added = []
        for version in new_versions:
            if version in manual_versions:
                _LOG.info(" Skipping manual version: %s", version)
                continue
            if version in banned_versions:
                _LOG.info(" Skipping banned version: %s", version)
                continue
            _LOG.debug(" Processing version: %s", version)
            if _mirror_version(
                src_module_dir / version,
                dest_module_dir / version,
                mirrored_repo_url,
                version,
            ):
                versions_added.append(version)

        if versions_added:
            dest_metadata.setdefault('versions', [])
            dest_metadata['versions'].extend(versions_added)
            dest_metadata['versions'].sort()
            with open(dest_metadata_path, 'w') as f:
                json.dump(dest_metadata, f, indent=2)
            _LOG.info(
                " Successfully mirrored %d new version(s).",
                len(versions_added),
            )
    return 0
def add(
    source: Path,
    config: Path,
    out: Path,
    module_name: str,
    interactive: bool = True,
) -> int:
    """Adds a new Bazel module to the mirror allowlist.

    Reads the module's metadata.json from the source registry, works out
    which mirrored git repository should back it, verifies that repository is
    reachable, and then writes the mirror's metadata.json (with `repository`
    replaced and no `versions`) and records the module in the config file.

    Args:
        source: Path to a local checkout of the source registry.
        config: Path to the JSON config file; must already exist.
        out: Root directory of the mirrored registry.
        module_name: The Bazel module to add.
        interactive: Whether to prompt for a mirror URL when the
            automatically derived one is missing or cannot be verified.

    Returns:
        0 if the module was added or was already in the config. 1 if the
        module is not in the source registry, the user chose to skip it, or
        the verified mirror URL is the upstream repository itself.

    Raises:
        AssertionError: If the config file does not exist, or if no mirror
            could be verified.
    """
    # Raised explicitly (rather than via `assert`) so the check still runs
    # under `python -O`.
    if not config.exists():
        raise AssertionError(
            f'Config file {config} not found, did you run `create` first?'
        )
    with open(config, 'r') as f:
        config_data = json.load(f)

    if module_name in config_data.get('modules', {}):
        _LOG.info('Module `%s` already in config.', module_name)
        return 0

    _LOG.debug(f'Processing module: {module_name}')
    metadata_path = source / 'modules' / module_name / 'metadata.json'
    if not metadata_path.exists():
        _LOG.error('%s not found in the BCR', module_name)
        return 1
    with open(metadata_path, 'r') as f:
        metadata = json.load(f)

    # Replace `repository`, and start with an empty `versions`.
    mirrored_metadata = metadata.copy()
    mirrored_metadata.pop('versions', None)
    mirrored_metadata.pop('repository', None)

    # `repository` may be missing or an empty list; both mean "unknown".
    repositories = metadata.get('repository') or ['']
    repo_str = repositories[0]

    # A hand-written config may omit the trailing slash on the prefix; add it
    # so the prefix and the org/repo suffix do not run together.
    mirror_prefix = config_data.get('mirror_prefix', '')
    if mirror_prefix and not mirror_prefix.endswith('/'):
        mirror_prefix += '/'

    verified_repo_url = None
    try:
        org, repo = _parse_org_and_repo(repo_str)
        candidate_url = f'{mirror_prefix}{org}/{repo}'
    except (ValueError, AssertionError):
        candidate_url = None

    url_to_check = candidate_url
    while True:
        if not url_to_check:
            if not interactive:
                # Non-interactive mode and the path couldn't be magically
                # identified. Abort.
                break
            # Prompt user for the right mirror path.
            _LOG.error(
                'Could not automatically verify a mirror for %s', module_name
            )
            _LOG.info('Original repository: %s', repo_str)
            user_input = input('Enter mirror URL, or (s)kip: ').strip()
            if user_input.lower() == 's':
                return 1
            if not user_input:
                # Nothing entered; ask again instead of checking the bare
                # prefix.
                continue
            try:
                is_full_url = _is_url(user_input)
            except ValueError as e:
                # Rejected input (e.g. http://); report it and ask again
                # rather than aborting.
                _LOG.error('%s', e)
                continue
            if is_full_url:
                url_to_check = user_input
            else:
                url_to_check = f'{mirror_prefix}{user_input}'

        if _check_repo_exists(url_to_check):
            verified_repo_url = url_to_check
            break

        # Check failed.
        if not interactive:
            # Only try to validate once if not in interactive mode.
            break

        # Interactive and failed.
        _LOG.error(
            'Failed to verify %s, please try again',
            url_to_check,
        )
        url_to_check = None  # Force prompt on next iteration.

    if not verified_repo_url:
        raise AssertionError(
            f'Failed to identify mirror for {module_name}: '
            f'candidate {candidate_url} does not exist or is invalid'
        )
    if verified_repo_url == repo_str:
        _LOG.error('Mirroring to the original upstream is not allowed')
        return 1

    # Only create the module directory once there is something to write, so
    # skipped or failed modules leave no empty directories behind.
    module_out_dir = out / 'modules' / module_name
    module_out_dir.mkdir(parents=True, exist_ok=True)

    config_data.setdefault('modules', {})[module_name] = {}
    mirrored_metadata['repository'] = [verified_repo_url]
    with open(module_out_dir / 'metadata.json', 'w') as f:
        json.dump(mirrored_metadata, f, indent=2)
    with open(config, 'w') as f:
        json.dump(config_data, f, indent=2)
    _LOG.debug(f'Wrote config to {config}')
    return 0
def _parse_org_and_repo(url: str) -> tuple[str, str]:
supported_prefixes = (
'github:',
'https://github.com/',
)
for prefix in supported_prefixes:
if url.startswith(prefix):
url_parts = url.removeprefix(prefix).split('/')
assert len(url_parts) >= 2, f'Malformed GitHub URL: {url}'
return url_parts[0], url_parts[1]
raise ValueError(f'Unexpected URL format: {url}')
def _walk_deps(obj, expanded_modules: set[str] | None = None) -> set[str]:
# TODO: This doesn't seem to comprehensively find all dependencies Bazel
# will try to resolve among. It's possible Bazel's builtin module
# dependencies are missing from this graph?
result = set()
if expanded_modules is None:
expanded_modules = set()
if 'dependencies' in obj:
for dep in obj['dependencies']:
if 'name' not in dep:
_LOG.error('Unnamed node in project dependency graph: %s', dep)
continue
if dep['name'] in expanded_modules:
continue
result = result | _walk_deps(dep, expanded_modules)
if 'root' not in obj and 'name' in obj:
result.add(obj['name'])
expanded_modules.add(obj['name'])
return result
def _collect_project_dependencies(project_root: Path) -> list[str]:
    """Lists the Bazel modules a project depends on.

    Runs `bazel mod graph --output=json` in `project_root` and walks the
    resulting graph with `_walk_deps`.

    Args:
        project_root: Directory in which to run Bazel.

    Returns:
        The module names in sorted order, so repeated runs process modules
        (and therefore prompt and write config entries) in the same order.

    Raises:
        subprocess.CalledProcessError: If the Bazel invocation fails.
    """
    _LOG.debug(
        'Detecting transitive module dependencies in %s...',
        project_root.resolve(),
    )
    result = subprocess.run(
        ['bazel', 'mod', 'graph', '--output=json'],
        cwd=project_root,
        capture_output=True,
        text=True,
        check=True,
    )
    mod_graph = json.loads(result.stdout)
    # Set iteration order for strings varies between interpreter runs, so sort
    # for a reproducible result.
    return sorted(_walk_deps(mod_graph))
def _check_repo_exists(url: str) -> bool:
    """Reports whether `git ls-remote --exit-code` succeeds for `url`.

    Args:
        url: The git remote to probe.

    Returns:
        True if the command exits with status 0, False otherwise (an error is
        logged in that case).
    """
    _LOG.info(f'Verifying {url}...')
    probe = subprocess.run(
        ['git', 'ls-remote', '--exit-code', url],
        capture_output=True,
        text=True,
    )
    reachable = probe.returncode == 0
    if not reachable:
        _LOG.error(f'{url} does not appear to work')
    return reachable
def _is_url(uri_or_suffix: str) -> bool:
if uri_or_suffix.startswith('http://'):
raise ValueError(
f'http://.* URLs banned ({uri_or_suffix}), ' 'please use https://.*'
)
return ':' in uri_or_suffix
def create(
    source: Path,
    config: Path,
    out: Path,
    project: Path,
    mirror_prefix: str | None,
    interactive: bool = True,
) -> int:
    """Creates a new mirror.

    Detects the module dependencies of `project`, writes a fresh config file,
    calls `add` for every detected module, and finally runs `update` to
    populate the mirror.

    Args:
        source: Path to a local checkout of the source registry.
        config: Path of the JSON config file to (over)write.
        out: Root directory of the mirrored registry; created if needed.
        project: The Bazel project whose dependencies should be mirrored.
        mirror_prefix: URL prefix for mirrored repositories. When empty, it
            is prompted for in interactive mode.
        interactive: Whether prompting the user is allowed.

    Returns:
        The return value of `update`.

    Raises:
        ValueError: If no mirror prefix was given in non-interactive mode.
    """
    if not mirror_prefix:
        if not interactive:
            raise ValueError(
                'Cannot create a new BCR mirror without a --mirror-prefix'
            )
        mirror_prefix = input(
            'Enter the main mirror URL prefix '
            '(e.g., https://pigweed.googlesource.com/third_party/github): '
        )

    # Make sure there's a trailing forward slash.
    if mirror_prefix and not mirror_prefix.endswith('/'):
        mirror_prefix = f'{mirror_prefix}/'

    dependencies = _collect_project_dependencies(project)

    # Start from an empty config, recording the mirror prefix only if one was
    # configured.
    config_data = {'mirror_prefix': mirror_prefix} if mirror_prefix else {}

    # Write the config file up front so it exists before modules are added.
    out.mkdir(exist_ok=True, parents=True)
    with open(config, 'w') as config_file:
        json.dump(config_data, config_file, indent=2)

    for dependency in dependencies:
        add(
            source,
            config,
            out,
            module_name=dependency,
            interactive=interactive,
        )

    _LOG.debug('Running update() to populate the mirror...')
    return update(source, config, out)
def main(source: str, config: Path, out: Path, func: callable, **kwargs) -> int:
    """Resolves the source registry and runs the selected subcommand.

    Args:
        source: Either a URL of a git repository to clone, or a path to a
            registry checkout on disk.
        config: Path to the JSON config file.
        out: Root directory of the mirrored registry.
        func: The subcommand handler; called as
            `func(registry_path, config, out, **kwargs)`.
        **kwargs: Subcommand-specific arguments forwarded to `func`.

    Returns:
        The handler's return value, or 1 if no source was given.
    """
    logging.basicConfig(format='%(message)s', level=logging.DEBUG)

    if not source:
        _LOG.error('No source registry given; pass --source.')
        return 1

    if _is_url(source):
        # The clone only has to outlive the handler call; the context manager
        # removes it afterwards, even if the handler raises.
        with tempfile.TemporaryDirectory() as remote_registry_tempdir:
            remote_bcr_path = _fetch_remote_registry(
                Path(remote_registry_tempdir),
                source,
            )
            return func(remote_bcr_path, config, out, **kwargs)

    # On-disk.
    return func(Path(source), config, out, **kwargs)
if __name__ == '__main__':
    # The parsed namespace holds the shared options plus the subcommand
    # handler and its arguments; unpack all of it into main() and use the
    # result as the process exit status.
    _cli_options = vars(_parse_args())
    sys.exit(main(**_cli_options))