pw_rpc: ChannelOutput buffer handling fixes

- Ensure ChannelOutput buffers are always released when a call is
  closed.
  * Release any held buffer when aborting a server call due to an error.
  * Permit calling AcquirePayloadBuffer() on an inactive call, but
    return an empty buffer if it is called while inactive. Previously,
    this was disallowed, but it may not be possible to prevent an
    accidental AcquirePayloadBuffer() on a closed call if it is closed
    by a different thread.
  * Release any held buffer when a client call goes out of scope or is
    aborted.
- Extend locking annotations to cover setting callbacks.
- Expose the function to end client streams as CloseClientStream().

Change-Id: I892c66bbeb1701329783b8c360d02ffe25992d98
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/78622
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index aec7e81..044c6f5 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -95,26 +95,23 @@
   endpoint().RegisterUniqueCall(*this);
 }
 
-Status Call::EndClientStream() {
-  client_stream_state_ = kClientStreamInactive;
-  return SendPacket(PacketType::CLIENT_STREAM_END, {}, {});
+Status Call::CloseClientStream() {
+  rpc_lock().lock();
+  return CloseClientStreamLocked();
 }
 
-Status Call::CloseAndSendFinalPacket(PacketType type,
-                                     ConstByteSpan response,
-                                     Status status) {
-  rpc_lock().lock();
+Status Call::CloseAndSendFinalPacketLocked(PacketType type,
+                                           ConstByteSpan response,
+                                           Status status) {
   if (!active_locked()) {
     rpc_lock().unlock();
     return Status::FailedPrecondition();
   }
-  Close();
+  UnregisterAndMarkClosed();
   return SendPacket(type, response, status);
 }
 
-ByteSpan Call::PayloadBuffer() {
-  rpc_lock().lock();
-
+ByteSpan Call::PayloadBufferInternal() {
   // Only allow having one active buffer at a time.
   if (response_.empty()) {
     Channel& c = channel();
@@ -133,7 +130,6 @@
   //     rather than creating a packet.
   ByteSpan buffer =
       response_.payload(MakePacket(PacketType::CLIENT_STREAM, {}));
-  rpc_lock().unlock();
 
   return buffer;
 }
@@ -152,12 +148,10 @@
 Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
   const Packet packet = MakePacket(type, payload, status);
 
-  if (!buffer().Contains(payload)) {
+  if (!response_.Contains(payload)) {
     // TODO(pwbug/597): Ensure the call object is locked before releasing the
     //     RPC lock.
-    rpc_lock().unlock();
-    ByteSpan buffer = PayloadBuffer();
-    rpc_lock().lock();
+    ByteSpan buffer = PayloadBufferInternal();
 
     if (payload.size() > buffer.size()) {
       ReleasePayloadBufferLocked();
@@ -171,17 +165,19 @@
 }
 
 void Call::ReleasePayloadBufferLocked() {
-  PW_DCHECK(active_locked());
-  channel().Release(response_);
+  if (!response_.empty()) {
+    channel().Release(response_);
+  } else {
+    rpc_lock().unlock();
+  }
 }
 
-void Call::Close() {
+void Call::UnregisterAndMarkClosed() {
   if (active_locked()) {
     endpoint().UnregisterCall(*this);
+    rpc_state_ = kInactive;
+    client_stream_state_ = kClientStreamInactive;
   }
-
-  rpc_state_ = kInactive;
-  client_stream_state_ = kClientStreamInactive;
 }
 
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc
index 25ecfcb..177dafc 100644
--- a/pw_rpc/call_test.cc
+++ b/pw_rpc/call_test.cc
@@ -116,7 +116,7 @@
   EXPECT_EQ(Status::Unauthenticated(), writer.Finish());
 }
 
-TEST(ServerWriter, Close) {
+TEST(ServerWriter, Finish) {
   ServerContextForTest<TestService> context(TestService::method.method());
   FakeServerWriter writer(context.get());
 
@@ -126,17 +126,15 @@
   EXPECT_EQ(Status::FailedPrecondition(), writer.Finish());
 }
 
-TEST(ServerWriter, Close_ReleasesBuffer) {
+TEST(ServerWriter, Finish_ReleasesBuffer) {
   ServerContextForTest<TestService> context(TestService::method.method());
   FakeServerWriter writer(context.get());
 
   ASSERT_TRUE(writer.active());
-  auto buffer = writer.PayloadBuffer();
-  buffer[0] = std::byte{0};
-  EXPECT_FALSE(writer.output_buffer().empty());
+  ASSERT_FALSE(writer.PayloadBuffer().empty());
   EXPECT_EQ(OkStatus(), writer.Finish());
   EXPECT_FALSE(writer.active());
-  EXPECT_TRUE(writer.output_buffer().empty());
+  // OutputBuffer asserts if the buffer is not released.
 }
 
 TEST(ServerWriter, Open_SendsPacketWithPayload) {
diff --git a/pw_rpc/client_call.cc b/pw_rpc/client_call.cc
index c7f67ff..7358f3c 100644
--- a/pw_rpc/client_call.cc
+++ b/pw_rpc/client_call.cc
@@ -16,12 +16,15 @@
 
 namespace pw::rpc::internal {
 
-void ClientCall::SendInitialRequestLocked(ConstByteSpan payload) {
-  if (const Status status = SendPacket(PacketType::REQUEST, payload);
-      !status.ok()) {
-    rpc_lock().lock();
-    HandleError(status);
+void ClientCall::CloseClientCall() {
+  if (client_stream_open()) {
+    // TODO(pwbug/597): Ensure the call object is locked before releasing the
+    //     RPC mutex.
+    CloseClientStreamLocked();
+    rpc_lock().lock();  // Reacquire after sending the packet
   }
+  CloseAndReleasePayloadBuffer();
+  rpc_lock().lock();  // Reacquire releasing the buffer
 }
 
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/nanopb/common.cc b/pw_rpc/nanopb/common.cc
index 715c0d2..743f995 100644
--- a/pw_rpc/nanopb/common.cc
+++ b/pw_rpc/nanopb/common.cc
@@ -45,8 +45,10 @@
 
 Result<ByteSpan> EncodeToPayloadBuffer(Call& call,
                                        const void* payload,
-                                       NanopbSerde serde) {
-  std::span<std::byte> payload_buffer = call.PayloadBuffer();
+                                       NanopbSerde serde)
+    PW_UNLOCK_FUNCTION(rpc_lock()) {
+  std::span<std::byte> payload_buffer = call.PayloadBufferInternal();
+  rpc_lock().unlock();
 
   StatusWithSize result = serde.Encode(payload, payload_buffer);
   if (!result.ok()) {
@@ -98,22 +100,27 @@
 void NanopbSendInitialRequest(ClientCall& call,
                               NanopbSerde serde,
                               const void* payload) {
-  PW_DCHECK(call.active());
+  PW_DCHECK(call.active_locked());
 
   Result<ByteSpan> result = EncodeToPayloadBuffer(call, payload, serde);
-  rpc_lock().lock();
 
+  rpc_lock().lock();
   if (result.ok()) {
-    call.SendInitialRequestLocked(*result);
+    call.SendInitialClientRequest(*result);
   } else {
     call.HandleError(result.status());
   }
 }
 
 Status NanopbSendStream(Call& call, const void* payload, NanopbSerde serde) {
-  PW_DCHECK(call.active());
+  rpc_lock().lock();
+  if (!call.active_locked()) {
+    rpc_lock().unlock();
+    return Status::FailedPrecondition();
+  }
 
   Result<ByteSpan> result = EncodeToPayloadBuffer(call, payload, serde);
+
   PW_TRY(result.status());
   return call.Write(*result);
 }
@@ -121,7 +128,9 @@
 Status SendFinalResponse(NanopbServerCall& call,
                          const void* payload,
                          const Status status) {
-  if (!call.active()) {
+  rpc_lock().lock();
+  if (!call.active_locked()) {
+    rpc_lock().unlock();
     return Status::FailedPrecondition();
   }
 
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
index c0a7fe6..ef473c0 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
@@ -40,11 +40,12 @@
                         const Request&... request) {
     CallType call(client, channel_id, service_id, method_id, serde);
 
-    call.set_on_completed(std::move(on_completed));
-    call.set_on_error(std::move(on_error));
+    rpc_lock().lock();
+    call.set_on_completed_locked(std::move(on_completed));
+    call.set_on_error_locked(std::move(on_error));
 
     if constexpr (sizeof...(Request) == 0u) {
-      call.SendInitialRequest({});
+      call.SendInitialClientRequest({});
     } else {
       NanopbSendInitialRequest(call, serde.request(), &request...);
     }
@@ -74,30 +75,18 @@
     LockGuard lock(rpc_lock());
     MoveUnaryResponseClientCallFrom(other);
     serde_ = other.serde_;
-    set_on_completed(std::move(other.nanopb_on_completed_));
+    set_on_completed_locked(std::move(other.nanopb_on_completed_));
     return *this;
   }
 
   void set_on_completed(
-      Function<void(const Response& response, Status)>&& on_completed) {
-    nanopb_on_completed_ = std::move(on_completed);
-
-    UnaryResponseClientCall::set_on_completed(
-        [this](ConstByteSpan payload, Status status) {
-          if (nanopb_on_completed_) {
-            Response response_struct{};
-            if (serde_->DecodeResponse(payload, &response_struct)) {
-              nanopb_on_completed_(response_struct, status);
-            } else {
-              // TODO: it's silly to lock this just to call the callback
-              rpc_lock().lock();
-              CallOnError(Status::DataLoss());
-            }
-          }
-        });
+      Function<void(const Response& response, Status)>&& on_completed)
+      PW_LOCKS_EXCLUDED(rpc_lock()) {
+    LockGuard lock(rpc_lock());
+    set_on_completed_locked(std::move(on_completed));
   }
 
-  Status SendClientStream(const void* payload) {
+  Status SendClientStream(const void* payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
     if (!active()) {
       return Status::FailedPrecondition();
     }
@@ -105,6 +94,26 @@
   }
 
  private:
+  void set_on_completed_locked(
+      Function<void(const Response& response, Status)>&& on_completed)
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    nanopb_on_completed_ = std::move(on_completed);
+
+    UnaryResponseClientCall::set_on_completed_locked(
+        [this](ConstByteSpan payload, Status status) {
+          if (nanopb_on_completed_) {
+            Response response_struct{};
+            if (serde_->DecodeResponse(payload, &response_struct)) {
+              nanopb_on_completed_(response_struct, status);
+            } else {
+              // TODO(hepler): This should send a DATA_LOSS error and call the
+              //     error callback.
+              CallOnError(Status::DataLoss());
+            }
+          }
+        });
+  }
+
   const NanopbMethodSerde* serde_;
   Function<void(const Response&, Status)> nanopb_on_completed_;
 };
@@ -125,12 +134,13 @@
                         const Request&... request) {
     CallType call(client, channel_id, service_id, method_id, serde);
 
-    call.set_on_next(std::move(on_next));
-    call.set_on_completed(std::move(on_completed));
-    call.set_on_error(std::move(on_error));
+    rpc_lock().lock();
+    call.set_on_next_locked(std::move(on_next));
+    call.set_on_completed_locked(std::move(on_completed));
+    call.set_on_error_locked(std::move(on_error));
 
     if constexpr (sizeof...(Request) == 0u) {
-      call.SendInitialRequest({});
+      call.SendInitialClientRequest({});
     } else {
       NanopbSendInitialRequest(call, serde.request(), &request...);
     }
@@ -188,7 +198,8 @@
         if (serde_->DecodeResponse(payload, &response_struct)) {
           nanopb_on_next_(response_struct);
         } else {
-          rpc_lock().lock();
+          // TODO(hepler): This should send a DATA_LOSS error and call the
+          //     error callback.
           CallOnError(Status::DataLoss());
         }
       }
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/common.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/common.h
index 68f43e4..abceb35 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/common.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/common.h
@@ -99,16 +99,17 @@
 void NanopbSendInitialRequest(ClientCall& call,
                               NanopbSerde serde,
                               const void* payload)
-    PW_LOCKS_EXCLUDED(rpc_lock());
+    PW_UNLOCK_FUNCTION(rpc_lock());
 
 // [Client/Server] Encodes and sends a client or server stream message.
 // active() must be true.
-Status NanopbSendStream(Call& call, const void* payload, NanopbSerde serde);
+Status NanopbSendStream(Call& call, const void* payload, NanopbSerde serde)
+    PW_LOCKS_EXCLUDED(rpc_lock());
 
 // [Server] Encodes and sends the final response message.
 // Returns Status::FailedPrecondition if active() is false.
 Status SendFinalResponse(NanopbServerCall& call,
                          const void* payload,
-                         Status status);
+                         Status status) PW_LOCKS_EXCLUDED(rpc_lock());
 
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
index fbe9554..30170d4 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
@@ -46,7 +46,8 @@
 
   NanopbServerCall(const CallContext& context, MethodType type);
 
-  Status SendUnaryResponse(const void* payload, Status status) {
+  Status SendUnaryResponse(const void* payload, Status status)
+      PW_LOCKS_EXCLUDED(rpc_lock()) {
     return SendFinalResponse(*this, payload, status);
   }
 
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 0d62dd0..49c768e 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -100,16 +100,31 @@
     return CloseAndSendFinalPacket(PacketType::SERVER_ERROR, {}, error);
   }
 
-  Status CloseAndSendClientError(Status error) PW_LOCKS_EXCLUDED(rpc_lock()) {
-    return CloseAndSendFinalPacket(PacketType::CLIENT_ERROR, {}, error);
-  }
+  // Public call that ends the client stream for a client call.
+  Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock());
 
-  // Ends the client stream for a client call.
-  Status EndClientStream() PW_UNLOCK_FUNCTION(rpc_lock());
+  // Internal call that closes the client stream.
+  Status CloseClientStreamLocked() PW_UNLOCK_FUNCTION(rpc_lock()) {
+    client_stream_state_ = kClientStreamInactive;
+    return SendPacket(PacketType::CLIENT_STREAM_END, {}, {});
+  }
 
   // Sends a payload in either a server or client stream packet.
   Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock());
 
+  // Sends the initial request for a client call. If the request fails, the call
+  // is closed.
+  void SendInitialClientRequest(ConstByteSpan payload)
+      PW_UNLOCK_FUNCTION(rpc_lock()) {
+    // TODO(pwbug/597): Ensure the call object is locked before releasing the
+    //     RPC mutex.
+    if (const Status status = SendPacket(PacketType::REQUEST, payload);
+        !status.ok()) {
+      rpc_lock().lock();
+      HandleError(status);
+    }
+  }
+
   // Whenever a payload arrives (in a server/client stream or in a response),
   // call the on_next_ callback.
   // Precondition: rpc_lock() must be held.
@@ -127,7 +142,7 @@
   // Handles an error condition for the call. This closes the call and calls the
   // on_error callback, if set.
   void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
-    Close();
+    CloseAndReleasePayloadBuffer();
     CallOnError(status);
   }
 
@@ -151,6 +166,7 @@
   bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     return HasClientStream(type_);
   }
+
   bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     return HasServerStream(type_);
   }
@@ -159,9 +175,27 @@
     return client_stream_state_ == kClientStreamActive;
   }
 
-  // Acquires a buffer into which to write a payload or returns a previously
-  // acquired buffer. The Call MUST be active when this is called!
-  [[nodiscard]] ByteSpan PayloadBuffer() PW_LOCKS_EXCLUDED(rpc_lock());
+  // Public function that acquires a buffer into which to write a payload or
+  // returns a previously acquired buffer. If the call is inactive, an empty
+  // buffer is returned.
+  //
+  // The payload buffer is invalidated after any Write(), Finish(),
+  // CloseClientStream().
+  //
+  // TODO(hepler): Properly document these semantics.
+  [[nodiscard]] ByteSpan PayloadBuffer() PW_LOCKS_EXCLUDED(rpc_lock()) {
+    LockGuard lock(rpc_lock());
+
+    if (!active_locked()) {
+      return {};
+    }
+    return PayloadBufferInternal();
+  }
+
+  // Internal function to get a payload buffer. Does NOT check if the call is
+  // active.
+  [[nodiscard]] ByteSpan PayloadBufferInternal()
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 
   // Releases the buffer without sending a packet.
   void ReleasePayloadBuffer() PW_LOCKS_EXCLUDED(rpc_lock()) {
@@ -173,7 +207,7 @@
   // function.
   void set_on_next(Function<void(ConstByteSpan)>&& on_next)
       PW_LOCKS_EXCLUDED(rpc_lock()) {
-    LockGuard lock{rpc_lock()};
+    LockGuard lock(rpc_lock());
     set_on_next_locked(std::move(on_next));
   }
 
@@ -214,21 +248,27 @@
   Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     return *endpoint_;
   }
-  Channel& channel() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
-    return *channel_;
-  }
 
-  void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next) {
+  void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     on_next_ = std::move(on_next);
   }
 
-  void set_on_error(Function<void(Status)>&& on_error) {
+  void set_on_error(Function<void(Status)>&& on_error)
+      PW_LOCKS_EXCLUDED(rpc_lock()) {
+    LockGuard lock(rpc_lock());
+    set_on_error_locked(std::move(on_error));
+  }
+
+  void set_on_error_locked(Function<void(Status)>&& on_error)
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     on_error_ = std::move(on_error);
   }
 
   // Calls the on_error callback without closing the RPC. This is used when the
   // call has already completed.
-  void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock()) {
+  void CallOnError(Status error) PW_LOCKS_EXCLUDED(rpc_lock()) {
+    rpc_lock().lock();
     const bool invoke = on_error_ != nullptr;
 
     // TODO(pwbug/597): Ensure on_error_ is properly guarded.
@@ -242,19 +282,17 @@
     client_stream_state_ = kClientStreamInactive;
   }
 
-  constexpr const Channel::OutputBuffer& buffer() const { return response_; }
+  // Fully closes the call. Calls UnregisterAndMarkedClosed() and releases the
+  // payload buffer, if held. Call must be active().
+  void CloseAndReleasePayloadBuffer() PW_UNLOCK_FUNCTION(rpc_lock()) {
+    UnregisterAndMarkClosed();
+    ReleasePayloadBufferLocked();
+  }
 
-  // Sends a payload with the specified type. The payload may either be in an
-  // previously acquired buffer or in a standalone buffer.
-  //
-  // Returns FAILED_PRECONDITION if the call is not active().
-  Status SendPacket(PacketType type,
-                    ConstByteSpan payload,
-                    Status status = OkStatus()) PW_UNLOCK_FUNCTION(rpc_lock());
-
-  // Unregisters the RPC from the endpoint & marks as closed. The call may be
-  // active or inactive when this is called.
-  void Close() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
+  Status CloseAndSendResponseLocked(Status status)
+      PW_UNLOCK_FUNCTION(rpc_lock()) {
+    return CloseAndSendFinalPacketLocked(PacketType::RESPONSE, {}, status);
+  }
 
   // Cancels an RPC. For client calls only.
   Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
@@ -279,6 +317,10 @@
        MethodType type,
        CallType call_type);
 
+  Channel& channel() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    return *channel_;
+  }
+
   void ReleasePayloadBufferLocked() PW_UNLOCK_FUNCTION(rpc_lock());
 
   Packet MakePacket(PacketType type,
@@ -294,9 +336,29 @@
                   status);
   }
 
+  // Sends a payload with the specified type. The payload may either be in a
+  // previously acquired buffer or in a standalone buffer.
+  //
+  // Returns FAILED_PRECONDITION if the call is not active().
+  Status SendPacket(PacketType type,
+                    ConstByteSpan payload,
+                    Status status = OkStatus()) PW_UNLOCK_FUNCTION(rpc_lock());
+
   Status CloseAndSendFinalPacket(PacketType type,
                                  ConstByteSpan response,
-                                 Status status) PW_LOCKS_EXCLUDED(rpc_lock());
+                                 Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
+    rpc_lock().lock();
+    return CloseAndSendFinalPacketLocked(type, response, status);
+  }
+
+  Status CloseAndSendFinalPacketLocked(PacketType type,
+                                       ConstByteSpan response,
+                                       Status status)
+      PW_UNLOCK_FUNCTION(rpc_lock());
+
+  // Unregisters the RPC from the endpoint & marks as closed. The call may be
+  // active or inactive when this is called.
+  void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 
   internal::Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
   internal::Channel* channel_ PW_GUARDED_BY(rpc_lock());
