| #!/usr/bin/env python3 |
| # Copyright 2026 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Mirror the BCR to a local repository. |
| |
| !!! WARNING !!! |
| THIS SCRIPT WILL NOT WORK UNTIL THE FOLLOWING BAZEL ISSUE IS FIXED: |
| |
| https://github.com/bazelbuild/bazel/issues/22857 |
| |
| The above issue prevents `git_repository` Bazel registry types from having |
| `patches` or `overlay` entries, which means that many Bazel modules are |
| effectively broken. |
| !!! WARNING !!! |
| |
| This script scans the BCR (a local checkout, or the latest HEAD) and creates |
| equivalent git_repository mirrors. This script assumes that you have already |
| mirrored required Bazel module dependencies to your own host in a way such that |
| they can be referenced as: |
| |
| [arbitrary prefix]/[GitHub org or owner]/[GitHub repository] |
| |
| |
| Example usage |
| ============= |
| |
| Create a new mirror |
| ------------------- |
| |
| Create mirrors for the modules used by ~/projects/pigweed |
| |
| $ ./tools/mirror_bazel_registry.py \\ |
| --source=https://pigweed.googlesource.com/third_party/github/bazelbuild/bazel-central-registry.git \\ |
| --out=. \\ |
| --config=./config.json \\ |
| create ~/projects/pigweed \\ |
| --mirror-prefix=https://pigweed.googlesource.com/third_party/github |
| |
| Update an existing mirror |
| ------------------------- |
| |
| $ ./tools/mirror_bazel_registry.py \\ |
| --source=https://pigweed.googlesource.com/third_party/github/bazelbuild/bazel-central-registry.git \\ |
| --out=. \\ |
| --config=./config.json \\ |
| update |
| |
| Add/allowlist a new module |
| -------------------------- |
| |
| $ ./tools/mirror_bazel_registry.py \\ |
| --source=https://pigweed.googlesource.com/third_party/github/bazelbuild/bazel-central-registry.git \\ |
| --out=. \\ |
| --config=./config.json \\ |
| add rules_fuzzing |
| |
| Config format overview |
| ====================== |
| |
| * mirror_prefix: The main git host to use when transforming module sources |
| of truth. |
| * modules: Allowlisted modules. The key is the module name, value is a |
| dictionary of options. |
| * manual_versions: A list of versions that are hand-crafted and should be |
| largely ignored during integrity checks. This is particularly useful for |
| Bazel modules that have nonstandard source URLs. |
| * banned_versions: Similar to yanked versions, prohibits specific releases |
| from being mirrored. This will cause version resolution issues if any |
| project using this registry references these versions. |
| |
| Example config |
| -------------- |
| |
| { |
| "mirror_prefix": "https://pigweed.googlesource.com/third_party/github", |
| "modules": { |
| "nanopb": { |
| "manual_versions": [ |
| "0.3.9.10", |
| "0.4.9" |
| ] |
| }, |
| "pico-sdk": {}, |
| "protobuf": {}, |
| "re2": { |
| "banned_versions": [ |
| "2025-06-26b" |
| ] |
| } |
| } |
| } |
| """ |
| |
| import argparse |
| import logging |
| from pathlib import Path |
| import re |
| import sys |
| import tempfile |
| import json |
| import subprocess |
| import shutil |
| |
| if sys.version_info < (3, 10): |
| print( |
| 'Python 3.10 or greater is required to run this script.', |
| file=sys.stderr, |
| ) |
| print( |
| 'You are using Python {}'.format(sys.version), |
| file=sys.stderr, |
| ) |
| sys.exit(1) |
| |
| # NOTE: This is used in CI in an isolated environment. Do NOT introduce |
| # dependencies (whether 1p or 3p) to this script. |
| |
| _LOG = logging.getLogger(__name__) |
| |
| |
def _parse_args() -> argparse.Namespace:
    """Builds the CLI parser and parses ``sys.argv``.

    The global options (``--source``, ``--config``, ``--out``) precede the
    subcommand. Each subcommand (``update``, ``create``, ``add``) stores its
    handler under ``func``, so the namespace can be splatted into
    ``main(**vars(namespace))``.

    Invoking the script with no arguments prints the full help and exits.

    Returns:
      The parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '--source',
        # main() cannot run without a registry to read from; fail with a
        # usage error instead of crashing later on a `None` source.
        required=True,
        help=(
            'The source registry that declares original Bazel module sources '
            'of truth'
        ),
    )
    parser.add_argument(
        '--config',
        type=Path,
        required=True,
        help='The configuration file used to guide this.',
    )
    parser.add_argument(
        '--out',
        type=Path,
        required=True,
        help='The destination directory to write the mirrored registry to.',
    )
    subparsers = parser.add_subparsers(required=True)
    update_cmd_parser = subparsers.add_parser(
        name='update',
        help=(
            'Scan a Bazel registry for new versions, and add them to a mirror.'
        ),
    )
    update_cmd_parser.set_defaults(func=update)

    create_cmd_parser = subparsers.add_parser(
        name='create',
        help='Interactively creates a new mirror to the specified directory.',
    )
    create_cmd_parser.set_defaults(func=create)

    # NOTE: `help` must be a plain string. A trailing comma inside the
    # parentheses would turn it into a tuple, which argparse cannot format.
    create_cmd_parser.add_argument(
        'project',
        type=Path,
        help=(
            'Path to the project to create a mirror for. This magically '
            'identifies the module dependencies and then adds entries for each '
            'identified module.'
        ),
    )
    create_cmd_parser.add_argument(
        '--mirror-prefix',
        help=(
            'The default prefix to use when remapping Bazel module git '
            'repositories'
        ),
    )

    add_cmd_parser = subparsers.add_parser(
        name='add',
        help='Add a new bazel module to an existing project.',
    )
    add_cmd_parser.set_defaults(func=add)
    add_cmd_parser.add_argument(
        'module_name',
        type=str,
        help='The name of the Bazel module to add.',
    )

    # A bare invocation is most likely a request for usage information.
    if len(sys.argv) == 1:
        return parser.parse_args(['--help'])

    return parser.parse_args()
