Stop reading the child's output when the child process ends.

Otherwise, the child process could terminate without its pipes being fully closed (for example, when a background process it started still holds them open), causing unwanted delays; see the added test.

PiperOrigin-RevId: 762172820
diff --git a/fuzztest/internal/subprocess.cc b/fuzztest/internal/subprocess.cc
index 959bcef..af62789 100644
--- a/fuzztest/internal/subprocess.cc
+++ b/fuzztest/internal/subprocess.cc
@@ -24,6 +24,8 @@
 #include <fcntl.h>
 #include <poll.h>
 #include <spawn.h>
+#include <sys/socket.h>
+#include <sys/types.h>
 #include <sys/wait.h>
 #include <unistd.h>
 #endif  // !defined(_MSC_VER)
@@ -64,9 +66,9 @@
       absl::Duration timeout);
 
  private:
-  void CreatePipes();
-  void CloseChildPipes();
-  void CloseParentPipes();
+  void CreateCommunicationFds();
+  void CloseChildFds();
+  void CloseParentFds();
   posix_spawn_file_actions_t CreateChildFileActions();
   void StartWatchdog(absl::Duration timeout);
   pid_t StartChild(
@@ -77,36 +79,41 @@
   // Pipe file descriptors pairs. Index 0 is for stdout, index 1 is for stderr.
   static constexpr int kStdOutIdx = 0;
   static constexpr int kStdErrIdx = 1;
-  int parent_pipe_[2];
-  int child_pipe_[2];
+  int parent_fds_[2];
+  int child_fds_[2];
 };
 
-// Creates parent/child pipes for piping stdout/stderr from child to parent.
-void SubProcess::CreatePipes() {
+// Creates communication fds for passing through stdout/stderr from child to
+// parent.
+void SubProcess::CreateCommunicationFds() {
   for (int channel : {kStdOutIdx, kStdErrIdx}) {
-    int pipe_fds[2];
-    FUZZTEST_INTERNAL_CHECK(pipe(pipe_fds) == 0,
-                            "Cannot create pipe: ", strerror(errno));
+    int fds[2];
+    FUZZTEST_INTERNAL_CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0,
+                            "Cannot create socketpair: ", strerror(errno));
 
-    parent_pipe_[channel] = pipe_fds[0];
-    child_pipe_[channel] = pipe_fds[1];
+    parent_fds_[channel] = fds[0];
+    child_fds_[channel] = fds[1];
+    FUZZTEST_INTERNAL_CHECK(shutdown(fds[0], SHUT_WR) == 0,
+                            "Cannot shutdown fds: ", strerror(errno));
+    FUZZTEST_INTERNAL_CHECK(shutdown(fds[1], SHUT_RD) == 0,
+                            "Cannot shutdown fds: ", strerror(errno));
 
     FUZZTEST_INTERNAL_CHECK(
-        fcntl(parent_pipe_[channel], F_SETFL, O_NONBLOCK) != -1,
-        "Cannot make pipe non-blocking: ", strerror(errno));
+        fcntl(parent_fds_[channel], F_SETFL, O_NONBLOCK) != -1,
+        "Cannot make communication fds non-blocking: ", strerror(errno));
   }
 }
 
