pw_rpc: Terminate-able socket client

Extends the integration test socket client so the RPC dispatch thread
can be cleanly shut down.

Change-Id: I0b02c12828d31a70d1bbcef696b5a0de39b8a5b8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/92600
Reviewed-by: Ted Pudlik <tpudlik@google.com>
Commit-Queue: Armando Montanez <amontanez@google.com>
diff --git a/pw_rpc/client_integration_test.cc b/pw_rpc/client_integration_test.cc
index 94ce807..b50395e 100644
--- a/pw_rpc/client_integration_test.cc
+++ b/pw_rpc/client_integration_test.cc
@@ -12,6 +12,8 @@
 // License for the specific language governing permissions and limitations under
 // the License.
 
+#include <sys/socket.h>
+
 #include <cstring>
 
 #include "gtest/gtest.h"
@@ -26,6 +28,10 @@
 
 constexpr int kIterations = 3;
 
+// This client configures a socket read timeout to allow the RPC dispatch thread
+// to exit gracefully.
+constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
+
 using namespace std::chrono_literals;
 using pw::ByteSpan;
 using pw::ConstByteSpan;
@@ -97,5 +103,22 @@
   if (!pw::rpc::integration_test::InitializeClient(argc, argv).ok()) {
     return 1;
   }
-  return RUN_ALL_TESTS();
+
+  // Set read timout on socket to allow
+  // pw::rpc::integration_test::TerminateClient() to complete.
+  int retval = setsockopt(pw::rpc::integration_test::GetClientSocketFd(),
+                          SOL_SOCKET,
+                          SO_RCVTIMEO,
+                          &rpc_test::kSocketReadTimeout,
+                          sizeof(rpc_test::kSocketReadTimeout));
+  PW_CHECK_INT_EQ(retval,
+                  0,
+                  "Failed to configure socket receive timeout with errno=%d",
+                  errno);
+
+  int test_retval = RUN_ALL_TESTS();
+
+  pw::rpc::integration_test::TerminateClient();
+
+  return test_retval;
 }
diff --git a/pw_rpc/integration_testing.cc b/pw_rpc/integration_testing.cc
index 2d715ea..576e445 100644
--- a/pw_rpc/integration_testing.cc
+++ b/pw_rpc/integration_testing.cc
@@ -53,6 +53,8 @@
   return context.Start(port);
 }
 
+void TerminateClient() { context.Terminate(); }
+
 Status InitializeClient(int argc, char* argv[], const char* usage_args) {
   if (argc < 2) {
     PW_LOG_INFO("Usage: %s %s", argv[0], usage_args);
diff --git a/pw_rpc/public/pw_rpc/integration_test_socket_client.h b/pw_rpc/public/pw_rpc/integration_test_socket_client.h
index 4d9e916..0583253 100644
--- a/pw_rpc/public/pw_rpc/integration_test_socket_client.h
+++ b/pw_rpc/public/pw_rpc/integration_test_socket_client.h
@@ -13,7 +13,9 @@
 // the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
+#include <optional>
 #include <span>
 #include <thread>
 
@@ -31,7 +33,8 @@
 class SocketClientContext {
  public:
   constexpr SocketClientContext()
-      : channel_output_(stream_, hdlc::kDefaultRpcAddress, "socket"),
+      : rpc_dispatch_thread_handle_(std::nullopt),
+        channel_output_(stream_, hdlc::kDefaultRpcAddress, "socket"),
         channel_output_with_manipulator_(channel_output_),
         channel_(
             Channel::Create<kChannelId>(&channel_output_with_manipulator_)),
@@ -43,10 +46,22 @@
   // packets from the socket.
   Status Start(const char* host, uint16_t port) {
     PW_TRY(stream_.Connect(host, port));
-    std::thread{&SocketClientContext::ProcessPackets, this}.detach();
+    rpc_dispatch_thread_handle_.emplace(&SocketClientContext::ProcessPackets,
+                                        this);
     return OkStatus();
   }
 
+  // Terminates the client, joining the RPC dispatch thread.
+  //
+  // WARNING: This may block forever if the socket is configured to block
+  // indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
+  // nonzero timeout will allow the dispatch thread to always return.
+  void Terminate() {
+    PW_ASSERT(rpc_dispatch_thread_handle_.has_value());
+    should_terminate_.test_and_set();
+    rpc_dispatch_thread_handle_->join();
+  }
+
   int GetSocketFd() { return stream_.connection_fd(); }
 
   void SetEgressChannelManipulator(
@@ -107,6 +122,8 @@
     ChannelManipulator* channel_manipulator_;
   };
 
+  std::atomic_flag should_terminate_ = ATOMIC_FLAG_INIT;
+  std::optional<std::thread> rpc_dispatch_thread_handle_;
   stream::SocketStream stream_;
   hdlc::RpcChannelOutput channel_output_;
   ChannelOutputWithManipulator channel_output_with_manipulator_;
@@ -124,6 +141,10 @@
     std::byte byte[1];
     Result<ByteSpan> read = stream_.Read(byte);
 
+    if (should_terminate_.test()) {
+      return;
+    }
+
     if (!read.ok() || read->size() == 0u) {
       continue;
     }
diff --git a/pw_rpc/public/pw_rpc/integration_testing.h b/pw_rpc/public/pw_rpc/integration_testing.h
index 52c8a6a..e26ec9e 100644
--- a/pw_rpc/public/pw_rpc/integration_testing.h
+++ b/pw_rpc/public/pw_rpc/integration_testing.h
@@ -98,4 +98,11 @@
 
 Status InitializeClient(int port);
 
+// Terminates the client, joining the RPC dispatch thread.
+//
+// WARNING: This may block forever if the socket is configured to block
+// indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
+// nonzero timeout will allow the dispatch thread to always return.
+void TerminateClient();
+
 }  // namespace pw::rpc::integration_test
diff --git a/pw_transfer/integration_test/client.cc b/pw_transfer/integration_test/client.cc
index fcca328..eeb82d6 100644
--- a/pw_transfer/integration_test/client.cc
+++ b/pw_transfer/integration_test/client.cc
@@ -56,6 +56,10 @@
 // smaller receive buffer size.
 constexpr int kMaxSocketSendBufferSize = 1;
 
+// This client configures a socket read timeout to allow the RPC dispatch thread
+// to exit gracefully.
+constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
+
 thread::Options& TransferThreadOptions() {
   static thread::stl::Options options;
   return options;
@@ -98,8 +102,12 @@
   result.completed.acquire();
 
   transfer_thread.Terminate();
+
   system_thread.join();
 
+  // The RPC thread must join before destroying transfer objects as the transfer
+  // service may still reference the transfer thread or transfer client objects.
+  pw::rpc::integration_test::TerminateClient();
   return result.status;
 }
 
@@ -146,6 +154,17 @@
                   "Failed to configure socket send buffer size with errno=%d",
                   errno);
 
+  retval =
+      setsockopt(pw::rpc::integration_test::GetClientSocketFd(),
+                 SOL_SOCKET,
+                 SO_RCVTIMEO,
+                 &pw::transfer::integration_test::kSocketReadTimeout,
+                 sizeof(pw::transfer::integration_test::kSocketReadTimeout));
+  PW_CHECK_INT_EQ(retval,
+                  0,
+                  "Failed to configure socket receive timeout with errno=%d",
+                  errno);
+
   if (!pw::transfer::integration_test::SendData(config).ok()) {
     PW_LOG_INFO("Failed to transfer!");
     return 1;