pw_rpc: Expand callback tests
- Rework existing test to extract reusable components.
- Add tests for moving calls in the callback and from another thread.
- Disable failing tests. These will be enabled in a subsequent CL.
Change-Id: I164f29f0d813ff2e59c95071e1f47b08270cc5c6
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126770
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index 12a4998..f669214 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -270,6 +270,7 @@
"//pw_sync:binary_semaphore",
"//pw_thread:sleep",
"//pw_thread:test_threads_header",
+ "//pw_thread:yield",
"//pw_thread_stl:test_threads",
],
)
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index 7324a93..c0af29f 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -472,6 +472,7 @@
"$dir_pw_sync:binary_semaphore",
"$dir_pw_thread:sleep",
"$dir_pw_thread:test_threads",
+ "$dir_pw_thread:yield",
"$dir_pw_thread_stl:test_threads",
"raw:client_testing",
]
diff --git a/pw_rpc/callback_test.cc b/pw_rpc/callback_test.cc
index 4c8be76..0ff915e 100644
--- a/pw_rpc/callback_test.cc
+++ b/pw_rpc/callback_test.cc
@@ -12,8 +12,6 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include <atomic>
-
#include "gtest/gtest.h"
#include "pw_rpc/raw/client_testing.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
@@ -21,6 +19,7 @@
#include "pw_thread/sleep.h"
#include "pw_thread/test_threads.h"
#include "pw_thread/thread.h"
+#include "pw_thread/yield.h"
namespace pw::rpc {
namespace {
@@ -29,62 +28,177 @@
using test::pw_rpc::raw::TestService;
-TEST(Callbacks, DISABLED_DanglingReference) {
- struct Context {
- RawClientTestContext<> test;
- sync::BinarySemaphore callback_thread;
- sync::BinarySemaphore main_thread;
- std::atomic<bool> call_is_in_scope = true;
- std::atomic<bool> callback_executed = false;
- } ctx;
+void YieldToOtherThread() {
+ // Sleep for a while and then yield just to be sure the other thread runs.
+ this_thread::sleep_for(100ms);
+ this_thread::yield();
+}
- thread::Thread callback_thread(
- thread::test::TestOptionsThread0(),
- [](void* arg) {
- Context& context = *static_cast<Context*>(arg);
- // Once the main thread tells this thread to go, invoke the callback.
- context.callback_thread.acquire();
- context.test.server().SendResponse<TestService::TestUnaryRpc>(
- {}, Status::InvalidArgument());
- },
- &ctx);
+class CallbacksTest : public ::testing::Test {
+ protected:
+ CallbacksTest()
+ : callback_thread_(
+ thread::test::TestOptionsThread0(),
+ [](void* arg) {
+ static_cast<CallbacksTest*>(arg)->SendResponseAfterSemaphore();
+ },
+ this) {}
+ ~CallbacksTest() override {
+ EXPECT_FALSE(callback_thread_.joinable()); // Tests must join the thread!
+ EXPECT_TRUE(callback_executed_);
+ }
+
+ void RespondToCall(const RawClientReaderWriter& call) {
+ respond_to_call_ = &call;
+ }
+
+ RawClientTestContext<> context_;
+ sync::BinarySemaphore callback_thread_sem_;
+ sync::BinarySemaphore main_thread_sem_;
+
+ thread::Thread callback_thread_;
+
+ // Must be set to true by the RPC callback in each test.
+ volatile bool callback_executed_ = false;
+
+ // Variables optionally used by tests. These are in this object so lambads
+ // only need to capture [this] to access them.
+ volatile bool call_is_in_scope_ = false;
+
+ RawClientReaderWriter call_1_;
+ RawClientReaderWriter call_2_;
+
+ private:
+ void SendResponseAfterSemaphore() {
+ // Wait until the main thread says to send the response.
+ callback_thread_sem_.acquire();
+
+ context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
+ {}, respond_to_call_->id());
+ }
+
+ const RawClientReaderWriter* respond_to_call_ = &call_1_;
+};
+
+TEST_F(CallbacksTest, DISABLED_DestructorWaitsUntilCallbacksComplete) {
{
- RawUnaryReceiver call = TestService::TestUnaryRpc(
- ctx.test.client(), ctx.test.channel().id(), {});
+ RawClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
+ context_.client(), context_.channel().id());
+ RespondToCall(local_call);
- ctx.call_is_in_scope = true;
+ call_is_in_scope_ = true;
- call.set_on_completed([&ctx](ConstByteSpan, Status) {
- ctx.main_thread.release();
+ local_call.set_on_next([this](ConstByteSpan) {
+ main_thread_sem_.release();
// Wait for a while so the main thread tries to destroy the call.
- // Yes, this is a race, but this code is testing a race condition. This
- // sleep should be plenty long enough for the main thread to run.
- this_thread::sleep_for(200ms);
+ YieldToOtherThread();
// Now, make sure the call is still in scope. The main thread should
// block in the call's destructor until this callback completes.
- EXPECT_TRUE(ctx.call_is_in_scope);
+ EXPECT_TRUE(call_is_in_scope_);
- ctx.callback_executed = true;
+ callback_executed_ = true;
});
// Start the callback thread so it can invoke the callback.
- ctx.callback_thread.release();
+ callback_thread_sem_.release();
// Wait until the callback thread starts.
- ctx.main_thread.acquire();
+ main_thread_sem_.acquire();
}
- // The callback thread will sleep for a bit. Meanwhile, mark the call as out
- // of scope, then let it go out of scope.
- ctx.call_is_in_scope = false;
+ // The callback thread will sleep for a bit. Meanwhile, let the call go out
+ // of scope, and mark it as such.
+ call_is_in_scope_ = false;
// Wait for the callback thread to finish.
- callback_thread.join();
+ callback_thread_.join();
- EXPECT_TRUE(ctx.callback_executed.load());
+ EXPECT_TRUE(callback_executed_);
+}
+
+TEST_F(CallbacksTest, DISABLED_MoveActiveCall_WaitsForCallbackToComplete) {
+ call_1_ = TestService::TestBidirectionalStreamRpc(
+ context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+ main_thread_sem_.release(); // Confirm that this thread started
+
+ YieldToOtherThread();
+
+ callback_executed_ = true;
+ });
+
+ // Start the callback thread so it can invoke the callback.
+ callback_thread_sem_.release();
+
+ // Confirm that the callback thread started.
+ main_thread_sem_.acquire();
+
+ // Move the call object. This thread should wait until the on_completed
+ // callback is done.
+ EXPECT_TRUE(call_1_.active());
+ call_2_ = std::move(call_1_);
+
+ // The callback should already have finished. This thread should have waited
+ // for it to finish during the move.
+ EXPECT_TRUE(callback_executed_);
+ EXPECT_FALSE(call_1_.active());
+ EXPECT_TRUE(call_2_.active());
+
+ callback_thread_.join();
+}
+
+TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
+ call_1_ = TestService::TestBidirectionalStreamRpc(
+ context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+ main_thread_sem_.release(); // Confirm that this thread started
+
+ call_1_ = std::move(call_2_);
+
+ callback_executed_ = true;
+ });
+
+ call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
+ context_.channel().id());
+
+ EXPECT_TRUE(call_1_.active());
+ EXPECT_TRUE(call_2_.active());
+
+ // Start the callback thread and wait for it to finish.
+ callback_thread_sem_.release();
+ callback_thread_.join();
+
+ EXPECT_TRUE(call_1_.active());
+ EXPECT_FALSE(call_2_.active());
+}
+
+TEST_F(CallbacksTest, MoveOwnCallInCallback) {
+ call_1_ = TestService::TestBidirectionalStreamRpc(
+ context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+ main_thread_sem_.release(); // Confirm that this thread started
+
+ // Cancel this call first, or the move will deadlock, since the moving
+ // thread will wait for the callback thread (both this thread) to
+ // terminate if the call is active.
+ EXPECT_EQ(OkStatus(), call_1_.Cancel());
+ call_2_ = std::move(call_1_);
+
+ callback_executed_ = true;
+ });
+
+ call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
+ context_.channel().id());
+
+ EXPECT_TRUE(call_1_.active());
+ EXPECT_TRUE(call_2_.active());
+
+ // Start the callback thread and wait for it to finish.
+ callback_thread_sem_.release();
+ callback_thread_.join();
+
+ EXPECT_FALSE(call_1_.active());
+ EXPECT_FALSE(call_2_.active());
}
} // namespace