diff --git a/pw_rpc/public/pw_rpc/internal/channel.h b/pw_rpc/public/pw_rpc/internal/channel.h
index e4ac023..6b56092 100644
--- a/pw_rpc/public/pw_rpc/internal/channel.h
+++ b/pw_rpc/public/pw_rpc/internal/channel.h
@@ -33,7 +33,7 @@
       : rpc::Channel(id, output) {}
 
   // Represents a buffer acquired from a ChannelOutput.
-  class OutputBuffer {
+  class [[nodiscard]] OutputBuffer {
    public:
     constexpr OutputBuffer() = default;
 
diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h
index c83b6e5..7eb6937 100644
--- a/pw_rpc/public/pw_rpc/internal/client_call.h
+++ b/pw_rpc/public/pw_rpc/internal/client_call.h
@@ -27,24 +27,10 @@
  public:
   ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) {
     rpc_lock().lock();
-    if (client_stream_open()) {
-      // TODO(pwbug/597): Ensure the call object is locked before releasing the
-      //     RPC mutex.
-      EndClientStream();
-      rpc_lock().lock();  // Reacquire after sending the packet
-    }
-    Close();
+    CloseClientCall();
     rpc_lock().unlock();
   }
 
-  void SendInitialRequest(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
-    rpc_lock().lock();
-    SendInitialRequestLocked(payload);
-  }
-
-  void SendInitialRequestLocked(ConstByteSpan payload)
-      PW_UNLOCK_FUNCTION(rpc_lock());
-
  protected:
   constexpr ClientCall() = default;
 
@@ -55,15 +41,13 @@
              MethodType type)
       : Call(client, channel_id, service_id, method_id, type) {}
 
