blob: e0bf69acac9f82b878000933f04a3e177cd8cf48 [file] [log] [blame]
# Copyright 2024 The Bazel Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functionality shared only by repository rule phase code.
This code should only be loaded and used during the repository phase.
"""
# Name of the environment variable that switches on repo-phase debug output.
# `_is_repo_debug_enabled` treats it as enabled only when its value is "1".
REPO_DEBUG_ENV_VAR = "RULES_PYTHON_REPO_DEBUG"
# Name of the environment variable that overrides the logger verbosity.
# `_logger` recognizes the values "DEBUG", "INFO" and "TRACE"; any other value
# is treated as the lowest verbosity (warnings and failures only).
REPO_VERBOSITY_ENV_VAR = "RULES_PYTHON_REPO_DEBUG_VERBOSITY"
def _is_repo_debug_enabled(mrctx):
    """Tells if debugging output is requested during repo operations.

    Args:
        mrctx: repository_ctx or module_ctx object

    Returns:
        True if the `REPO_DEBUG_ENV_VAR` environment variable is exactly "1",
        False otherwise.
    """
    flag_value = _getenv(mrctx, REPO_DEBUG_ENV_VAR)
    return flag_value == "1"
def _logger(mrctx, name = None):
    """Creates a logger instance for printing messages.

    The verbosity is "DEBUG" when repo debugging is enabled and "WARN"
    otherwise, unless the `REPO_VERBOSITY_ENV_VAR` environment variable is
    set to a non-empty value, in which case that value wins.

    Args:
        mrctx: repository_ctx or module_ctx object. If the attribute
            `_rule_name` is present, it will be included in log messages.
        name: name for the logger. Optional for repository_ctx usage.

    Returns:
        A struct with the logging attributes: trace, debug, info, warn, fail.
        Each one accepts either a string or a zero-argument callable that
        returns the message.
    """
    fallback_level = "DEBUG" if _is_repo_debug_enabled(mrctx) else "WARN"
    requested_level = _getenv(mrctx, REPO_VERBOSITY_ENV_VAR) or fallback_level

    # Levels that are not listed (e.g. "WARN") map to 0.
    level_ranks = {
        "DEBUG": 2,
        "INFO": 1,
        "TRACE": 3,
    }
    verbosity = level_ranks.get(requested_level, 0)

    if not hasattr(mrctx, "attr"):
        # Without `attr` there is nothing to derive a name from.
        if not name:
            fail("The name has to be specified when using the logger with `module_ctx`")
    elif not name:
        # This is `repository_ctx`; derive the name from the rule and repo.
        name = "{}(@@{})".format(getattr(mrctx.attr, "_rule_name", "?"), mrctx.name)

    def _emit(threshold, label, message_cb_or_str, printer = print):
        if threshold > verbosity:
            return

        # Callables are only invoked once we know the message will be shown.
        if type(message_cb_or_str) == "string":
            text = message_cb_or_str
        else:
            text = message_cb_or_str()

        # NOTE: printer may be the `fail` function.
        prefix = "\nrules_python:{} {}:".format(name, label.upper())
        printer(prefix, text)  # buildifier: disable=print

    def _trace(message_cb):
        return _emit(3, "TRACE", message_cb)

    def _debug(message_cb):
        return _emit(2, "DEBUG", message_cb)

    def _info(message_cb):
        return _emit(1, "INFO", message_cb)

    def _warn(message_cb):
        return _emit(0, "WARNING", message_cb)

    def _fail(message_cb):
        return _emit(-1, "FAIL", message_cb, fail)

    return struct(
        trace = _trace,
        debug = _debug,
        info = _info,
        warn = _warn,
        fail = _fail,
    )
def _execute_internal(
        mrctx,
        *,
        op,
        fail_on_error = False,
        arguments,
        environment = {},
        logger = None,
        **kwargs):
    """Execute a subprocess with debugging instrumentation.

    Args:
        mrctx: module_ctx or repository_ctx object
        op: string, brief description of the operation this command
            represents. Used to succinctly describe it in logging and
            error messages.
        fail_on_error: bool, True if fail() should be called if the command
            fails (non-zero exit code), False if not.
        arguments: list of arguments; see module_ctx.execute#arguments or
            repository_ctx.execute#arguments.
        environment: optional dict of the environment to run the command
            in; see module_ctx.execute#environment or
            repository_ctx.execute#environment.
        logger: optional `Logger` to use for logging execution details. Must be
            specified when using module_ctx. If not specified, a default will
            be created.
        **kwargs: additional kwargs to pass onto rctx.execute

    Returns:
        A struct carrying every attribute of the exec_result object (see
        repository_ctx.execute return type) plus `describe_failure`, a
        zero-argument callable returning a detailed description of the
        command and its result.
    """
    if not logger and hasattr(mrctx, "attr"):
        rctx = mrctx
        logger = _logger(rctx)
    elif not logger:
        fail("logger must be specified when using 'module_ctx'")

    # The message is built lazily so that nothing is formatted unless the
    # logger actually emits debug output.
    logger.debug(lambda: (
        "repo.execute: {op}: start\n" +
        " command: {cmd}\n" +
        " working dir: {cwd}\n" +
        " timeout: {timeout}\n" +
        " environment:{env_str}\n"
    ).format(
        op = op,
        cmd = _args_to_str(arguments),
        cwd = _cwd_to_str(mrctx, kwargs),
        timeout = _timeout_to_str(kwargs),
        env_str = _env_to_str(environment),
    ))

    mrctx.report_progress("Running {}".format(op))
    result = mrctx.execute(arguments, environment = environment, **kwargs)

    if fail_on_error and result.return_code != 0:
        logger.fail((
            "repo.execute: {op}: end: failure:\n" +
            " command: {cmd}\n" +
            " return code: {return_code}\n" +
            " working dir: {cwd}\n" +
            " timeout: {timeout}\n" +
            " environment:{env_str}\n" +
            "{output}"
        ).format(
            op = op,
            cmd = _args_to_str(arguments),
            return_code = result.return_code,
            cwd = _cwd_to_str(mrctx, kwargs),
            timeout = _timeout_to_str(kwargs),
            env_str = _env_to_str(environment),
            output = _outputs_to_str(result),
        ))
    else:
        # Let the logger's verbosity alone decide whether the "end" message
        # is shown, exactly like the "start" message above. Gating this on
        # the debug env var instead would drop the "end" message whenever
        # debug verbosity was requested some other way (verbosity env var or
        # a caller-supplied logger).
        logger.debug(lambda: (
            "repo.execute: {op}: end: {status}\n" +
            " return code: {return_code}\n" +
            "{output}"
        ).format(
            op = op,
            status = "success" if result.return_code == 0 else "failure",
            return_code = result.return_code,
            output = _outputs_to_str(result),
        ))

    # Copy the exec_result attributes so `describe_failure` can be added
    # alongside them in the returned struct.
    result_kwargs = {k: getattr(result, k) for k in dir(result)}
    return struct(
        describe_failure = lambda: _execute_describe_failure(
            op = op,
            arguments = arguments,
            result = result,
            mrctx = mrctx,
            kwargs = kwargs,
            environment = environment,
        ),
        **result_kwargs
    )
def _execute_unchecked(*args, **kwargs):
    """Run a subprocess without failing on a non-zero exit code.

    Additional information will be printed if debug output is enabled.

    Args:
        *args: see _execute_internal
        **kwargs: see _execute_internal

    Returns:
        exec_result object, see repository_ctx.execute return type.
    """
    outcome = _execute_internal(fail_on_error = False, *args, **kwargs)
    return outcome
def _execute_checked(*args, **kwargs):
    """Run a subprocess and fail if it exits with a non-zero code.

    On failure, fail() is called with detailed information about the
    command and how it failed.

    Args:
        *args: see _execute_internal
        **kwargs: see _execute_internal

    Returns:
        exec_result object, see repository_ctx.execute return type.
    """
    outcome = _execute_internal(fail_on_error = True, *args, **kwargs)
    return outcome
def _execute_checked_stdout(*args, **kwargs):
    """Runs `_execute_checked` and returns just the `stdout` of its result."""
    checked = _execute_checked(*args, **kwargs)
    return checked.stdout
def _execute_describe_failure(*, op, arguments, result, mrctx, kwargs, environment):
    """Builds a multi-line description of a command and its result.

    Args:
        op: string, brief description of the operation.
        arguments: list of arguments the command was run with.
        result: object providing `return_code`, `stdout` and `stderr`.
        mrctx: module_ctx or repository_ctx; used to render the default
            working directory.
        kwargs: dict of the extra keyword arguments given to execute; the
            `working_directory` and `timeout` entries are reported.
        environment: dict of the environment the command was run in.

    Returns:
        string describing the command, return code, working directory,
        timeout, environment and captured outputs.
    """
    template = (
        "repo.execute: {op}: failure:\n" +
        " command: {cmd}\n" +
        " return code: {return_code}\n" +
        " working dir: {cwd}\n" +
        " timeout: {timeout}\n" +
        " environment:{env_str}\n" +
        "{output}"
    )
    fields = dict(
        op = op,
        cmd = _args_to_str(arguments),
        return_code = result.return_code,
        cwd = _cwd_to_str(mrctx, kwargs),
        timeout = _timeout_to_str(kwargs),
        env_str = _env_to_str(environment),
        output = _outputs_to_str(result),
    )
    return template.format(**fields)
def _which_checked(mrctx, binary_name):
    """Looks up a binary and fails with a descriptive message if it is missing.

    Args:
        mrctx: module_ctx or repository_ctx.
        binary_name: name of the binary to find.

    Returns:
        mrctx.Path for the binary.
    """
    lookup = _which_unchecked(mrctx, binary_name)
    found = lookup.binary
    if found == None:
        fail(lookup.describe_failure())
    return found
def _which_unchecked(mrctx, binary_name):
    """Looks up a binary without failing when it is missing.

    This also watches the `PATH` environment variable.

    Args:
        mrctx: module_ctx or repository_ctx.
        binary_name: name of the binary to find.

    Returns:
        `struct` with attributes:
        * `binary`: `repository_ctx.Path`, or the falsy result of
          `mrctx.which` when the binary was not found.
        * `describe_failure`: `Callable | None`; takes no args. If the
          binary couldn't be found, provides a detailed error description.
    """

    # PATH is read before the lookup so its value is available for the
    # failure description.
    path = _getenv(mrctx, "PATH", "")
    binary = mrctx.which(binary_name)

    if not binary:
        return struct(
            binary = binary,
            describe_failure = lambda: _which_describe_failure(binary_name, path),
        )

    _watch(mrctx, binary)
    return struct(
        binary = binary,
        describe_failure = None,
    )
def _which_describe_failure(binary_name, path):
    """Formats the error text for a binary that could not be found.

    Args:
        binary_name: name of the binary that was searched for.
        path: the PATH value to report in the message.

    Returns:
        string with the error description.
    """
    template = (
        "Unable to find the binary '{binary_name}' on PATH.\n" +
        " PATH = {path}"
    )
    return template.format(binary_name = binary_name, path = path)
def _getenv(mrctx, name, default = None):
    """Reads an environment variable through the context object.

    Args:
        mrctx: module_ctx or repository_ctx.
        name: name of the environment variable.
        default: value to return when the variable is not set.

    Returns:
        The result of `mrctx.getenv(name, default)` when the context has a
        `getenv` attribute, otherwise of `mrctx.os.environ.get(name, default)`.
    """

    # Bazel 7+ API has (repository|module)_ctx.getenv
    legacy_lookup = mrctx.os.environ.get
    lookup = getattr(mrctx, "getenv", legacy_lookup)
    return lookup(name, default)
def _args_to_str(arguments):
    """Renders a list of command arguments as one space-separated string."""
    rendered = []
    for argument in arguments:
        rendered.append(_arg_repr(argument))
    return " ".join(rendered)
def _arg_repr(value):
    """Returns `repr(value)` if the argument needs quoting, else `str(value)`."""
    return repr(value) if _arg_should_be_quoted(value) else str(value)
# Characters whose presence makes an argument be shown in quoted form.
_SPECIAL_SHELL_CHARS = [" ", "'", '"', "{", "$", "("]

def _arg_should_be_quoted(value):
    """Tells if the string form of `value` contains a special character.

    Args:
        value: the argument to inspect; it is converted with `str()` first.

    Returns:
        True if any of `_SPECIAL_SHELL_CHARS` occurs in it, False otherwise.
    """

    # `value` may be non-str, such as mrctx.path objects
    text = str(value)
    return any([special in text for special in _SPECIAL_SHELL_CHARS])
def _cwd_to_str(mrctx, kwargs):
    """Returns the working directory to show in log messages.

    Args:
        mrctx: module_ctx or repository_ctx; only consulted when no working
            directory was given.
        kwargs: dict of execute keyword arguments.

    Returns:
        The truthy `working_directory` entry of `kwargs` if there is one,
        otherwise a "<default: ...>" string built from `mrctx.path("")`.
    """
    explicit_cwd = kwargs.get("working_directory")
    if explicit_cwd:
        return explicit_cwd
    return "<default: {}>".format(mrctx.path(""))
def _env_to_str(environment):
    """Renders an environment dict for log messages.

    Args:
        environment: dict of environment variables; may be empty or None.

    Returns:
        A placeholder string when `environment` is falsy, otherwise a string
        that starts with a newline and has one `key=repr(value)` entry per
        line.
    """
    if environment:
        entries = [
            "{}={}".format(key, repr(value))
            for key, value in environment.items()
        ]
        return "\n" + "\n".join(entries)
    return " <default environment>"
def _timeout_to_str(kwargs):
    """Returns the `timeout` entry of `kwargs`, or a placeholder if absent."""
    if "timeout" in kwargs:
        return kwargs["timeout"]
    return "<default timeout>"
def _outputs_to_str(result):
    """Renders the stdout and stderr of a command result for log messages.

    Args:
        result: object providing `stdout` and `stderr` strings.

    Returns:
        string with a delimited section for each non-empty stream and an
        "<... empty>" marker for each empty one, stdout first.
    """
    sections = []
    for label, text in [("stdout", result.stdout), ("stderr", result.stderr)]:
        if not text:
            sections.append("<{} empty>".format(label))
            continue

        # Drop one trailing newline; otherwise the join below would add an
        # extra blank line, which makes the output look odd.
        body = text[:-1] if text.endswith("\n") else text
        sections.extend([
            "===== {} start =====".format(label),
            body,
            "===== {} end =====".format(label),
        ])
    return "\n".join(sections)
# This includes the vendored _translate_cpu and _translate_os from
# @platforms//host:extension.bzl at version 0.0.9 so that we don't
# force the users to depend on it.
def _get_platforms_os_name(mrctx):
    """Return the name in @platforms//os for the host os.

    Args:
        mrctx: module_ctx or repository_ctx.

    Returns:
        `str`. The target name; the lowercased host os name is returned
        unchanged when it matches none of the known prefixes.
    """
    host_os = mrctx.os.name.lower()

    # (prefix of the lowercased host os name, target name), checked in order.
    prefix_to_target = [
        ("mac os", "osx"),
        ("freebsd", "freebsd"),
        ("openbsd", "openbsd"),
        ("linux", "linux"),
        ("windows", "windows"),
    ]
    for prefix, target in prefix_to_target:
        if host_os.startswith(prefix):
            return target
    return host_os
def _get_platforms_cpu_name(mrctx):
    """Return the name in @platforms//cpu for the host arch.

    Args:
        mrctx: module_ctx or repository_ctx.

    Returns:
        `str`. The target name; the lowercased host arch is returned
        unchanged when it is not one of the known aliases.
    """
    host_arch = mrctx.os.arch.lower()

    # (target name, lowercased host arch spellings that map to it).
    target_aliases = [
        ("x86_32", ["i386", "i486", "i586", "i686", "i786", "x86"]),
        ("x86_64", ["amd64", "x86_64", "x64"]),
        ("ppc", ["ppc", "ppc64", "ppc64le"]),
        ("arm", ["arm", "armv7l"]),
        ("aarch64", ["aarch64"]),
        ("s390x", ["s390x", "s390"]),
        ("mips64", ["mips64el", "mips64"]),
        ("riscv64", ["riscv64"]),
    ]
    for target, aliases in target_aliases:
        if host_arch in aliases:
            return target
    return host_arch
# TODO: Remove after Bazel 6 support dropped
def _watch(mrctx, *args, **kwargs):
    """Forwards to `mrctx.watch` when the context has it; otherwise a no-op.

    Args:
        mrctx: module_ctx or repository_ctx.
        *args: positional arguments passed on to `mrctx.watch`.
        **kwargs: keyword arguments passed on to `mrctx.watch`.
    """
    if not (args or kwargs):
        fail("'watch' needs at least a single argument.")

    if not hasattr(mrctx, "watch"):
        return
    mrctx.watch(*args, **kwargs)
# TODO: Remove after Bazel 6 support dropped
def _watch_tree(mrctx, *args, **kwargs):
    """Forwards to `mrctx.watch_tree` when the context has it; otherwise a no-op.

    Args:
        mrctx: module_ctx or repository_ctx.
        *args: positional arguments passed on to `mrctx.watch_tree`.
        **kwargs: keyword arguments passed on to `mrctx.watch_tree`.
    """
    if not (args or kwargs):
        fail("'watch_tree' needs at least a single argument.")

    if not hasattr(mrctx, "watch_tree"):
        return
    mrctx.watch_tree(*args, **kwargs)
# Namespace struct exposing this file's helpers under their public
# (underscore-free) names.
repo_utils = struct(
# keep sorted
execute_checked = _execute_checked,
execute_checked_stdout = _execute_checked_stdout,
execute_unchecked = _execute_unchecked,
get_platforms_cpu_name = _get_platforms_cpu_name,
get_platforms_os_name = _get_platforms_os_name,
getenv = _getenv,
is_repo_debug_enabled = _is_repo_debug_enabled,
logger = _logger,
watch = _watch,
watch_tree = _watch_tree,
which_checked = _which_checked,
which_unchecked = _which_unchecked,
)