-void SubProcess::CloseChildPipes() {
+void SubProcess::CloseChildFds() {
   for (int channel : {kStdOutIdx, kStdErrIdx}) {
-    FUZZTEST_INTERNAL_CHECK(close(child_pipe_[channel]) != -1,
+    FUZZTEST_INTERNAL_CHECK(close(child_fds_[channel]) != -1,
                             "Cannot close pipe: ", strerror(errno));
   }
 }
 
-void SubProcess::CloseParentPipes() {
+void SubProcess::CloseParentFds() {
   for (int channel : {kStdOutIdx, kStdErrIdx}) {
-    FUZZTEST_INTERNAL_CHECK(close(parent_pipe_[channel]) != -1,
+    FUZZTEST_INTERNAL_CHECK(close(parent_fds_[channel]) != -1,
                             "Cannot close pipe: ", strerror(errno));
   }
 }
@@ -128,16 +135,16 @@
 
   for (int channel : {kStdOutIdx, kStdErrIdx}) {
     // Close parent-side pipes.
-    err = posix_spawn_file_actions_addclose(&actions, parent_pipe_[channel]);
+    err = posix_spawn_file_actions_addclose(&actions, parent_fds_[channel]);
     FUZZTEST_INTERNAL_CHECK(err == 0,
                             "Cannot add close() action: ", strerror(err));
 
     // Replace stdout/stderr file descriptors with the pipes.
     int fd = channel == kStdOutIdx ? STDOUT_FILENO : STDERR_FILENO;
-    err = posix_spawn_file_actions_adddup2(&actions, child_pipe_[channel], fd);
+    err = posix_spawn_file_actions_adddup2(&actions, child_fds_[channel], fd);
     FUZZTEST_INTERNAL_CHECK(err == 0,
                             "Cannot add dup2() action: ", strerror(err));
-    err = posix_spawn_file_actions_addclose(&actions, child_pipe_[channel]);
+    err = posix_spawn_file_actions_addclose(&actions, child_fds_[channel]);
     FUZZTEST_INTERNAL_CHECK(err == 0,
                             "Cannot add close() action: ", strerror(err));
   }
@@ -193,10 +200,10 @@
                                  std::string* stderr_output) {
   // Set up poll()-ing the pipes.
   constexpr int fd_count = 2;
-  struct pollfd pfd[fd_count];
+  struct pollfd pfd[fd_count + 1];
   std::string* out_str[fd_count];
   for (int channel : {kStdOutIdx, kStdErrIdx}) {
-    pfd[channel].fd = parent_pipe_[channel];
+    pfd[channel].fd = parent_fds_[channel];
     pfd[channel].events = POLLIN;
     pfd[channel].revents = 0;
     out_str[channel] = channel == kStdOutIdx ? stdout_output : stderr_output;
@@ -247,7 +254,7 @@
 // TODO(lszekeres): Consider optimizing this further by eliminating polling.
 // Could potentially be done using pselect() to wait for SIGCHLD with a timeout.
 // I.e., by setting all args to null, except timeout with a sigmask for SIGCHLD.
-int WaitWithTimeout(pid_t pid, absl::Duration timeout) {
+int WaitWithTimeout(pid_t pid, int fds_to_shutdown[2], absl::Duration timeout) {
   int status;
   const absl::Time wait_until = absl::Now() + timeout;
   constexpr absl::Duration sleep_duration = absl::Milliseconds(100);
@@ -259,17 +266,25 @@
       if (absl::Now() > wait_until) {
         FUZZTEST_INTERNAL_CHECK(kill(pid, SIGTERM) == 0,
                                 "Cannot kill(): ", strerror(errno));
-        return Wait(pid);
+        status = Wait(pid);
+        break;
       } else {
         absl::SleepFor(sleep_duration);
         continue;
       }
     } else if (ret == pid && (WIFEXITED(status) || WIFSIGNALED(status))) {
-      return status;
+      break;
     } else {
       FUZZTEST_INTERNAL_CHECK(false, "wait() error: ", strerror(errno));
     }
   }
+  FUZZTEST_INTERNAL_CHECK(
+      shutdown(fds_to_shutdown[0], SHUT_RD) == 0,
+      "Cannot shutdown fds_to_shutdown: ", fds_to_shutdown[0], strerror(errno));
+  FUZZTEST_INTERNAL_CHECK(
+      shutdown(fds_to_shutdown[1], SHUT_RD) == 0,
+      "Cannot shutdown fds_to_shutdown: ", fds_to_shutdown[1], strerror(errno));
+  return status;
 }
 
 }  // anonymous namespace
@@ -278,15 +293,17 @@
     const std::vector<std::string>& command_line,
     const absl::flat_hash_map<std::string, std::string>& environment,
     absl::Duration timeout) {
-  CreatePipes();
+  CreateCommunicationFds();
   pid_t child_pid = StartChild(command_line, environment);
-  CloseChildPipes();
-  std::future<int> status =
-      std::async(std::launch::async, &WaitWithTimeout, child_pid, timeout);
+  CloseChildFds();
+  std::future<int> status = std::async(std::launch::async, &WaitWithTimeout,
+                                       child_pid, parent_fds_, timeout);
   std::string stdout_output, stderr_output;
   ReadChildOutput(&stdout_output, &stderr_output);
-  CloseParentPipes();
-  return {TerminationStatus(status.get()), stdout_output, stderr_output};
+  RunResults result = {TerminationStatus(status.get()), stdout_output,
+                       stderr_output};
+  CloseParentFds();
+  return result;
 }
 
 #endif  // !defined(_MSC_VER) && !(defined(__ANDROID_MIN_SDK_VERSION__) &&
diff --git a/fuzztest/internal/subprocess_test.cc b/fuzztest/internal/subprocess_test.cc
index bfb3c3a..664c9ef 100644
--- a/fuzztest/internal/subprocess_test.cc
+++ b/fuzztest/internal/subprocess_test.cc
@@ -21,6 +21,7 @@
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "absl/strings/str_cat.h"
+#include "absl/time/clock.h"
 #include "absl/time/time.h"
 
 namespace fuzztest::internal {
@@ -79,5 +80,14 @@
   EXPECT_EQ(std_err.size(), 0);
 }
 
+TEST(SubProcessTest, ReturnsAfterChildProcessEnds) {
+  const auto before_time = absl::Now();
+  auto [status, std_out, std_err] = RunCommand({"bash", "-c", "sleep 4&"});
+  const auto after_time = absl::Now();
+  EXPECT_TRUE(status.Exited());
+  EXPECT_EQ(status, ExitCode(0));
+  EXPECT_LT(after_time - before_time, absl::Seconds(2));
+}
+
 }  // namespace
 }  // namespace fuzztest::internal