refactor: avoid conflict merging when shared libraries are present (#3448)

Today, when a shared library is present, an extra VenvSymlinkEntry is
generated so that
it is linked directly. Unfortunately, this will always have a path
overlap conflict with
the rest of the venv symlinks, which triggers the conflict merge logic
later in
py_executable. That logic is expensive, as it must flatten all the files
and then
link each file individually (essentially doubling the number of files
materialized).
For large packages like torch (10k+ files), this can dramatically
increase overhead.

To fix, generate VenvSymlinkEntries that don't overlap. The basic logic
for how this works
is to identify paths that *must* be directly linked, marking all their
parent directories
as not being able to be directly linked, and then grouping what remains
into the highest
directly-linkable path.

Along the way, drop the logic that only considers code files and special
cases `__init__.py`
files and implicit packages. This is simplify the code and more
correctly map the
extracted wheel into the venv.
diff --git a/python/private/venv_runfiles.bzl b/python/private/venv_runfiles.bzl
index eeedda4..43fcab6 100644
--- a/python/private/venv_runfiles.bzl
+++ b/python/private/venv_runfiles.bzl
@@ -3,7 +3,6 @@
 load("@bazel_skylib//lib:paths.bzl", "paths")
 load(
     ":common.bzl",
-    "PYTHON_FILE_EXTENSIONS",
     "is_file",
     "relative_path",
     "runfiles_root_path",
@@ -58,7 +57,6 @@
             if is_file(link_to):
                 symlink_from = "{}/{}".format(ctx.label.package, bin_venv_path)
                 runfiles_symlinks[symlink_from] = link_to
-
             else:
                 venv_link = ctx.actions.declare_symlink(bin_venv_path)
                 venv_link_rf_path = runfiles_root_path(ctx, venv_link.short_path)
@@ -77,14 +75,16 @@
     )
 
 # Visible for testing
-def build_link_map(ctx, entries):
+def build_link_map(ctx, entries, return_conflicts = False):
     """Compute the mapping of venv paths to their backing objects.
 
-
     Args:
         ctx: {type}`ctx` current ctx.
         entries: {type}`list[VenvSymlinkEntry]` the entries that describe the
             venv-relative
+        return_conflicts: {type}`bool`. Only present for testing. If True,
+            also return a list of the groups that had overlapping paths and had
+            to be resolved and merged.
 
     Returns:
         {type}`dict[str, dict[str, str|File]]` Mappings of venv paths to their
@@ -114,6 +114,7 @@
 
     # final paths to keep, grouped by kind
     keep_link_map = {}  # dict[str kind, dict[path, str|File]]
+    conflicts = [] if return_conflicts else None
     for kind, entries in entries_by_kind.items():
         # dict[str kind-relative path, str|File link_to]
         keep_kind_link_map = {}
@@ -129,12 +130,17 @@
                 else:
                     keep_kind_link_map[entry.venv_path] = entry.link_to_path
             else:
+                if return_conflicts:
+                    conflicts.append(group)
+
                 # Merge a group of overlapping prefixes
                 _merge_venv_path_group(ctx, group, keep_kind_link_map)
 
         keep_link_map[kind] = keep_kind_link_map
-
-    return keep_link_map
+    if return_conflicts:
+        return keep_link_map, conflicts
+    else:
+        return keep_link_map
 
 def _group_venv_path_entries(entries):
     """Group entries by VenvSymlinkEntry.venv_path overlap.
@@ -235,109 +241,115 @@
     # Append slash to prevent incorrect prefix-string matches
     site_packages_root += "/"
 
-    # We have to build a list of (runfiles path, site-packages path) pairs of the files to
-    # create in the consuming binary's venv site-packages directory. To minimize the number of
-    # files to create, we just return the paths to the directories containing the code of
-    # interest.
-    #
-    # However, namespace packages complicate matters: multiple distributions install in the
-    # same directory in site-packages. This works out because they don't overlap in their
-    # files. Typically, they install to different directories within the namespace package
-    # directory. We also need to ensure that we can handle a case where the main package (e.g.
-    # airflow) has directories only containing data files and then namespace packages coming
-    # along and being next to it.
-    #
-    # Lastly we have to assume python modules just being `.py` files (e.g. typing-extensions)
-    # is just a single Python file.
-
-    dir_symlinks = {}  # dirname -> runfile path
-    venv_symlinks = []
-
-    # Sort so order is top-down
     all_files = sorted(files, key = lambda f: f.short_path)
 
+    # venv paths that cannot be directly linked. Dict acting as set.
+    cannot_be_linked_directly = {}
+
+    # dict[str path, VenvSymlinkEntry]
+    # Where path is the venv path (i.e. relative to site_packages_prefix)
+    venv_symlinks = {}
+
+    # List of (File, str venv_path) tuples
+    files_left_to_link = []
+
+    # We want to minimize the number of files symlinked. Ideally, only the
+    # top-level directories are symlinked. Unfortunately, shared libraries
+    # complicate matters: if a shared library's directory is linked, then the
+    # dynamic linker computes the wrong search path.
+    #
+    # To fix, we have to directly link shared libraries. This then means that
+    # all the parent directories of the shared library can't be linked
+    # directly.
     for src in all_files:
-        path = _repo_relative_short_path(src.short_path)
-        if not path.startswith(site_packages_root):
+        rf_root_path = runfiles_root_path(ctx, src.short_path)
+        _, _, repo_rel_path = rf_root_path.partition("/")
+        head, found_sp_root, venv_path = repo_rel_path.partition(site_packages_root)
+        if head or not found_sp_root:
+            # If head is set, then the path didn't start with site_packages_root
+            # if found_sp_root is empty, then it means it wasn't found at all.
             continue
-        path = path.removeprefix(site_packages_root)
-        dir_name, _, filename = path.rpartition("/")
-        runfiles_dir_name, _, _ = runfiles_root_path(ctx, src.short_path).partition("/")
 
+        filename = paths.basename(venv_path)
         if _is_linker_loaded_library(filename):
-            entry = VenvSymlinkEntry(
+            venv_symlinks[venv_path] = VenvSymlinkEntry(
                 kind = VenvSymlinkKind.LIB,
-                link_to_path = paths.join(runfiles_dir_name, site_packages_root, filename),
+                link_to_path = rf_root_path,
                 link_to_file = src,
                 package = package,
                 version = version_str,
-                venv_path = path,
                 files = depset([src]),
+                venv_path = venv_path,
             )
-            venv_symlinks.append(entry)
+            parent = paths.dirname(venv_path)
+            for _ in range(len(venv_path) + 1):  # Iterate enough times to traverse up
+                if not parent:
+                    break
+                if cannot_be_linked_directly.get(parent, False):
+                    # Already seen
+                    break
+                cannot_be_linked_directly[parent] = True
+                parent = paths.dirname(parent)
+        else:
+            files_left_to_link.append((src, venv_path))
+
+    # At this point, venv_symlinks has entries for the shared libraries
+    # and cannot_be_linked_directly has the directories that cannot be
+    # directly linked. Next, we loop over the remaining files and group
+    # them into the highest level directory that can be linked.
+
+    # dict[str venv_path, list[File]]
+    optimized_groups = {}
+
+    for src, venv_path in files_left_to_link:
+        parent = paths.dirname(venv_path)
+        if not parent:
+            # File in root, must be linked directly
+            optimized_groups.setdefault(venv_path, [])
+            optimized_groups[venv_path].append(src)
             continue
 
-        if dir_name in dir_symlinks:
-            # we already have this dir, this allows us to short-circuit since most of the
-            # ctx.files.data might share the same directories as ctx.files.srcs
-            continue
+        if parent in cannot_be_linked_directly:
+            # File in a directory that cannot be directly linked,
+            # so link the file directly
+            optimized_groups.setdefault(venv_path, [])
+            optimized_groups[venv_path].append(src)
+        else:
+            # This path can be grouped. Find the highest-level directory to link.
+            venv_path = parent
+            next_parent = paths.dirname(parent)
+            for _ in range(len(venv_path) + 1):  # Iterate enough times
+                if next_parent:
+                    if next_parent not in cannot_be_linked_directly:
+                        venv_path = next_parent
+                        next_parent = paths.dirname(next_parent)
+                    else:
+                        break
+                else:
+                    break
 
-        if dir_name:
-            # This can be either:
-            # * a directory with libs (e.g. numpy.libs, created by auditwheel)
-            # * a directory with `__init__.py` file that potentially also needs to be
-            #   symlinked.
-            # * `.dist-info` directory
-            #
-            # This could be also regular files, that just need to be symlinked, so we will
-            # add the directory here.
-            dir_symlinks[dir_name] = runfiles_dir_name
-        elif src.extension in PYTHON_FILE_EXTENSIONS:
-            # This would be files that do not have directories and we just need to add
-            # direct symlinks to them as is, we only allow Python files in here
-            entry = VenvSymlinkEntry(
-                kind = VenvSymlinkKind.LIB,
-                link_to_path = paths.join(runfiles_dir_name, site_packages_root, filename),
-                link_to_file = src,
-                package = package,
-                version = version_str,
-                venv_path = path,
-                files = depset([src]),
-            )
-            venv_symlinks.append(entry)
+            optimized_groups.setdefault(venv_path, [])
+            optimized_groups[venv_path].append(src)
 
-    # Sort so that we encounter `foo` before `foo/bar`. This ensures we
-    # see the top-most explicit package first.
-    dirnames = sorted(dir_symlinks.keys())
-    first_level_explicit_packages = []
-    for d in dirnames:
-        is_sub_package = False
-        for existing in first_level_explicit_packages:
-            # Suffix with / to prevent foo matching foobar
-            if d.startswith(existing + "/"):
-                is_sub_package = True
-                break
-        if not is_sub_package:
-            first_level_explicit_packages.append(d)
-
-    for dirname in first_level_explicit_packages:
-        prefix = dir_symlinks[dirname]
-        link_to_path = paths.join(prefix, site_packages_root, dirname)
-        entry = VenvSymlinkEntry(
+    # Finally, for each group, we create the VenvSymlinkEntry objects
+    for venv_path, files in optimized_groups.items():
+        link_to_path = (
+            _get_label_runfiles_repo(ctx, files[0].owner) +
+            "/" +
+            site_packages_root +
+            venv_path
+        )
+        venv_symlinks[venv_path] = VenvSymlinkEntry(
             kind = VenvSymlinkKind.LIB,
             link_to_path = link_to_path,
+            link_to_file = None,
             package = package,
             version = version_str,
-            venv_path = dirname,
-            files = depset([
-                f
-                for f in all_files
-                if runfiles_root_path(ctx, f.short_path).startswith(link_to_path + "/")
-            ]),
+            venv_path = venv_path,
+            files = depset(files),
         )
-        venv_symlinks.append(entry)
 
-    return venv_symlinks
+    return venv_symlinks.values()
 
 def _is_linker_loaded_library(filename):
     """Tells if a filename is one that `dlopen()` or the runtime linker handles.
@@ -357,9 +369,10 @@
         return True
     return False
 
-def _repo_relative_short_path(short_path):
-    # Convert `../+pypi+foo/some/file.py` to `some/file.py`
-    if short_path.startswith("../"):
-        return short_path[3:].partition("/")[2]
+def _get_label_runfiles_repo(ctx, label):
+    repo = label.repo_name
+    if repo:
+        return repo
     else:
-        return short_path
+        # For files, empty repo means the main repo
+        return ctx.workspace_name
diff --git a/tests/venv_site_packages_libs/app_files_building/app_files_building_tests.bzl b/tests/venv_site_packages_libs/app_files_building/app_files_building_tests.bzl
index db2f21c..fc0b5d0 100644
--- a/tests/venv_site_packages_libs/app_files_building/app_files_building_tests.bzl
+++ b/tests/venv_site_packages_libs/app_files_building/app_files_building_tests.bzl
@@ -26,6 +26,11 @@
 
 _tests = []
 
+# NOTE: In bzlmod, the workspace name is always "_main".
+# Under workspace, the workspace name is the name configured in WORKSPACE,
+# or "__main__" if was unspecified.
+# NOTE: ctx.workspace_name is always the root workspace, not the workspace
+# of the target being processed (ctx.label).
 def _ctx(workspace_name = "_main"):
     return struct(
         workspace_name = workspace_name,
@@ -36,6 +41,23 @@
         short_path = short_path,
     )
 
+def _venv_symlink(venv_path, *, link_to_path = None, files = []):
+    return struct(
+        link_to_path = link_to_path,
+        venv_path = venv_path,
+        files = files,
+    )
+
+def _venv_symlinks_from_entries(entries):
+    result = []
+    for symlink_entry in entries:
+        result.append(struct(
+            venv_path = symlink_entry.venv_path,
+            link_to_path = symlink_entry.link_to_path,
+            files = [f.short_path for f in symlink_entry.files.to_list()],
+        ))
+    return sorted(result, key = lambda e: (e.link_to_path, e.venv_path))
+
 def _entry(venv_path, link_to_path, files = [], **kwargs):
     kwargs.setdefault("kind", VenvSymlinkKind.LIB)
     kwargs.setdefault("package", None)
@@ -89,7 +111,7 @@
         _entry("duplicate", "+dupe_b/site-packages/duplicate", ["d.py"]),
     ]
 
-    actual = build_link_map(_ctx(), entries)
+    actual, conflicts = build_link_map(_ctx(), entries, return_conflicts = True)
     expected_libs = {
         "a-1.0.dist-info": "+pypi_a/site-packages/a-1.0.dist-info",
         "a/a.txt": _file("../+pypi_a/site-packages/a/a.txt"),
@@ -101,6 +123,146 @@
     env.expect.that_dict(actual[VenvSymlinkKind.LIB]).contains_exactly(expected_libs)
     env.expect.that_dict(actual).keys().contains_exactly([VenvSymlinkKind.LIB])
 
+    env.expect.that_int(len(conflicts)).is_greater_than(0)
+
+def _test_optimized_grouping_complex(name):
+    empty_files(
+        name = name + "_files",
+        paths = [
+            "site-packages/pkg1/a.txt",
+            "site-packages/pkg1/b/b_mod.so",
+            "site-packages/pkg1/c/c1.txt",
+            "site-packages/pkg1/c/c2.txt",
+            "site-packages/pkg1/d/d1.txt",
+            "site-packages/pkg1/dd/dd1.txt",
+            "site-packages/pkg1/q1/q1.txt",
+            "site-packages/pkg1/q1/q2a/libq.so",
+            "site-packages/pkg1/q1/q2a/q2.txt",
+            "site-packages/pkg1/q1/q2a/q3/q3a.txt",
+            "site-packages/pkg1/q1/q2a/q3/q3b.txt",
+            "site-packages/pkg1/q1/q2b/q2b.txt",
+        ],
+    )
+    analysis_test(
+        name = name,
+        impl = _test_optimized_grouping_complex_impl,
+        target = name + "_files",
+    )
+
+_tests.append(_test_optimized_grouping_complex)
+
+def _test_optimized_grouping_complex_impl(env, target):
+    test_ctx = _ctx(workspace_name = env.ctx.workspace_name)
+    entries = get_venv_symlinks(
+        test_ctx,
+        target.files.to_list(),
+        package = "pkg1",
+        version_str = "1.0",
+        site_packages_root = env.ctx.label.package + "/site-packages",
+    )
+    actual = _venv_symlinks_from_entries(entries)
+
+    rr = "{}/{}/site-packages/".format(test_ctx.workspace_name, env.ctx.label.package)
+    expected = [
+        _venv_symlink(
+            "pkg1/a.txt",
+            link_to_path = rr + "pkg1/a.txt",
+            files = [
+                "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/a.txt",
+            ],
+        ),
+        _venv_symlink(
+            "pkg1/b",
+            link_to_path = rr + "pkg1/b",
+            files = [
+                "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/b/b_mod.so",
+            ],
+        ),
+        _venv_symlink("pkg1/c", link_to_path = rr + "pkg1/c", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/c/c1.txt",
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/c/c2.txt",
+        ]),
+        _venv_symlink("pkg1/d", link_to_path = rr + "pkg1/d", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/d/d1.txt",
+        ]),
+        _venv_symlink("pkg1/dd", link_to_path = rr + "pkg1/dd", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/dd/dd1.txt",
+        ]),
+        _venv_symlink("pkg1/q1/q1.txt", link_to_path = rr + "pkg1/q1/q1.txt", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/q1/q1.txt",
+        ]),
+        _venv_symlink("pkg1/q1/q2a/libq.so", link_to_path = rr + "pkg1/q1/q2a/libq.so", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/q1/q2a/libq.so",
+        ]),
+        _venv_symlink("pkg1/q1/q2a/q2.txt", link_to_path = rr + "pkg1/q1/q2a/q2.txt", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/q1/q2a/q2.txt",
+        ]),
+        _venv_symlink("pkg1/q1/q2a/q3", link_to_path = rr + "pkg1/q1/q2a/q3", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/q1/q2a/q3/q3a.txt",
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/q1/q2a/q3/q3b.txt",
+        ]),
+        _venv_symlink("pkg1/q1/q2b", link_to_path = rr + "pkg1/q1/q2b", files = [
+            "tests/venv_site_packages_libs/app_files_building/site-packages/pkg1/q1/q2b/q2b.txt",
+        ]),
+    ]
+    expected = sorted(expected, key = lambda e: (e.link_to_path, e.venv_path))
+    env.expect.that_collection(
+        actual,
+    ).contains_exactly(expected)
+    _, conflicts = build_link_map(test_ctx, entries, return_conflicts = True)
+
+    # The point of the optimization is to avoid having to merge conflicts.
+    env.expect.that_collection(conflicts).contains_exactly([])
+
+def _test_optimized_grouping_single_toplevel(name):
+    empty_files(
+        name = name + "_files",
+        paths = [
+            "site-packages/pkg2/a.txt",
+            "site-packages/pkg2/b_mod.so",
+        ],
+    )
+    analysis_test(
+        name = name,
+        impl = _test_optimized_grouping_single_toplevel_impl,
+        target = name + "_files",
+    )
+
+_tests.append(_test_optimized_grouping_single_toplevel)
+
+def _test_optimized_grouping_single_toplevel_impl(env, target):
+    test_ctx = _ctx(workspace_name = env.ctx.workspace_name)
+    entries = get_venv_symlinks(
+        test_ctx,
+        target.files.to_list(),
+        package = "pkg2",
+        version_str = "1.0",
+        site_packages_root = env.ctx.label.package + "/site-packages",
+    )
+    actual = _venv_symlinks_from_entries(entries)
+
+    rr = "{}/{}/site-packages/".format(test_ctx.workspace_name, env.ctx.label.package)
+    expected = [
+        _venv_symlink(
+            "pkg2",
+            link_to_path = rr + "pkg2",
+            files = [
+                "tests/venv_site_packages_libs/app_files_building/site-packages/pkg2/a.txt",
+                "tests/venv_site_packages_libs/app_files_building/site-packages/pkg2/b_mod.so",
+            ],
+        ),
+    ]
+    expected = sorted(expected, key = lambda e: (e.link_to_path, e.venv_path))
+
+    env.expect.that_collection(
+        actual,
+    ).contains_exactly(expected)
+
+    _, conflicts = build_link_map(test_ctx, entries, return_conflicts = True)
+
+    # The point of the optimization is to avoid having to merge conflicts.
+    env.expect.that_collection(conflicts).contains_exactly([])
+
 def _test_package_version_filtering(name):
     analysis_test(
         name = name,
@@ -219,8 +381,7 @@
             "libfile",
             "+pypi_lib/site-packages/libfile",
             ["lib.txt"],
-            kind =
-                VenvSymlinkKind.LIB,
+            kind = VenvSymlinkKind.LIB,
         ),
         _entry(
             "binfile",
@@ -294,34 +455,33 @@
         site_packages_root = env.ctx.label.package + "/site-packages",
     )
 
-    actual = [e for e in actual_entries if e.venv_path == "foo.libs/libx.so"]
-    if not actual:
-        fail("Did not find VenvSymlinkEntry with venv_path equal to foo.libs/libx.so. " +
-             "Found: {}".format(actual_entries))
-    elif len(actual) > 1:
-        fail("Found multiple entries with venv_path=foo.libs/libx.so. " +
-             "Found: {}".format(actual_entries))
-    actual = actual[0]
+    actual = _venv_symlinks_from_entries(actual_entries)
 
-    actual_files = actual.files.to_list()
-    expected_lib_dso = [f for f in srcs if f.basename == "libx.so"]
-    env.expect.that_collection(actual_files).contains_exactly(expected_lib_dso)
+    env.expect.that_collection(actual).contains_at_least([
+        _venv_symlink(
+            "bar/libs/liby.so",
+            link_to_path = "_main/tests/venv_site_packages_libs/app_files_building/site-packages/bar/libs/liby.so",
+            files = [
+                "tests/venv_site_packages_libs/app_files_building/site-packages/bar/libs/liby.so",
+            ],
+        ),
+        _venv_symlink(
+            "foo.libs/libx.so",
+            link_to_path = "_main/tests/venv_site_packages_libs/app_files_building/site-packages/foo.libs/libx.so",
+            files = [
+                "tests/venv_site_packages_libs/app_files_building/site-packages/foo.libs/libx.so",
+            ],
+        ),
+    ])
 
-    entries = actual_entries
-    actual = build_link_map(_ctx(), entries)
+    actual = build_link_map(_ctx(), actual_entries)
 
     # The important condition is that each lib*.so file is linked directly.
     expected_libs = {
         "bar/libs/liby.so": srcs[0],
-        "bar/x.py": srcs[1],
-        "bar/y.so": srcs[2],
-        "foo": "_main/tests/venv_site_packages_libs/app_files_building/site-packages/foo",
         "foo.libs/libx.so": srcs[3],
-        "root.pth": srcs[-3],
-        "root.py": srcs[-2],
-        "root.so": srcs[-1],
     }
-    env.expect.that_dict(actual[VenvSymlinkKind.LIB]).contains_exactly(expected_libs)
+    env.expect.that_dict(actual[VenvSymlinkKind.LIB]).contains_at_least(expected_libs)
 
 def app_files_building_test_suite(name):
     test_suite(