pw_rpc: Terminate-able socket client
Extends the integration test socket client so the RPC dispatch thread
can be cleanly shut down.
Change-Id: I0b02c12828d31a70d1bbcef696b5a0de39b8a5b8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/92600
Reviewed-by: Ted Pudlik <tpudlik@google.com>
Commit-Queue: Armando Montanez <amontanez@google.com>
diff --git a/pw_rpc/client_integration_test.cc b/pw_rpc/client_integration_test.cc
index 94ce807..b50395e 100644
--- a/pw_rpc/client_integration_test.cc
+++ b/pw_rpc/client_integration_test.cc
@@ -12,6 +12,8 @@
// License for the specific language governing permissions and limitations under
// the License.
+#include <sys/socket.h>
+
#include <cstring>
#include "gtest/gtest.h"
@@ -26,6 +28,10 @@
constexpr int kIterations = 3;
+// This client configures a socket read timeout to allow the RPC dispatch thread
+// to exit gracefully.
+constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
+
using namespace std::chrono_literals;
using pw::ByteSpan;
using pw::ConstByteSpan;
@@ -97,5 +103,22 @@
if (!pw::rpc::integration_test::InitializeClient(argc, argv).ok()) {
return 1;
}
- return RUN_ALL_TESTS();
+
+ // Set read timout on socket to allow
+ // pw::rpc::integration_test::TerminateClient() to complete.
+ int retval = setsockopt(pw::rpc::integration_test::GetClientSocketFd(),
+ SOL_SOCKET,
+ SO_RCVTIMEO,
+ &rpc_test::kSocketReadTimeout,
+ sizeof(rpc_test::kSocketReadTimeout));
+ PW_CHECK_INT_EQ(retval,
+ 0,
+ "Failed to configure socket receive timeout with errno=%d",
+ errno);
+
+ int test_retval = RUN_ALL_TESTS();
+
+ pw::rpc::integration_test::TerminateClient();
+
+ return test_retval;
}
diff --git a/pw_rpc/integration_testing.cc b/pw_rpc/integration_testing.cc
index 2d715ea..576e445 100644
--- a/pw_rpc/integration_testing.cc
+++ b/pw_rpc/integration_testing.cc
@@ -53,6 +53,8 @@
return context.Start(port);
}
+void TerminateClient() { context.Terminate(); }
+
Status InitializeClient(int argc, char* argv[], const char* usage_args) {
if (argc < 2) {
PW_LOG_INFO("Usage: %s %s", argv[0], usage_args);
diff --git a/pw_rpc/public/pw_rpc/integration_test_socket_client.h b/pw_rpc/public/pw_rpc/integration_test_socket_client.h
index 4d9e916..0583253 100644
--- a/pw_rpc/public/pw_rpc/integration_test_socket_client.h
+++ b/pw_rpc/public/pw_rpc/integration_test_socket_client.h
@@ -13,7 +13,9 @@
// the License.
#pragma once
+#include <atomic>
#include <cstdint>
+#include <optional>
#include <span>
#include <thread>
@@ -31,7 +33,8 @@
class SocketClientContext {
public:
constexpr SocketClientContext()
- : channel_output_(stream_, hdlc::kDefaultRpcAddress, "socket"),
+ : rpc_dispatch_thread_handle_(std::nullopt),
+ channel_output_(stream_, hdlc::kDefaultRpcAddress, "socket"),
channel_output_with_manipulator_(channel_output_),
channel_(
Channel::Create<kChannelId>(&channel_output_with_manipulator_)),
@@ -43,10 +46,22 @@
// packets from the socket.
Status Start(const char* host, uint16_t port) {
PW_TRY(stream_.Connect(host, port));
- std::thread{&SocketClientContext::ProcessPackets, this}.detach();
+ rpc_dispatch_thread_handle_.emplace(&SocketClientContext::ProcessPackets,
+ this);
return OkStatus();
}
+ // Terminates the client, joining the RPC dispatch thread.
+ //
+ // WARNING: This may block forever if the socket is configured to block
+ // indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
+ // nonzero timeout will allow the dispatch thread to always return.
+ void Terminate() {
+ PW_ASSERT(rpc_dispatch_thread_handle_.has_value());
+ should_terminate_.test_and_set();
+ rpc_dispatch_thread_handle_->join();
+ }
+
int GetSocketFd() { return stream_.connection_fd(); }
void SetEgressChannelManipulator(
@@ -107,6 +122,8 @@
ChannelManipulator* channel_manipulator_;
};
+ std::atomic_flag should_terminate_ = ATOMIC_FLAG_INIT;
+ std::optional<std::thread> rpc_dispatch_thread_handle_;
stream::SocketStream stream_;
hdlc::RpcChannelOutput channel_output_;
ChannelOutputWithManipulator channel_output_with_manipulator_;
@@ -124,6 +141,10 @@
std::byte byte[1];
Result<ByteSpan> read = stream_.Read(byte);
+ if (should_terminate_.test()) {
+ return;
+ }
+
if (!read.ok() || read->size() == 0u) {
continue;
}
diff --git a/pw_rpc/public/pw_rpc/integration_testing.h b/pw_rpc/public/pw_rpc/integration_testing.h
index 52c8a6a..e26ec9e 100644
--- a/pw_rpc/public/pw_rpc/integration_testing.h
+++ b/pw_rpc/public/pw_rpc/integration_testing.h
@@ -98,4 +98,11 @@
Status InitializeClient(int port);
+// Terminates the client, joining the RPC dispatch thread.
+//
+// WARNING: This may block forever if the socket is configured to block
+// indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
+// nonzero timeout will allow the dispatch thread to always return.
+void TerminateClient();
+
} // namespace pw::rpc::integration_test
diff --git a/pw_transfer/integration_test/client.cc b/pw_transfer/integration_test/client.cc
index fcca328..eeb82d6 100644
--- a/pw_transfer/integration_test/client.cc
+++ b/pw_transfer/integration_test/client.cc
@@ -56,6 +56,10 @@
// smaller receive buffer size.
constexpr int kMaxSocketSendBufferSize = 1;
+// This client configures a socket read timeout to allow the RPC dispatch thread
+// to exit gracefully.
+constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
+
thread::Options& TransferThreadOptions() {
static thread::stl::Options options;
return options;
@@ -98,8 +102,12 @@
result.completed.acquire();
transfer_thread.Terminate();
+
system_thread.join();
+ // The RPC thread must join before destroying transfer objects as the transfer
+ // service may still reference the transfer thread or transfer client objects.
+ pw::rpc::integration_test::TerminateClient();
return result.status;
}
@@ -146,6 +154,17 @@
"Failed to configure socket send buffer size with errno=%d",
errno);
+ retval =
+ setsockopt(pw::rpc::integration_test::GetClientSocketFd(),
+ SOL_SOCKET,
+ SO_RCVTIMEO,
+ &pw::transfer::integration_test::kSocketReadTimeout,
+ sizeof(pw::transfer::integration_test::kSocketReadTimeout));
+ PW_CHECK_INT_EQ(retval,
+ 0,
+ "Failed to configure socket receive timeout with errno=%d",
+ errno);
+
if (!pw::transfer::integration_test::SendData(config).ok()) {
PW_LOG_INFO("Failed to transfer!");
return 1;