blob: dd9b67e93e9726faec8a4d0c6dfe5c4273ae9a7f [file]
#!/usr/bin/env python3
import argparse
import os
import shlex
import subprocess
import sys
import yaml
def parse_args():
    """Parse the command line for the CI job to emulate.

    Returns:
        argparse.Namespace whose ``job`` attribute holds the requested job key
        or name, or ``None`` when no job was given on the command line.
    """
    parser = argparse.ArgumentParser(
        description="Replicate and emulate BazelCI job configurations from presubmit.yml."
    )
    parser.add_argument(
        "job",
        # Optional on purpose: with a required positional, argparse exits with
        # a bare usage error and the caller's "no job specified" branch, which
        # lists the available job keys, can never run.
        nargs="?",
        default=None,
        help="The key or name of the CI job to emulate (e.g., ubuntu_workspace).",
    )
    return parser.parse_args()
def run_cmd(cmd, cwd=None, env=None, shell=False):
    """Echo a command, run it, and report whether it succeeded.

    Args:
        cmd: Command to run, either a single string or a list of arguments.
        cwd: Directory to run the command in; ``None`` keeps the current one.
        env: Environment mapping for the child process; ``None`` inherits the
            parent's environment.
        shell: When true, the command is run through the system shell.

    Returns:
        True if the command exited with status 0. False if it exited with a
        non-zero status or could not be started at all.
    """
    if isinstance(cmd, str):
        cmd_str = cmd
    elif shell:
        cmd_str = " ".join(cmd)
    elif isinstance(cmd, list):
        cmd_str = shlex.join(cmd)
    else:
        cmd_str = str(cmd)

    # With shell=True a list would make POSIX run only the first item and hand
    # the rest to the shell as positional parameters, so execute exactly the
    # string that is shown to the user.
    to_run = cmd_str if shell else cmd

    # Flush the banner so it is not emitted after the child's own output when
    # stdout is block-buffered (e.g. redirected to a file or a pipe).
    print(f"\nšŸš€ Executing: {cmd_str}", flush=True)
    if cwd and cwd != os.getcwd():
        print(f"šŸ“ Directory: {cwd}", flush=True)
    if env and "USE_BAZEL_VERSION" in env:
        print(f"šŸ”§ Bazel Version: {env['USE_BAZEL_VERSION']}", flush=True)

    try:
        res = subprocess.run(to_run, cwd=cwd, env=env, shell=shell)
    except OSError as err:
        # Raised when the executable or the working directory is missing or
        # not usable; report it like any other failed command.
        print(
            f"\nāŒ Command could not be started ({err}): {cmd_str}",
            file=sys.stderr,
        )
        return False

    if res.returncode != 0:
        print(
            f"\nāŒ Command failed with return code {res.returncode}: {cmd_str}",
            file=sys.stderr,
        )
        return False
    return True
def resolve_bazel_version(task_bazel):
    """Return the Bazel version string to pin, or None if there is none.

    Args:
        task_bazel: Value of a task's ``bazel`` entry. May be missing (None),
            a string, or a non-string YAML scalar such as ``7``.

    Returns:
        The version as a string, or None when the value is empty or is an
        unexpanded ``${{ ... }}`` placeholder.
    """
    if not task_bazel:
        return None
    # YAML parses an unquoted ``bazel: 7`` as a number; coerce it so the
    # prefix check works and the result is usable as an environment value.
    version = task_bazel if isinstance(task_bazel, str) else str(task_bazel)
    if version.startswith("${{"):
        return None
    return version
