pw_rpc: Expand callback tests

- Rework existing test to extract reusable components.
- Add tests for moving calls in the callback and from another thread.
- Disable failing tests. These will be enabled in a subsequent CL.

Change-Id: I164f29f0d813ff2e59c95071e1f47b08270cc5c6
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126770
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index 12a4998..f669214 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -270,6 +270,7 @@
         "//pw_sync:binary_semaphore",
         "//pw_thread:sleep",
         "//pw_thread:test_threads_header",
+        "//pw_thread:yield",
         "//pw_thread_stl:test_threads",
     ],
 )
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index 7324a93..c0af29f 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -472,6 +472,7 @@
     "$dir_pw_sync:binary_semaphore",
     "$dir_pw_thread:sleep",
     "$dir_pw_thread:test_threads",
+    "$dir_pw_thread:yield",
     "$dir_pw_thread_stl:test_threads",
     "raw:client_testing",
   ]
diff --git a/pw_rpc/callback_test.cc b/pw_rpc/callback_test.cc
index 4c8be76..0ff915e 100644
--- a/pw_rpc/callback_test.cc
+++ b/pw_rpc/callback_test.cc
@@ -12,8 +12,6 @@
 // License for the specific language governing permissions and limitations under
 // the License.
 
-#include <atomic>
-
 #include "gtest/gtest.h"
 #include "pw_rpc/raw/client_testing.h"
 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
@@ -21,6 +19,7 @@
 #include "pw_thread/sleep.h"
 #include "pw_thread/test_threads.h"
 #include "pw_thread/thread.h"