+  // Sends CLIENT_STREAM_END if applicable, releases any held payload buffer,
+  // and marks the call as closed.
+  void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
+
   void MoveClientCallFrom(ClientCall& other)
       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
-    if (client_stream_open()) {
-      // TODO(pwbug/597): Ensure the call object is locked before releasing the
-      //     RPC mutex.
-      EndClientStream();
-      rpc_lock().lock();  // Reacquire after sending the packet
-    }
-    Close();
+    CloseClientCall();
     MoveFrom(other);
   }
 };
@@ -80,21 +64,21 @@
                         Function<void(ConstByteSpan, Status)>&& on_completed,
                         Function<void(Status)>&& on_error,
                         ConstByteSpan request) {
+    // TODO(pwbug/597): Consider requring the lock during call construction.
     CallType call(client, channel_id, service_id, method_id);
 
-    call.set_on_completed(std::move(on_completed));
-    call.set_on_error(std::move(on_error));
+    rpc_lock().lock();
+    call.set_on_completed_locked(std::move(on_completed));
+    call.set_on_error_locked(std::move(on_error));
 
-    call.SendInitialRequest(request);
+    call.SendInitialClientRequest(request);
     return call;
   }
 
   void HandleCompleted(ConstByteSpan response, Status status)
       PW_UNLOCK_FUNCTION(rpc_lock()) {
-    Close();
     const bool invoke_callback = on_completed_ != nullptr;
-
-    rpc_lock().unlock();
+    CloseAndReleasePayloadBuffer();
 
     if (invoke_callback) {
       on_completed_(response, status);
@@ -128,8 +112,16 @@
     on_completed_ = std::move(other.on_completed_);
   }
 
-  void set_on_completed(Function<void(ConstByteSpan, Status)>&& on_completed) {
+  void set_on_completed(Function<void(ConstByteSpan, Status)>&& on_completed)
+      PW_LOCKS_EXCLUDED(rpc_lock()) {
     // TODO(pwbug/597): Ensure on_completed_ is properly guarded.
+    LockGuard lock(rpc_lock());
+    set_on_completed_locked(std::move(on_completed));
+  }
+
+  void set_on_completed_locked(
+      Function<void(ConstByteSpan, Status)>&& on_completed)
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     on_completed_ = std::move(on_completed);
   }
 
@@ -152,22 +144,23 @@
                         Function<void(Status)>&& on_completed,
                         Function<void(Status)>&& on_error,
                         ConstByteSpan request) {
+    // TODO(hepler): FIGURE OUT LOCKING HERE
     CallType call(client, channel_id, service_id, method_id);
 
-    call.set_on_next(std::move(on_next));
-    call.set_on_completed(std::move(on_completed));
-    call.set_on_error(std::move(on_error));
+    rpc_lock().lock();
+    call.set_on_next_locked(std::move(on_next));
+    call.set_on_completed_locked(std::move(on_completed));
+    call.set_on_error_locked(std::move(on_error));
 
-    call.SendInitialRequest(request);
+    call.SendInitialClientRequest(request);
     return call;
   }
 
   void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
