blob: 0ef3f3931f1f88c0718775959d41ec8b7b464989 [file]
"Helper utility for working with pnpm lockfile"
load("@bazel_skylib//lib:dicts.bzl", "dicts")
load(":utils.bzl", "utils")
def gather_transitive_closure(packages, package, no_optional, cache = {}):
"""Walk the dependency tree, collecting the transitive closure of dependencies and their versions.
This is needed to resolve npm dependency cycles.
Note: Starlark expressly forbids recursion and infinite loops, so we need to iterate over a large range of numbers,
where each iteration takes one item from the stack, and possibly adds many new items to the stack.
Args:
packages: dictionary from pnpm lock
package: the package to collect deps for
no_optional: whether to exclude optionalDependencies
cache: a dictionary of results from previous invocations
Returns:
A dictionary of transitive dependencies, mapping package names to dependent versions.
"""
root_package = packages[package]
transitive_closure = {}
transitive_closure[root_package["name"]] = [root_package["version"]]
stack = [_get_package_info_deps(root_package, no_optional)]
iteration_max = 999999
for i in range(0, iteration_max + 1):
if not len(stack):
break
if i == iteration_max:
msg = "gather_transitive_closure exhausted the iteration limit of {} - please report this issue".format(iteration_max)
fail(msg)
deps = stack.pop()
for name in deps.keys():
version = deps[name]
if version[0].isdigit():
package_key = utils.pnpm_name(name, version)
elif version.startswith("/"):
# an aliased dependency
package_key = version[1:]
name, version = utils.parse_pnpm_package_key(name, version)
else:
package_key = version
transitive_closure[name] = transitive_closure.get(name, [])
if version in transitive_closure[name]:
continue
transitive_closure[name].append(version)
if package_key.startswith("link:"):
# we don't need to drill down through first-party links for the transitive closure since there are no cycles
# allowed in first-party links
continue
if package_key in cache:
# Already computed for this dep, merge the cached results
for transitive_name in cache[package_key].keys():
transitive_closure[transitive_name] = transitive_closure.get(transitive_name, [])
for transitive_version in cache[package_key][transitive_name]:
if transitive_version not in transitive_closure[transitive_name]:
transitive_closure[transitive_name].append(transitive_version)
else:
# Recurse into the next level of dependencies
stack.append(_get_package_info_deps(packages[package_key], no_optional))
result = dict()
for key in sorted(transitive_closure.keys()):
result[key] = sorted(transitive_closure[key])
return result
def _get_package_info_deps(package_info, no_optional):
return package_info["dependencies"] if no_optional else dicts.add(package_info["dependencies"], package_info["optional_dependencies"])
def _gather_package_info(package_path, package_snapshot):
if package_path.startswith("/"):
# an aliased dependency
package = package_path[1:]
name, version = utils.parse_pnpm_package_key("", package_path)
friendly_version = utils.strip_peer_dep_or_patched_version(version)
package_key = package
elif package_path.startswith("file:") and utils.is_vendored_tarfile(package_snapshot):
if "name" not in package_snapshot:
fail("expected package %s to have a name field" % package_path)
name = package_snapshot["name"]
package = package_snapshot["name"]
version = package_path
if "version" in package_snapshot:
version = package_snapshot["version"]
package_key = "{}/{}".format(package, version)
friendly_version = version
elif package_path.startswith("file:"):
package = package_path
if "name" not in package_snapshot:
msg = "expected package {} to have a name field".format(package_path)
fail(msg)
name = package_snapshot["name"]
version = package_path
friendly_version = package_snapshot["version"] if "version" in package_snapshot else version
package_key = package
else:
package = package_path
if "name" not in package_snapshot:
msg = "expected package {} to have a name field".format(package_path)
fail(msg)
if "version" not in package_snapshot:
msg = "expected package {} to have a version field".format(package_path)
fail(msg)
name = package_snapshot["name"]
version = package_path
friendly_version = package_snapshot["version"]
package_key = package
if "resolution" not in package_snapshot:
msg = "package {} has no resolution field".format(package_path)
fail(msg)
id = package_snapshot["id"] if "id" in package_snapshot else None
resolution = package_snapshot["resolution"]
return package_key, {
"name": name,
"id": id,
"version": version,
"friendly_version": friendly_version,
"resolution": resolution,
"dependencies": package_snapshot.get("dependencies", {}),
"optional_dependencies": package_snapshot.get("optionalDependencies", {}),
"dev": package_snapshot.get("dev", False),
"optional": package_snapshot.get("optional", False),
"patched": package_snapshot.get("patched", False),
"has_bin": package_snapshot.get("hasBin", False),
"requires_build": package_snapshot.get("requiresBuild", False),
}
def translate_to_transitive_closure(lock_importers, lock_packages, prod = False, dev = False, no_optional = False):
"""Implementation detail of translate_package_lock, converts pnpm-lock to a different dictionary with more data.
Args:
lock_importers: lockfile importers dict
lock_packages: lockfile packages dict
prod: If true, only install dependencies
dev: If true, only install devDependencies
no_optional: If true, optionalDependencies are not installed
Returns:
Nested dictionary suitable for further processing in our repository rule
"""
# All package info mapped by package name
packages = {}
# Packages resolved to a different version
package_version_map = {}
for package_path, package_snapshot in lock_packages.items():
package, package_info = _gather_package_info(package_path, package_snapshot)
packages[package] = package_info
# tarbal versions
if package_info["resolution"].get("tarball", None) and package_info["resolution"]["tarball"].startswith("file:"):
package_version_map[package] = package_info
# Collect deps of each importer (workspace projects)
importers = {}
for importPath in lock_importers.keys():
lock_importer = lock_importers[importPath]
prod_deps = {} if dev else lock_importer.get("dependencies", {})
dev_deps = {} if prod else lock_importer.get("devDependencies", {})
opt_deps = {} if no_optional else lock_importer.get("optionalDependencies", {})
deps = dicts.add(prod_deps, opt_deps)
all_deps = dicts.add(prod_deps, dev_deps, opt_deps)
# Package versions mapped to alternate versions
for info in package_version_map.values():
if info["name"] in deps:
deps[info["name"]] = info["version"]
if info["name"] in all_deps:
all_deps[info["name"]] = info["version"]
importers[importPath] = {
# deps this importer should pass on if it is linked as a first-party package; this does
# not include devDependencies
"deps": deps,
# all deps of this importer to link in the node_modules folder of that Bazel package and
# make available to all build targets; this includes devDependencies
"all_deps": all_deps,
}
# Collect transitive dependencies for each package
cache = {}
for package in packages.keys():
package_info = packages[package]
package_info["transitive_closure"] = gather_transitive_closure(
packages,
package,
no_optional,
cache,
)
cache[package] = package_info["transitive_closure"]
return (importers, packages)