| |
| |
def _fetch_remote_registry(tempdir: Path, source: str) -> Path:
    """Shallow-clones the registry at `source` into `tempdir`.

    Args:
      tempdir: Existing directory that will hold the checkout.
      source: Git URL of the registry to clone.

    Returns:
      The path of the checkout (`tempdir / 'bcr'`).

    Raises:
      subprocess.CalledProcessError: If `git clone` fails; git's stderr is
        logged first.
    """
    _LOG.debug('Cloning %s...', source)
    checkout = tempdir / 'bcr'
    clone_cmd = ['git', 'clone', '--depth=1', source, str(checkout)]
    try:
        subprocess.run(clone_cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as err:
        _LOG.error("Failed to clone '%s':\n%s", source, err.stderr)
        raise
    return checkout
| |
| |
def _get_tag_from_url(url: str) -> str:
    """Pulls the git ref (tag or 40-character hash) out of a GitHub URL.

    Args:
      url: A GitHub archive or release-download URL.

    Returns:
      The first capture group of the first matching pattern.

    Raises:
      ValueError: If none of the known URL layouts match.
    """
    # Ordered from most specific (full commit hash) to most general; the
    # first hit wins.
    candidates = (
        # https://github.com/user/repo/archive/abcdef123456.zip
        r"github\.com/[^/]+/[^/]+/archive/([a-f0-9]{40})\.(?:zip|tar\.gz)",
        # https://github.com/user/repo/archive/refs/tags/v1.0.0.tar.gz
        r"github\.com/[^/]+/[^/]+/archive/refs/tags/([^/]+?)\.(?:zip|tar\.gz)",
        # https://github.com/user/repo/archive/v1.0.0.tar.gz
        r"github\.com/[^/]+/[^/]+/archive/([^/]+?)\.(?:zip|tar\.gz)",
        # https://github.com/user/repo/releases/download/v1.0.0/archive.zip
        r"github\.com/[^/]+/[^/]+/releases/download/([^/]+)/",
    )
    attempts = (re.search(candidate, url) for candidate in candidates)
    hit = next((found for found in attempts if found), None)
    if hit is None:
        raise ValueError(f"Could not extract tag from URL: {url}")
    return hit.group(1)
| |
| |
| def _get_commit_for_tag(repo_url: str, tag: str) -> str: |
| """Queries a git remote to find the commit hash for a given tag.""" |
| _LOG.debug(f" Querying {repo_url} for tag '{tag}'") |
| # If tag is already a full commit hash, just return it. The build will |
| # fail later if it's invalid. |
| if re.fullmatch(r'[a-f0-9]{40}', tag): |
| _LOG.debug( |
| f" '{tag}' appears to be a commit hash, using verbatim." |
| ) |
| return tag |
| |
| # Try fetching the tag with a `refs/tags/` prefix first. |
| cmd = ['git', 'ls-remote', repo_url, f'refs/tags/{tag}'] |
| try: |
| result = subprocess.run(cmd, check=True, capture_output=True, text=True) |
| except subprocess.CalledProcessError: |
| # If that fails, try fetching the tag directly. This can happen with |
| # non-annotated tags. |
| cmd = ['git', 'ls-remote', repo_url, tag] |
| try: |
| result = subprocess.run( |
| cmd, check=True, capture_output=True, text=True |
| ) |
| except subprocess.CalledProcessError as e: |
| _LOG.error( |
| f"Failed to find tag '{tag}' in '{repo_url}':\n{e.stderr}" |
| ) |
| raise e |
| |
| output = result.stdout.strip() |
| if not output: |
| raise ValueError(f"Tag '{tag}' not found in remote '{repo_url}'") |
| |
| commit_hash = output.split()[0] |
| _LOG.debug(f" Found commit: {commit_hash}") |
| return commit_hash |
| |
| |
def update(source: Path, config_path: Path, out: Path) -> int:
    """Adds new releases for the allowlisted modules to the mirror.

    For every module in the config's `modules` mapping, versions listed in
    the source registry's metadata.json but not in the mirror's are
    translated into `git_repository` entries pointing at the mirror's
    repository (the first `repository` entry of the mirror's metadata.json).
    Versions named in `manual_versions` or `banned_versions` are skipped.

    A module or version that cannot be processed is logged and skipped; it
    does not abort the run, and nothing is written for it.

    Args:
      source: Root of a local checkout of the source registry.
      config_path: Path to the JSON config file.
      out: Root of the mirrored registry.

    Returns:
      1 if `source` has no bazel_registry.json, otherwise 0.
    """
    # `archive` is the registry's name for the default source type;
    # `http_archive` is accepted as well for backward compatibility.
    archive_types = ('archive', 'http_archive')
    # source.json fields carried over untouched. `patch_strip` travels with
    # `patches`, otherwise copied patches apply at the wrong strip level.
    # For now, patch entries are copied verbatim. Ideally, the SHAs are
    # recalculated so we're 100% sure they're valid.
    copied_fields = ('overlay', 'patches', 'patch_strip', 'strip')
    copied_subdirs = ('patches', 'overlay')

    with open(config_path, 'r') as f:
        config = json.load(f)
    allowlisted_modules = config.get('modules', {})

    if not (source / 'bazel_registry.json').exists():
        _LOG.error("Source '%s' is not a valid Bazel Central Registry.", source)
        return 1

    for module_name, module_config in allowlisted_modules.items():
        _LOG.info('Processing module: %s', module_name)

        src_module_dir = source / 'modules' / module_name
        dest_module_dir = out / 'modules' / module_name
        src_metadata_path = src_module_dir / 'metadata.json'
        dest_metadata_path = dest_module_dir / 'metadata.json'

        if not src_metadata_path.exists():
            _LOG.error("Source metadata not found for '%s'", module_name)
            continue
        if not dest_metadata_path.exists():
            _LOG.error(
                "Destination metadata not found for '%s'. Run 'add' first.",
                module_name,
            )
            continue

        with open(src_metadata_path, 'r') as f:
            src_metadata = json.load(f)
        with open(dest_metadata_path, 'r') as f:
            dest_metadata = json.load(f)

        src_versions = set(src_metadata.get('versions', []))
        dest_versions = set(dest_metadata.get('versions', []))
        new_versions = sorted(src_versions - dest_versions)

        if not new_versions:
            _LOG.info(" No new versions for %s.", module_name)
            continue

        _LOG.info(" Found new versions: %s", ', '.join(new_versions))

        repositories = dest_metadata.get('repository') or []
        if not repositories:
            _LOG.error(
                " Destination metadata for '%s' lists no repository.",
                module_name,
            )
            continue
        mirrored_repo_url = repositories[0]

        manual_versions = set(module_config.get('manual_versions', []))
        banned_versions = set(module_config.get('banned_versions', []))
        versions_added = []

        for version in new_versions:
            if version in manual_versions:
                _LOG.info(" Skipping manual version: %s", version)
                continue
            if version in banned_versions:
                _LOG.info(" Skipping banned version: %s", version)
                continue

            _LOG.debug(" Processing version: %s", version)
            src_version_dir = src_module_dir / version
            dest_version_dir = dest_module_dir / version

            try:
                with open(src_version_dir / 'source.json', 'r') as f:
                    src_source_info = json.load(f)
            except FileNotFoundError:
                _LOG.error(" No source.json found for %s", version)
                continue

            # Resolve the commit BEFORE touching the destination so that a
            # failure neither wipes an existing directory nor leaves an
            # empty one behind.
            source_type = src_source_info.get('type', 'archive')
            if source_type in archive_types:
                try:
                    tag = _get_tag_from_url(src_source_info['url'])
                    commit = _get_commit_for_tag(mirrored_repo_url, tag)
                except (
                    KeyError,
                    ValueError,
                    subprocess.CalledProcessError,
                ) as e:
                    _LOG.error(
                        " Failed to process http_archive for %s: %s",
                        version,
                        e,
                    )
                    continue
            elif source_type == 'git_repository':
                commit = src_source_info.get('commit')
                if not commit:
                    _LOG.error(
                        " No commit pinned in git_repository source for %s",
                        version,
                    )
                    continue
            else:
                _LOG.error(
                    " Unsupported source type '%s' for %s",
                    source_type,
                    version,
                )
                continue

            dest_source_info = {
                'type': 'git_repository',
                'remote': mirrored_repo_url,
                'commit': commit,
            }
            for field in copied_fields:
                if field in src_source_info:
                    dest_source_info[field] = src_source_info[field]

            # Start from a clean directory so stale files never linger.
            if dest_version_dir.exists():
                shutil.rmtree(dest_version_dir)
            dest_version_dir.mkdir(parents=True, exist_ok=True)

            with open(dest_version_dir / 'source.json', 'w') as f:
                json.dump(dest_source_info, f, indent=2)

            for subdir in copied_subdirs:
                src_subdir = src_version_dir / subdir
                if src_subdir.is_dir():
                    shutil.copytree(src_subdir, dest_version_dir / subdir)

            versions_added.append(version)

        if versions_added:
            dest_metadata.setdefault('versions', [])
            dest_metadata['versions'].extend(versions_added)
            dest_metadata['versions'].sort()
            with open(dest_metadata_path, 'w') as f:
                json.dump(dest_metadata, f, indent=2)
            _LOG.info(
                " Successfully mirrored %d new version(s).",
                len(versions_added),
            )

    return 0
| |
| |
def add(
    source: Path,
    config: Path,
    out: Path,
    module_name: str,
    interactive: bool = True,
) -> int:
    """Adds a new Bazel module to the mirror allowlist.

    Derives a candidate mirror URL from the config's `mirror_prefix` and the
    module's upstream GitHub org/repo, verifies that it exists, then writes
    the module's mirrored metadata.json (without `versions`) and records the
    module in the config file. In interactive mode the user is prompted for
    a URL (or a suffix appended to `mirror_prefix`) until one verifies or
    they skip.

    Args:
      source: Root of a local checkout of the source registry.
      config: Path to the JSON config file; must already exist.
      out: Root of the mirrored registry.
      module_name: Name of the module to allowlist.
      interactive: Whether prompting the user is allowed.

    Returns:
      0 on success or if the module is already configured; 1 if the module
      is not in the source registry, the user skipped it, or the verified
      mirror is the upstream repository itself.

    Raises:
      AssertionError: If the config file is missing, or no mirror could be
        verified.
    """
    # Raised explicitly (rather than via `assert`) so the check also runs
    # under `python -O`.
    if not config.exists():
        raise AssertionError(
            f'Config file {config} not found, did you run `create` first?'
        )
    with open(config, 'r') as f:
        config_data = json.load(f)

    if module_name in config_data.get('modules', {}):
        _LOG.info('Module `%s` already in config.', module_name)
        return 0

    _LOG.debug(f'Processing module: {module_name}')
    metadata_path = source / 'modules' / module_name / 'metadata.json'

    if not metadata_path.exists():
        _LOG.error('%s not found in the BCR', module_name)
        return 1

    with open(metadata_path, 'r') as f:
        metadata = json.load(f)

    # Replace `repository`, and start with an empty `versions`.
    mirrored_metadata = metadata.copy()
    mirrored_metadata.pop('versions', None)
    mirrored_metadata.pop('repository', None)

    # Tolerate a missing or empty `repository` list.
    repositories = metadata.get('repository') or ['']
    repo_str = repositories[0]

    # Hand-written configs may omit the trailing slash; without it the
    # prefix and the org name would be glued together.
    mirror_prefix = config_data.get('mirror_prefix', '')
    if mirror_prefix and not mirror_prefix.endswith('/'):
        mirror_prefix += '/'

    verified_repo_url = None
    try:
        org, repo = _parse_org_and_repo(repo_str)
        candidate_url = f'{mirror_prefix}{org}/{repo}'
    except ValueError:
        candidate_url = None

    url_to_check = candidate_url

    while True:
        if not url_to_check:
            if not interactive:
                # Non-interactive mode and the path couldn't be magically
                # identified. Abort.
                break
            # Prompt user for the right mirror path.
            _LOG.error(
                'Could not automatically verify a mirror for %s', module_name
            )
            _LOG.info('Original repository: %s', repo_str)
            user_input = input('Enter mirror URL, or (s)kip: ').strip()
            if user_input.lower() == 's':
                return 1
            try:
                answer_is_url = _is_url(user_input)
            except ValueError as e:
                # Rejected scheme (e.g. http://): explain and ask again
                # instead of crashing the session.
                _LOG.error('%s', e)
                continue
            if not answer_is_url:
                url_to_check = f'{mirror_prefix}{user_input}'
            else:
                url_to_check = user_input

        if _check_repo_exists(url_to_check):
            verified_repo_url = url_to_check
            break

        # Check failed.
        if not interactive:
            # Only try to validate once if not in interactive mode.
            break

        # Interactive and failed.
        _LOG.error(
            'Failed to verify %s, please try again',
            url_to_check,
        )
        url_to_check = None  # Force prompt on next iteration.

    if not verified_repo_url:
        raise AssertionError(
            f'Failed to identify mirror for {module_name}: '
            f'candidate {candidate_url} does not exist or is invalid'
        )

    if verified_repo_url == repo_str:
        _LOG.error('Mirroring to the original upstream is not allowed')
        return 1

    config_data.setdefault('modules', {})[module_name] = {}
    mirrored_metadata['repository'] = [verified_repo_url]

    # Create the module directory only once there is something to write.
    module_out_dir = out / 'modules' / module_name
    module_out_dir.mkdir(parents=True, exist_ok=True)
    with open(module_out_dir / 'metadata.json', 'w') as f:
        json.dump(mirrored_metadata, f, indent=2)

    with open(config, 'w') as f:
        json.dump(config_data, f, indent=2)
    _LOG.debug(f'Wrote config to {config}')
    return 0
| |
| |
def _parse_org_and_repo(url: str) -> tuple[str, str]:
    """Splits a GitHub repository reference into (org, repo).

    Accepts `github:org/repo` and `https://github.com/org/repo[/...]`; any
    path components after the repository name are ignored.

    Args:
      url: The repository reference to parse.

    Returns:
      The organization (or owner) and the repository name.

    Raises:
      ValueError: If `url` uses an unsupported prefix, or lacks a non-empty
        org or repo component.
    """
    supported_prefixes = (
        'github:',
        'https://github.com/',
    )
    for prefix in supported_prefixes:
        if not url.startswith(prefix):
            continue
        url_parts = url.removeprefix(prefix).split('/')
        # Callers handle ValueError only, so a malformed URL must not
        # surface as an AssertionError (nor vanish under `python -O`).
        if len(url_parts) < 2 or not url_parts[0] or not url_parts[1]:
            raise ValueError(f'Malformed GitHub URL: {url}')
        return url_parts[0], url_parts[1]

    raise ValueError(f'Unexpected URL format: {url}')
| |
| |
| def _walk_deps(obj, expanded_modules: set[str] | None = None) -> set[str]: |
| # TODO: This doesn't seem to comprehensively find all dependencies Bazel |
| # will try to resolve among. It's possible Bazel's builtin module |
| # dependencies are missing from this graph? |
| result = set() |
| if expanded_modules is None: |
| expanded_modules = set() |
| if 'dependencies' in obj: |
| for dep in obj['dependencies']: |
| if 'name' not in dep: |
| _LOG.error('Unnamed node in project dependency graph: %s', dep) |
| continue |
| if dep['name'] in expanded_modules: |
| continue |
| result = result | _walk_deps(dep, expanded_modules) |
| |
| if 'root' not in obj and 'name' in obj: |
| result.add(obj['name']) |
| expanded_modules.add(obj['name']) |
| return result |
| |
| |
def _collect_project_dependencies(project_root: Path) -> list[str]:
    """Lists the Bazel modules a project depends on.

    Runs `bazel mod graph --output=json` in `project_root` and flattens the
    resulting graph with `_walk_deps`.

    Args:
      project_root: Directory of the Bazel project to inspect.

    Returns:
      The module names, sorted so that repeated runs process modules (and
      write config entries) in the same order.

    Raises:
      subprocess.CalledProcessError: If Bazel exits with an error; its
        stderr is logged first.
    """
    _LOG.debug(
        'Detecting transitive module dependencies in %s...',
        project_root.resolve(),
    )
    try:
        result = subprocess.run(
            ['bazel', 'mod', 'graph', '--output=json'],
            cwd=project_root,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        # Output is captured, so surface Bazel's own diagnostics.
        _LOG.error('`bazel mod graph` failed:\n%s', e.stderr)
        raise
    mod_graph = json.loads(result.stdout)
    return sorted(_walk_deps(mod_graph))
| |
| |
def _check_repo_exists(url: str) -> bool:
    """Reports whether `git ls-remote --exit-code` succeeds for `url`.

    A failure is logged as an error before False is returned.
    """
    _LOG.info(f'Verifying {url}...')
    probe = subprocess.run(
        ['git', 'ls-remote', '--exit-code', url],
        capture_output=True,
        text=True,
    )
    reachable = probe.returncode == 0
    if not reachable:
        _LOG.error(f'{url} does not appear to work')
    return reachable
| |
| |
def _is_url(uri_or_suffix: str) -> bool:
    """Tells a URL apart from a plain path or suffix.

    Anything containing a colon counts as a URL.

    Raises:
      ValueError: If the value starts with `http://`, which is banned.
    """
    insecure = uri_or_suffix.startswith('http://')
    if not insecure:
        return ':' in uri_or_suffix
    raise ValueError(
        f'http://.* URLs banned ({uri_or_suffix}), please use https://.*'
    )
| |
| |
| def create( |
| source: Path, |
| config: Path, |
| out: Path, |
| project: Path, |
| mirror_prefix: str | None, |
| interactive: bool = True, |
| ) -> int: |
| """Creates a new mirror.""" |
| if not mirror_prefix: |
| if interactive: |
| mirror_prefix = input( |
| 'Enter the main mirror URL prefix (e.g., https://pigweed.googlesource.com/third_party/github): ' |
| ) |
| else: |
| raise ValueError( |
| 'Cannot create a new BCR mirror without a --mirror-prefix' |
| ) |
| |
| # Make sure there's a trailing forward slash. |
| if mirror_prefix and not mirror_prefix.endswith('/'): |
| mirror_prefix += '/' |
| |
| dependencies = _collect_project_dependencies(project) |
| |
| # Create an initial empty config file. |
| config_data = {} |
| |
| # If a mirror prefix was configured, set it now. |
| if mirror_prefix: |
| config_data['mirror_prefix'] = mirror_prefix |
| |
| # Write config file so it exists. |
| out.mkdir(exist_ok=True, parents=True) |
| with open(config, 'w') as f: |
| json.dump(config_data, f, indent=2) |
| |
| for module_name in dependencies: |
| add( |
| source, |
| config, |
| out, |
| module_name=module_name, |
| interactive=interactive, |
| ) |
| |
| _LOG.debug('Running update() to populate the mirror...') |
| return update(source, config, out) |
| |
| |
def main(source: str, config: Path, out: Path, func: callable, **kwargs) -> int:
    """Resolves the source registry and runs the selected subcommand.

    Args:
      source: A git URL (cloned into a temporary directory) or a path to a
        local checkout of the source registry.
      config: Path to the JSON config file.
      out: Root of the mirrored registry.
      func: Subcommand handler, called as `func(registry, config, out,
        **kwargs)`.
      **kwargs: Subcommand-specific arguments forwarded to `func`.

    Returns:
      The handler's return code, so failures reach the process exit status;
      0 if the handler returns None.
    """
    logging.basicConfig(format='%(message)s', level=logging.DEBUG)
    remote_registry_tempdir = None
    try:
        if _is_url(source):
            remote_registry_tempdir = tempfile.TemporaryDirectory()
            remote_bcr_path = _fetch_remote_registry(
                Path(remote_registry_tempdir.name),
                source,
            )
        else:
            # On-disk.
            remote_bcr_path = Path(source)

        result = func(remote_bcr_path, config, out, **kwargs)
    finally:
        # Remove the temporary clone deterministically rather than leaving
        # it to interpreter shutdown.
        if remote_registry_tempdir is not None:
            remote_registry_tempdir.cleanup()

    return 0 if result is None else result
| |
| |
if __name__ == '__main__':
    # The parsed namespace's attributes map one-to-one onto main()'s
    # keyword parameters (plus subcommand-specific extras).
    cli_options = vars(_parse_args())
    sys.exit(main(**cli_options))