-    Close();
     const bool invoke_callback = on_completed_ != nullptr;
 
     // TODO(pwbug/597): Ensure on_completed_ is properly guarded.
-    rpc_lock().unlock();
+    CloseAndReleasePayloadBuffer();
 
     if (invoke_callback) {
       on_completed_(status);
@@ -201,8 +194,15 @@
     on_completed_ = std::move(other.on_completed_);
   }
 
-  void set_on_completed(Function<void(Status)>&& on_completed) {
+  void set_on_completed(Function<void(Status)>&& on_completed)
+      PW_LOCKS_EXCLUDED(rpc_lock()) {
     // TODO(pwbug/597): Ensure on_completed_ is properly guarded.
+    LockGuard lock(rpc_lock());
+    set_on_completed_locked(std::move(on_completed));
+  }
+
+  void set_on_completed_locked(Function<void(Status)>&& on_completed)
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     on_completed_ = std::move(on_completed);
   }
 
diff --git a/pw_rpc/public/pw_rpc/internal/test_method_context.h b/pw_rpc/public/pw_rpc/internal/test_method_context.h
index 45547d8..143d52a 100644
--- a/pw_rpc/public/pw_rpc/internal/test_method_context.h
+++ b/pw_rpc/public/pw_rpc/internal/test_method_context.h
@@ -68,6 +68,7 @@
                                         channel_.id(),
                                         service_.id(),
                                         kMethodId,
