pw_rpc: Handle replacing a call with an acquired ChannelOutput buffer

- When a pending RPC is called again, move its ChannelOutput buffer to
  the new call object. Previously, the ChannelOutput buffer was left
  active in the original call, which caused crashes.

  Moving the ChannelOutput buffer rather than closing it prevents code
  working with the original call object in another thread from sending a
  stale buffer if the call object is replaced. This is an incomplete
  solution, though, and more thought is needed. If the RPC body uses the
  OutputBuffer before passing it off to the other thread, that thread
  will use a stale buffer reference.
- Rearrange the code that replaces the old call to avoid unlocking and
  relocking, which could cause problems if another call arrived while
  the lock wasn't held.
- Expand tests to cover replacing a call with a ChannelOutput buffer
  acquired.

Bug: 591
Change-Id: Iecb6be66d76a248683319c73fd806896d0a93be1
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/76920
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/endpoint.cc b/pw_rpc/endpoint.cc
index 7ea48e0..0ffee0f 100644
--- a/pw_rpc/endpoint.cc
+++ b/pw_rpc/endpoint.cc
@@ -66,12 +66,13 @@
   Call* const existing_call =
       FindCallById(call.channel_id(), call.service_id(), call.method_id());
 
-  if (existing_call != nullptr) {
-    existing_call->HandleError(Status::Cancelled());
-    rpc_lock().lock();  // Reacquire after releasing to call the user callback
-  }
-
   RegisterUniqueCall(call);
+
+  if (existing_call != nullptr) {
+    existing_call->ReplaceWithNewInstance(call);
+  } else {
+    rpc_lock().unlock();
+  }
 }
 
 Channel* Endpoint::GetInternalChannel(uint32_t id) const {
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 2589d23..eec5870 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -113,6 +113,23 @@
     on_error(status);
   }
 
+  // Replaces this Call with a new Call object for the same RPC.
+  void ReplaceWithNewInstance(Call& call) PW_UNLOCK_FUNCTION(rpc_lock()) {
+    // If the original call had acquired a buffer from a ChannelOutput, move it
+    // into the new call instance. Moving the ChannelOutput buffer rather than
+    // closing it prevents code working with the original call object in another
+    // thread from sending a stale buffer if the call object is replaced.
+    //
+    // However, this does NOT fix the stale buffer issue if the RPC body uses
+    // the OutputBuffer before passing it off to the other thread.
+    //
+    // TODO(pwbug/591): Resolve how to handle replacing a call that is holding a
+    //     buffer reference. Easiest solution: ban replying to RPCs on multiple
+    //     threads.
+    call.response_ = std::move(response_);
+    HandleError(Status::Cancelled());
+  }
+
   bool has_client_stream() const { return HasClientStream(type_); }
   bool has_server_stream() const { return HasServerStream(type_); }
 
diff --git a/pw_rpc/raw/server_reader_writer_test.cc b/pw_rpc/raw/server_reader_writer_test.cc
index 4e699d5..e2e1412 100644
--- a/pw_rpc/raw/server_reader_writer_test.cc
+++ b/pw_rpc/raw/server_reader_writer_test.cc
@@ -146,6 +146,37 @@
   EXPECT_EQ(completions.back(), OkStatus());
 }
 
+TEST(RawUnaryResponder, ReplaceActiveCall_DoesNotFinishCall) {
+  ReaderWriterTestContext ctx;
+  RawUnaryResponder active_call =
+      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  std::span buffer = active_call.PayloadBuffer();
+  constexpr const char kData[] = "Some data!";
+  ASSERT_GE(buffer.size(), sizeof(kData));
+  std::memcpy(buffer.data(), kData, sizeof(kData));
+
+  RawUnaryResponder new_active_call =
+      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  active_call = std::move(new_active_call);
+
+  ASSERT_TRUE(ctx.output.completions<TestService::TestUnaryRpc>().empty());
+
+  EXPECT_EQ(OkStatus(), active_call.Finish(buffer, Status::InvalidArgument()));
+
+  EXPECT_STREQ(
+      reinterpret_cast<const char*>(
+          ctx.output.payloads<TestService::TestUnaryRpc>().back().data()),
+      kData);
+
+  const auto completions = ctx.output.completions<TestService::TestUnaryRpc>();
+  ASSERT_EQ(completions.size(), 1u);
+  EXPECT_EQ(completions.back(), Status::InvalidArgument());
+}
+
 TEST(RawUnaryResponder, OutOfScope_FinishesActiveCall) {
   ReaderWriterTestContext ctx;
 
@@ -160,6 +191,53 @@
   EXPECT_EQ(completions.back(), OkStatus());
 }
 
+TEST(RawServerWriter, Move_InactiveToActive_FinishesActiveCall) {
+  ReaderWriterTestContext ctx;
+  RawServerWriter active_call =
+      RawServerWriter::Open<TestService::TestServerStreamRpc>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  EXPECT_GT(active_call.PayloadBuffer().size(), 0u);
+
+  RawServerWriter inactive_call;
+
+  active_call = std::move(inactive_call);
+
+  const auto completions =
+      ctx.output.completions<TestService::TestServerStreamRpc>();
+  ASSERT_EQ(completions.size(), 1u);
+  EXPECT_EQ(completions.back(), OkStatus());
+}
+
+TEST(RawServerWriter, ReplaceActiveCall_DoesNotFinishCall) {
+  ReaderWriterTestContext ctx;
+  RawServerWriter active_call =
+      RawServerWriter::Open<TestService::TestServerStreamRpc>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  std::span buffer = active_call.PayloadBuffer();
+  constexpr const char kData[] = "Some data!";
+  ASSERT_GE(buffer.size(), sizeof(kData));
+  std::memcpy(buffer.data(), kData, sizeof(kData));
+
+  RawServerWriter new_active_call =
+      RawServerWriter::Open<TestService::TestServerStreamRpc>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  active_call = std::move(new_active_call);
+
+  ASSERT_TRUE(
+      ctx.output.completions<TestService::TestServerStreamRpc>().empty());
+
+  EXPECT_EQ(OkStatus(), active_call.Write(buffer));
+
+  EXPECT_STREQ(reinterpret_cast<const char*>(
+                   ctx.output.payloads<TestService::TestServerStreamRpc>()
+                       .back()
+                       .data()),
+               kData);
+}
+
 constexpr const char kWriterData[] = "20X6";
 
 void WriteAsWriter(Writer& writer) {