+#include "pw_thread/yield.h"
 
 namespace pw::rpc {
 namespace {
@@ -29,62 +28,177 @@
 
 using test::pw_rpc::raw::TestService;
 
-TEST(Callbacks, DISABLED_DanglingReference) {
-  struct Context {
-    RawClientTestContext<> test;
-    sync::BinarySemaphore callback_thread;
-    sync::BinarySemaphore main_thread;
-    std::atomic<bool> call_is_in_scope = true;
-    std::atomic<bool> callback_executed = false;
-  } ctx;
+void YieldToOtherThread() {
+  // Sleep for a while and then yield just to be sure the other thread runs.
+  this_thread::sleep_for(100ms);
+  this_thread::yield();
+}
 
-  thread::Thread callback_thread(
-      thread::test::TestOptionsThread0(),
-      [](void* arg) {
-        Context& context = *static_cast<Context*>(arg);
-        // Once the main thread tells this thread to go, invoke the callback.
-        context.callback_thread.acquire();
-        context.test.server().SendResponse<TestService::TestUnaryRpc>(
-            {}, Status::InvalidArgument());
-      },
-      &ctx);
+class CallbacksTest : public ::testing::Test {
+ protected:
+  CallbacksTest()
+      : callback_thread_(
+            thread::test::TestOptionsThread0(),
+            [](void* arg) {
+              static_cast<CallbacksTest*>(arg)->SendResponseAfterSemaphore();
+            },
+            this) {}
 
+  ~CallbacksTest() override {
+    EXPECT_FALSE(callback_thread_.joinable());  // Tests must join the thread!
+    EXPECT_TRUE(callback_executed_);
+  }
+
+  void RespondToCall(const RawClientReaderWriter& call) {
+    respond_to_call_ = &call;
+  }
+
+  RawClientTestContext<> context_;
+  sync::BinarySemaphore callback_thread_sem_;
+  sync::BinarySemaphore main_thread_sem_;
+
+  thread::Thread callback_thread_;
+
+  // Must be set to true by the RPC callback in each test.
+  volatile bool callback_executed_ = false;
+
+  // Variables optionally used by tests. These are in this object so lambads
+  // only need to capture [this] to access them.
+  volatile bool call_is_in_scope_ = false;
+
+  RawClientReaderWriter call_1_;
+  RawClientReaderWriter call_2_;
+
+ private:
+  void SendResponseAfterSemaphore() {
+    // Wait until the main thread says to send the response.
+    callback_thread_sem_.acquire();
+
+    context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
+        {}, respond_to_call_->id());
+  }
+
+  const RawClientReaderWriter* respond_to_call_ = &call_1_;
+};
+
+TEST_F(CallbacksTest, DISABLED_DestructorWaitsUntilCallbacksComplete) {
   {
-    RawUnaryReceiver call = TestService::TestUnaryRpc(
-        ctx.test.client(), ctx.test.channel().id(), {});
+    RawClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
+        context_.client(), context_.channel().id());
+    RespondToCall(local_call);
 
-    ctx.call_is_in_scope = true;
+    call_is_in_scope_ = true;
 
-    call.set_on_completed([&ctx](ConstByteSpan, Status) {
-      ctx.main_thread.release();
+    local_call.set_on_next([this](ConstByteSpan) {
+      main_thread_sem_.release();
 
       // Wait for a while so the main thread tries to destroy the call.
-      // Yes, this is a race, but this code is testing a race condition. This
-      // sleep should be plenty long enough for the main thread to run.
-      this_thread::sleep_for(200ms);
+      YieldToOtherThread();
 
       // Now, make sure the call is still in scope. The main thread should
       // block in the call's destructor until this callback completes.
-      EXPECT_TRUE(ctx.call_is_in_scope);
+      EXPECT_TRUE(call_is_in_scope_);
 
-      ctx.callback_executed = true;
+      callback_executed_ = true;
     });
 
     // Start the callback thread so it can invoke the callback.
-    ctx.callback_thread.release();
+    callback_thread_sem_.release();
 
     // Wait until the callback thread starts.
-    ctx.main_thread.acquire();
+    main_thread_sem_.acquire();
   }
 
-  // The callback thread will sleep for a bit. Meanwhile, mark the call as out
-  // of scope, then let it go out of scope.
-  ctx.call_is_in_scope = false;
+  // The callback thread will sleep for a bit. Meanwhile, let the call go out
+  // of scope, and mark it as such.
+  call_is_in_scope_ = false;
 
   // Wait for the callback thread to finish.
-  callback_thread.join();
+  callback_thread_.join();
 
-  EXPECT_TRUE(ctx.callback_executed.load());
+  EXPECT_TRUE(callback_executed_);
+}
+
+TEST_F(CallbacksTest, DISABLED_MoveActiveCall_WaitsForCallbackToComplete) {
+  call_1_ = TestService::TestBidirectionalStreamRpc(
+      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+        main_thread_sem_.release();  // Confirm that this thread started
+
+        YieldToOtherThread();
+
+        callback_executed_ = true;
+      });
+
+  // Start the callback thread so it can invoke the callback.
+  callback_thread_sem_.release();
+
+  // Confirm that the callback thread started.
+  main_thread_sem_.acquire();
+
+  // Move the call object. This thread should wait until the on_completed
+  // callback is done.
+  EXPECT_TRUE(call_1_.active());
+  call_2_ = std::move(call_1_);
+
+  // The callback should already have finished. This thread should have waited
+  // for it to finish during the move.
+  EXPECT_TRUE(callback_executed_);
+  EXPECT_FALSE(call_1_.active());
+  EXPECT_TRUE(call_2_.active());
+
+  callback_thread_.join();
+}
+
+TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
+  call_1_ = TestService::TestBidirectionalStreamRpc(
+      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+        main_thread_sem_.release();  // Confirm that this thread started
+
+        call_1_ = std::move(call_2_);
+
+        callback_executed_ = true;
+      });
+
+  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
+                                                    context_.channel().id());
+
+  EXPECT_TRUE(call_1_.active());
+  EXPECT_TRUE(call_2_.active());
+
+  // Start the callback thread and wait for it to finish.
+  callback_thread_sem_.release();
+  callback_thread_.join();
+
+  EXPECT_TRUE(call_1_.active());
+  EXPECT_FALSE(call_2_.active());
+}
+
+TEST_F(CallbacksTest, MoveOwnCallInCallback) {
+  call_1_ = TestService::TestBidirectionalStreamRpc(
+      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+        main_thread_sem_.release();  // Confirm that this thread started
+
+        // Cancel this call first, or the move will deadlock, since the moving
+        // thread will wait for the callback thread (both this thread) to
+        // terminate if the call is active.
+        EXPECT_EQ(OkStatus(), call_1_.Cancel());
+        call_2_ = std::move(call_1_);
+
+        callback_executed_ = true;
+      });
+
+  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
+                                                    context_.channel().id());
+
+  EXPECT_TRUE(call_1_.active());
+  EXPECT_TRUE(call_2_.active());
+
+  // Start the callback thread and wait for it to finish.
+  callback_thread_sem_.release();
+  callback_thread_.join();
+
+  EXPECT_FALSE(call_1_.active());
+  EXPECT_FALSE(call_2_.active());
 }
 
 }  // namespace