+                                        0,
                                         {},
                                         error)
                                      .Encode(packet)
@@ -76,21 +77,6 @@
                   .ok());
   }
 
-  void SendCancel() {
-    std::byte packet[kNoPayloadPacketSizeBytes];
-    PW_ASSERT(server_
-                  .ProcessPacket(Packet(PacketType::CLIENT_ERROR,
-                                        channel_.id(),
-                                        service_.id(),
-                                        kMethodId,
-                                        {},
-                                        Status::Cancelled())
-                                     .Encode(packet)
-                                     .value(),
-                                 output_)
-                  .ok());
-  }
-
  protected:
   // Constructs the invocation context. The args for the ChannelOutput type are
   // passed in a std::tuple. The args for the Service are forwarded directly
diff --git a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
index 1badc02..483dc88 100644
--- a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
+++ b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
@@ -62,7 +62,6 @@
 
   // Expose a few additional methods for test use.
   ServerCall& as_server_call() { return *this; }
-  const Channel::OutputBuffer& output_buffer() { return buffer(); }
 };
 
 class FakeServerWriter : private FakeServerReaderWriter {
@@ -81,7 +80,6 @@
 
   // Functions for test use.
   using FakeServerReaderWriter::as_server_call;
-  using FakeServerReaderWriter::output_buffer;
   using FakeServerReaderWriter::PayloadBuffer;
 };
 