def execute_ci_job(job_key, task, repo_root):
    """Run one presubmit task locally: pre-commands, then build/test/coverage.

    Args:
        job_key: Key of the task in the presubmit ``tasks`` mapping.
        task: The task's configuration mapping. The keys read here are
            ``name``, ``working_directory``, ``bazel``, ``shell_commands`` /
            ``batch_commands``, ``build_flags``, ``build_targets``,
            ``test_flags``, ``test_targets`` and ``coverage_targets``.
        repo_root: Directory that ``working_directory`` is resolved against.

    Returns:
        True if every step succeeded, False as soon as one step fails or the
        working directory is not usable.
    """
    job_name = task.get("name", job_key)
    print(f"\n{'=' * 80}\nšŸŽÆ Replicating CI Job: {job_key} ('{job_name}')\n{'=' * 80}")

    # Setup working directory
    cwd = repo_root
    workdir = task.get("working_directory")
    if workdir is not None:
        cwd = os.path.join(repo_root, workdir)
        # isdir rather than exists: a regular file at that path would pass an
        # existence check and only fail later when used as the child's cwd.
        if not os.path.isdir(cwd):
            print(
                f"āŒ Error: working_directory '{task['working_directory']}' does not exist at '{cwd}'",
                file=sys.stderr,
            )
            return False

    # Setup environment
    env = os.environ.copy()
    bzl_version = resolve_bazel_version(task.get("bazel"))
    if bzl_version:
        # Environment values must be strings.
        env["USE_BAZEL_VERSION"] = str(bzl_version)

    # Execute pre-commands
    is_windows = sys.platform.startswith("win")
    # ``or []`` also covers a key that is present but empty (YAML null).
    pre_cmds = task.get("batch_commands" if is_windows else "shell_commands") or []
    if isinstance(pre_cmds, str):
        # A single scalar command would otherwise be iterated per character.
        pre_cmds = [pre_cmds]
    for pre_cmd in pre_cmds:
        if not run_cmd(pre_cmd, cwd=cwd, env=env, shell=True):
            return False

    build_flags = list(task.get("build_flags") or [])
    test_flags = list(task.get("test_flags") or [])
    if "--build_tests_only" in test_flags:
        effective_test_flags = test_flags
    else:
        effective_test_flags = ["--build_tests_only"] + test_flags

    # (bazel verb, flags, task key holding the targets), in execution order.
    # Coverage reuses the task's test_flags as configured, without the
    # --build_tests_only addition.
    phases = (
        ("build", build_flags, "build_targets"),
        ("test", effective_test_flags, "test_targets"),
        ("coverage", test_flags, "coverage_targets"),
    )
    for verb, flags, targets_key in phases:
        # Drop any "--" entries from the configured list; a single "--" is
        # re-inserted below between the flags and the targets.
        targets = [t for t in (task.get(targets_key) or []) if t != "--"]
        if not targets:
            continue
        cmd = ["bazel", verb] + flags + ["--"] + targets
        if not run_cmd(cmd, cwd=cwd, env=env):
            return False

    print(f"\nšŸŽ‰ Successfully replicated CI Job: {job_key}")
    return True
def _print_available_jobs(tasks):
    """Print each task key with its display name to stderr, sorted by key."""
    print("šŸ“‹ Available CI Job Keys:", file=sys.stderr)
    for key in sorted(tasks):
        config = tasks[key]
        name = config.get("name", key) if isinstance(config, dict) else key
        print(f" • {key} ({name})", file=sys.stderr)


def main():
    """Load ``.bazelci/presubmit.yml`` next to this script and run one job.

    The job is selected by the ``job`` command-line argument, matched first
    against the task keys and then against each task's ``name``. The process
    exits with status 0 when the job succeeds and 1 on any error or failure.
    """
    args = parse_args()
    repo_root = os.path.abspath(os.path.dirname(__file__))
    presubmit_path = os.path.join(repo_root, ".bazelci/presubmit.yml")
    if not os.path.exists(presubmit_path):
        print(
            f"āŒ Error: Presubmit file not found at '{presubmit_path}'",
            file=sys.stderr,
        )
        sys.exit(1)

    try:
        with open(presubmit_path, encoding="utf-8") as f:
            presubmit = yaml.safe_load(f)
    except (OSError, yaml.YAMLError) as err:
        print(
            f"āŒ Error: Could not read '{presubmit_path}': {err}",
            file=sys.stderr,
        )
        sys.exit(1)

    # safe_load returns None for an empty document and may return a non-mapping
    # for unexpected content; treat both the same as "no tasks".
    tasks = presubmit.get("tasks") if isinstance(presubmit, dict) else None
    if not isinstance(tasks, dict) or not tasks:
        print(
            f"āŒ Error: No tasks found in '{presubmit_path}'",
            file=sys.stderr,
        )
        sys.exit(1)

    # If no job specified, print available jobs and exit
    if not args.job:
        print("āŒ Error: No CI job specified. Provide a job key.\n", file=sys.stderr)
        _print_available_jobs(tasks)
        sys.exit(1)

    # Match by key or by name
    if args.job in tasks:
        job_key = args.job
    else:
        job_key = next(
            (
                key
                for key, config in tasks.items()
                if isinstance(config, dict) and config.get("name") == args.job
            ),
            None,
        )
    if job_key is None:
        print(
            f"āŒ Error: CI job '{args.job}' not found in '{presubmit_path}'\n",
            file=sys.stderr,
        )
        _print_available_jobs(tasks)
        sys.exit(1)

    success = execute_ci_job(job_key, tasks[job_key], repo_root)
    sys.exit(0 if success else 1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()