diff --git a/pw_rpc/raw/client_test.cc b/pw_rpc/raw/client_test.cc
index 36d4a19..c28af12 100644
--- a/pw_rpc/raw/client_test.cc
+++ b/pw_rpc/raw/client_test.cc
@@ -98,7 +98,8 @@
 TEST(Client, ProcessPacket_InvokesUnaryCallbacks) {
   RawClientTestContext context;
   TestUnaryCall call = MakeCall<UnaryMethod, TestUnaryCall>(context);
-  call.SendInitialRequest({});
+  internal::rpc_lock().lock();
+  call.SendInitialClientRequest({});
 
   ASSERT_NE(call.completed, OkStatus());
 
@@ -113,7 +114,8 @@
 TEST(Client, ProcessPacket_InvokesStreamCallbacks) {
   RawClientTestContext context;
   auto call = MakeCall<BidirectionalStreamMethod, TestStreamCall>(context);
-  call.SendInitialRequest({});
+  internal::rpc_lock().lock();
+  call.SendInitialClientRequest({});
 
   context.server().SendServerStream<BidirectionalStreamMethod>(
       std::as_bytes(std::span("<=>")));
@@ -129,7 +131,8 @@
 TEST(Client, ProcessPacket_InvokesErrorCallback) {
   RawClientTestContext context;
   auto call = MakeCall<BidirectionalStreamMethod, TestStreamCall>(context);
-  call.SendInitialRequest({});
+  internal::rpc_lock().lock();
+  call.SendInitialClientRequest({});
 
   context.server().SendServerError<BidirectionalStreamMethod>(
       Status::Aborted());
diff --git a/pw_rpc/raw/codegen_test.cc b/pw_rpc/raw/codegen_test.cc
index e98eeed..0dcbe1d 100644
--- a/pw_rpc/raw/codegen_test.cc
+++ b/pw_rpc/raw/codegen_test.cc
@@ -69,11 +69,16 @@
     return StatusWithSize(status, test_response.size());
   }
 
-  static void TestAnotherUnaryRpc(ConstByteSpan request,
-                                  RawUnaryResponder& responder) {
-    ByteSpan response = responder.PayloadBuffer();
-    StatusWithSize sws = TestUnaryRpc(request, response);
-    responder.Finish(response.first(sws.size()), sws.status());
+  void TestAnotherUnaryRpc(ConstByteSpan request,
+                           RawUnaryResponder& responder) {
+    if (request.empty()) {
+      last_responder_ = std::move(responder);
+    } else {
+      ByteSpan response = responder.PayloadBuffer();
+      StatusWithSize sws = TestUnaryRpc(request, response);
+
+      responder.Finish(response.first(sws.size()), sws.status());
+    }
   }
 
   void TestServerStreamRpc(ConstByteSpan request, RawServerWriter& writer) {
@@ -113,9 +118,7 @@
     });
   }
 
- protected:
-  RawServerReader last_reader_;
-  RawServerReaderWriter last_reader_writer_;
+  RawUnaryResponder& last_responder() { return last_responder_; }
 
  private:
   static uint32_t ReadInteger(ConstByteSpan request) {
@@ -166,6 +169,10 @@
     EXPECT_TRUE(has_status);
     return has_integer && has_status;
   }
+
+  RawUnaryResponder last_responder_;
+  RawServerReader last_reader_;
+  RawServerReaderWriter last_reader_writer_;
 };
 
 }  // namespace test
@@ -218,6 +225,73 @@
   }
 }
 
+TEST(RawCodegen, Server_HandleErrorWhileHoldingBuffer) {
+  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx;
+
+  ASSERT_FALSE(ctx.service().last_responder().active());
+  ctx.call({});
+  ASSERT_TRUE(ctx.service().last_responder().active());
+
+  ASSERT_FALSE(ctx.service().last_responder().PayloadBuffer().empty());
+
+  ctx.SendClientError(Status::Unimplemented());
+
+  EXPECT_FALSE(ctx.service().last_responder().active());
+}
+
+TEST(RawCodegen, Server_FinishWhileHoldingBuffer) {
+  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx;
+
+  ASSERT_FALSE(ctx.service().last_responder().active());
+  ctx.call({});
+  ASSERT_TRUE(ctx.service().last_responder().active());
+
+  ASSERT_FALSE(ctx.service().last_responder().PayloadBuffer().empty());
+
+  ctx.service().last_responder().Finish({});
+
+  EXPECT_FALSE(ctx.service().last_responder().active());
+}
+
+TEST(RawCodegen, Server_MoveIntoCallHoldingBuffer) {
+  // Create two call objects on different channels so they are unique.
+  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx;
+
+  RawUnaryResponder call;
+
+  ctx.call({});
+  ASSERT_FALSE(ctx.service().last_responder().PayloadBuffer().empty());
+
+  call = std::move(ctx.service().last_responder());
+}
+
+TEST(RawCodegen, Server_MoveBetweenActiveCallsWithBuffers) {
+  // Create two call objects on different channels so they are unique.
+  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx_1;
+  ctx_1.set_channel_id(1);
+
+  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx_2;
+  ctx_2.set_channel_id(2);
+
+  ctx_1.call({});
+  ASSERT_FALSE(ctx_1.service().last_responder().PayloadBuffer().empty());
+
+  ctx_2.call({});
+  ASSERT_FALSE(ctx_2.service().last_responder().PayloadBuffer().empty());
+
+  ctx_1.service().last_responder() =
+      std::move(ctx_2.service().last_responder());
+
+  ASSERT_TRUE(ctx_1.service().last_responder().active());
+  ASSERT_FALSE(ctx_2.service().last_responder().active());
+
+  ctx_2.service().last_responder() =
+      std::move(ctx_1.service().last_responder());
+
+  ASSERT_FALSE(ctx_1.service().last_responder().active());
+  ASSERT_TRUE(ctx_2.service().last_responder().active());
+}
+
 TEST(RawCodegen, Server_InvokeServerStreamingRpc) {
   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestServerStreamRpc) context;
 
@@ -326,6 +400,8 @@
 
   test::pw_rpc::raw::TestService::Client service_client_;
 
+  // Store the payload as a null-terminated string for convenience. Use nullptr
+  // for an empty payload.
   std::optional<const char*> payload_;
   std::optional<Status> status_;
   std::optional<Status> error_;
@@ -333,8 +409,13 @@
  private:
   void CopyPayload(ConstByteSpan c_string) {
     ASSERT_LE(c_string.size(), sizeof(buffer_));
-    std::memcpy(buffer_, c_string.data(), c_string.size());
-    payload_ = buffer_;
+
+    if (c_string.empty()) {
+      payload_ = nullptr;
+    } else {
+      std::memcpy(buffer_, c_string.data(), c_string.size());
+      payload_ = buffer_;
+    }
   }
 
   char buffer_[64];
@@ -444,6 +525,69 @@
   EXPECT_FALSE(error_.has_value());
 }
 
+TEST_F(RawCodegenClientTest, ClientStream_FinishWhileHoldingBuffer) {
+  RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
+      context_.client(),
+      context_.channel().id(),
+      UnaryOnCompleted(),
+      OnError());
+
+  ASSERT_FALSE(call.PayloadBuffer().empty());
+
+  context_.server()
+      .SendResponse<test::pw_rpc::raw::TestService::TestClientStreamRpc>(
+          {}, OkStatus());
+
+  ASSERT_TRUE(payload_.has_value());
+  EXPECT_EQ(payload_.value(), nullptr);
+  EXPECT_EQ(status_, OkStatus());
+}
+
+TEST_F(RawCodegenClientTest, ClientStream_CancelWhileHoldingBuffer) {
+  RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
+      context_.client(),
+      context_.channel().id(),
+      UnaryOnCompleted(),
+      OnError());
+
+  ASSERT_FALSE(call.PayloadBuffer().empty());
+
+  EXPECT_EQ(call.Cancel(), OkStatus());
+}
+
+TEST_F(RawCodegenClientTest, ClientStream_MoveIntoCallHoldingBuffer) {
+  RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
+      context_.client(),
+      context_.channel().id(),
+      UnaryOnCompleted(),
+      OnError());
+
+  // End the client stream so that ending it when moving doesn't free the
+  // acquired payload buffer.
+  EXPECT_EQ(OkStatus(), call.CloseClientStream());
+
+  ASSERT_FALSE(call.PayloadBuffer().empty());
+
+  RawClientWriter call_2;
+
+  call = std::move(call_2);
+  EXPECT_FALSE(call.active());
+}
+
+TEST_F(RawCodegenClientTest, ClientStream_GoOutOfScopeWhileHoldingBuffer) {
+  RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
+      context_.client(),
+      context_.channel().id(),
+      UnaryOnCompleted(),
+      OnError());
+
+  // End the client stream so that ending it when moving doesn't free the
+  // acquired payload buffer.
+  EXPECT_EQ(OkStatus(), call.CloseClientStream());
+
+  ASSERT_FALSE(call.PayloadBuffer().empty());
+}
+
 TEST_F(RawCodegenClientTest, InvokeClientStreamRpc_Error) {
   RawClientWriter call =
       service_client_.TestClientStreamRpc(UnaryOnCompleted(), OnError());
diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
index b1d4cc2..d6b3ff6 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
@@ -54,6 +54,9 @@
   // arbitrary external buffer.
   using internal::Call::Write;
 
+  // Notifies the server that no further client stream messages will be sent.
+  using internal::Call::CloseClientStream;
+
   // Cancels this RPC.
   using internal::Call::Cancel;
 
@@ -119,6 +122,7 @@
   using internal::UnaryResponseClientCall::set_on_error;
 
   using internal::Call::Cancel;
+  using internal::Call::CloseClientStream;
   using internal::Call::PayloadBuffer;
   using internal::Call::Write;
 
diff --git a/pw_rpc/server_call.cc b/pw_rpc/server_call.cc
index 1b9b041..7768af6 100644
--- a/pw_rpc/server_call.cc
+++ b/pw_rpc/server_call.cc
@@ -19,12 +19,9 @@
 void ServerCall::MoveServerCallFrom(ServerCall& other) {
   // If this call is active, finish it first.
   if (active_locked()) {
-    Close();
     // TODO(pwbug/597): Ensure the call object is locked before releasing the
     //     RPC mutex.
-    SendPacket(PacketType::RESPONSE,
-               {},
-               OkStatus());  // Unlocks when it sends a packet
+    CloseAndSendResponseLocked(OkStatus());  // Unlocks when it sends a packet
     rpc_lock().lock();
   }