pw_rpc: Update ChannelOutput release API

This changes ChannelOutput::SendAndReleaseBuffer to take a span instead
of a size. The previous API restricted a ChannelOutput implementation to
using a single buffer, which is limiting for larger systems. With the
new API, a ChannelOutput can hand out multiple buffers simultaneously.

Change-Id: Ie4c55bf66063493bf280353795c0295ceb7e450a
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/24444
Reviewed-by: Wyatt Hepler <hepler@google.com>
diff --git a/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h b/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h
index 4f0e401..8289784 100644
--- a/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h
+++ b/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h
@@ -16,6 +16,7 @@
 #include <array>
 #include <span>
 
+#include "pw_assert/light.h"
 #include "pw_hdlc_lite/encoder.h"
 #include "pw_rpc/channel.h"
 #include "pw_stream/stream.h"
@@ -24,6 +25,9 @@
 
 // Custom HDLC ChannelOutput class to write and read data through serial using
 // the HDLC-Lite protocol.
+//
+// WARNING: This ChannelOutput is not thread-safe.
+// TODO(frolv): Update this to use OS locking primitives.
 class RpcChannelOutput : public rpc::ChannelOutput {
  public:
   // The RpcChannelOutput class does not own the buffer it uses to store the
@@ -40,9 +44,12 @@
 
   std::span<std::byte> AcquireBuffer() override { return buffer_; }
 
-  Status SendAndReleaseBuffer(size_t size) override {
-    return hdlc_lite::WriteInformationFrame(
-        address_, buffer_.first(size), writer_);
+  Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+    PW_DASSERT(buffer.data() == buffer_.data());
+    if (buffer.empty()) {
+      return Status::Ok();
+    }
+    return hdlc_lite::WriteInformationFrame(address_, buffer, writer_);
   }
 
  private:
@@ -52,6 +59,9 @@
 };
 
 // RpcChannelOutput with its own buffer.
+//
+// WARNING: This ChannelOutput is not thread-safe.
+// TODO(frolv): Update this to use OS locking primitives.
 template <size_t buffer_size>
 class RpcChannelOutputBuffer : public rpc::ChannelOutput {
  public:
@@ -62,9 +72,12 @@
 
   std::span<std::byte> AcquireBuffer() override { return buffer_; }
 
-  Status SendAndReleaseBuffer(size_t size) override {
-    return hdlc_lite::WriteInformationFrame(
-        address_, std::span(buffer_.data(), size), writer_);
+  Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+    PW_DASSERT(buffer.data() == buffer_.data());
+    if (buffer.empty()) {
+      return Status::Ok();
+    }
+    return hdlc_lite::WriteInformationFrame(address_, buffer, writer_);
   }
 
  private:
diff --git a/pw_hdlc_lite/rpc_channel_test.cc b/pw_hdlc_lite/rpc_channel_test.cc
index 7e0d0dd..8e9c7db 100644
--- a/pw_hdlc_lite/rpc_channel_test.cc
+++ b/pw_hdlc_lite/rpc_channel_test.cc
@@ -56,12 +56,14 @@
       memory_writer, channel_output_buffer, kAddress, "RpcChannelOutput");
 
   constexpr byte test_data = byte{'A'};
-  std::memcpy(output.AcquireBuffer().data(), &test_data, sizeof(test_data));
+  auto buffer = output.AcquireBuffer();
+  std::memcpy(buffer.data(), &test_data, sizeof(test_data));
 
   constexpr auto expected = bytes::Concat(
       kFlag, kAddress, kControl, 'A', uint32_t{0xA63E2FA5}, kFlag);
 
-  EXPECT_EQ(Status::Ok(), output.SendAndReleaseBuffer(sizeof(test_data)));
+  EXPECT_EQ(Status::Ok(),
+            output.SendAndReleaseBuffer(buffer.first(sizeof(test_data))));
 
   ASSERT_EQ(memory_writer.bytes_written(), expected.size());
   EXPECT_EQ(
@@ -78,8 +80,8 @@
       memory_writer, channel_output_buffer, kAddress, "RpcChannelOutput");
 
   constexpr auto test_data = bytes::Array<0x7D>();
-  std::memcpy(
-      output.AcquireBuffer().data(), test_data.data(), test_data.size());
+  auto buffer = output.AcquireBuffer();
+  std::memcpy(buffer.data(), test_data.data(), test_data.size());
 
   constexpr auto expected = bytes::Concat(kFlag,
                                           kAddress,
@@ -88,7 +90,8 @@
                                           byte{0x7d} ^ byte{0x20},
                                           uint32_t{0x89515322},
                                           kFlag);
-  EXPECT_EQ(Status::Ok(), output.SendAndReleaseBuffer(test_data.size()));
+  EXPECT_EQ(Status::Ok(),
+            output.SendAndReleaseBuffer(buffer.first(test_data.size())));
 
   ASSERT_EQ(memory_writer.bytes_written(), 10u);
   EXPECT_EQ(
@@ -104,12 +107,14 @@
       memory_writer, kAddress, "RpcChannelOutput");
 
   constexpr byte test_data = byte{'A'};
-  std::memcpy(output.AcquireBuffer().data(), &test_data, sizeof(test_data));
+  auto buffer = output.AcquireBuffer();
+  std::memcpy(buffer.data(), &test_data, sizeof(test_data));
 
   constexpr auto expected = bytes::Concat(
       kFlag, kAddress, kControl, 'A', uint32_t{0xA63E2FA5}, kFlag);
 
-  EXPECT_EQ(Status::Ok(), output.SendAndReleaseBuffer(sizeof(test_data)));
+  EXPECT_EQ(Status::Ok(),
+            output.SendAndReleaseBuffer(buffer.first(sizeof(test_data))));
 
   ASSERT_EQ(memory_writer.bytes_written(), expected.size());
   EXPECT_EQ(
diff --git a/pw_rpc/channel.cc b/pw_rpc/channel.cc
index fb5eab2..2c67c39 100644
--- a/pw_rpc/channel.cc
+++ b/pw_rpc/channel.cc
@@ -29,15 +29,16 @@
 
 Status Channel::Send(OutputBuffer& buffer, const internal::Packet& packet) {
   Result encoded = packet.Encode(buffer.buffer_);
-  buffer.buffer_ = {};
 
   if (!encoded.ok()) {
     PW_LOG_ERROR("Failed to encode response packet to channel buffer");
-    output().SendAndReleaseBuffer(0);
+    output().DiscardBuffer(buffer.buffer_);
+    buffer.buffer_ = {};
     return Status::Internal();
   }
 
-  return output().SendAndReleaseBuffer(encoded.value().size());
+  buffer.buffer_ = {};
+  return output().SendAndReleaseBuffer(encoded.value());
 }
 
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/channel_test.cc b/pw_rpc/channel_test.cc
index cdddc14..a40e591 100644
--- a/pw_rpc/channel_test.cc
+++ b/pw_rpc/channel_test.cc
@@ -28,7 +28,9 @@
    public:
     NameTester(const char* name) : ChannelOutput(name) {}
     std::span<std::byte> AcquireBuffer() override { return {}; }
-    Status SendAndReleaseBuffer(size_t) override { return Status::Ok(); }
+    Status SendAndReleaseBuffer(std::span<const std::byte>) override {
+      return Status::Ok();
+    }
   };
 
   EXPECT_STREQ("hello_world", NameTester("hello_world").name());
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h b/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h
index 6ff26d9..7909429 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h
@@ -16,7 +16,7 @@
 #include <tuple>
 #include <utility>
 
-#include "pw_assert/assert.h"
+#include "pw_assert/light.h"
 #include "pw_containers/vector.h"
 #include "pw_preprocessor/arguments.h"
 #include "pw_rpc/channel.h"
@@ -111,7 +111,7 @@
  private:
   std::span<std::byte> AcquireBuffer() override { return buffer_; }
 
-  Status SendAndReleaseBuffer(size_t size) override;
+  Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override;
 
   const internal::NanopbMethod& method_;
   Vector<Response>& responses_;
@@ -179,7 +179,7 @@
 
   // Gives access to the RPC's response.
   const Response& response() const {
-    PW_CHECK_UINT_GT(ctx_.responses.size(), 0);
+    PW_ASSERT(ctx_.responses.size() > 0u);
     return ctx_.responses.back();
   }
 };
@@ -232,7 +232,7 @@
 
   // The status of the stream. Only valid if done() is true.
   Status status() const {
-    PW_CHECK(done());
+    PW_ASSERT(done());
     return ctx_.output.last_status();
   }
 };
@@ -256,16 +256,17 @@
 }
 
 template <typename Response>
-Status MessageOutput<Response>::SendAndReleaseBuffer(size_t size) {
-  PW_CHECK(!stream_ended_);
+Status MessageOutput<Response>::SendAndReleaseBuffer(
+    std::span<const std::byte> buffer) {
+  PW_ASSERT(!stream_ended_);
+  PW_ASSERT(buffer.data() == buffer_.data());
 
-  if (size == 0u) {
+  if (buffer.empty()) {
     return Status::Ok();
   }
 
-  Result<internal::Packet> result =
-      internal::Packet::FromBuffer(std::span(buffer_.data(), size));
-  PW_CHECK(result.ok());
+  Result<internal::Packet> result = internal::Packet::FromBuffer(buffer);
+  PW_ASSERT(result.ok());
 
   last_status_ = result.value().status();
 
@@ -274,7 +275,7 @@
       // If we run out of space, the back message is always the most recent.
       responses_.emplace_back();
       responses_.back() = {};
-      PW_CHECK(
+      PW_ASSERT(
           method_.DecodeResponse(result.value().payload(), &responses_.back()));
       total_responses_ += 1;
       break;
diff --git a/pw_rpc/public/pw_rpc/channel.h b/pw_rpc/public/pw_rpc/channel.h
index 094d269..f773fe1 100644
--- a/pw_rpc/public/pw_rpc/channel.h
+++ b/pw_rpc/public/pw_rpc/channel.h
@@ -38,14 +38,22 @@
 
   constexpr const char* name() const { return name_; }
 
-  // Acquire a buffer into which to write an outgoing RPC packet.
+  // Acquire a buffer into which to write an outgoing RPC packet. The
+  // implementation is expected to handle synchronization if necessary.
   virtual std::span<std::byte> AcquireBuffer() = 0;
 
-  // Sends the contents of the buffer from AcquireBuffer(). Returns OK if the
-  // operation succeeded, on an implementation-defined Status value if there was
-  // an error. The implementation must NOT return FAILED_PRECONDITION or
-  // INTERNAL, which are reserved by pw_rpc.
-  virtual Status SendAndReleaseBuffer(size_t size) = 0;
+  // Sends the contents of a buffer previously obtained from AcquireBuffer().
+  // This may be called with an empty span, in which case the buffer should be
+  // released without sending any data.
+  //
+  // Returns OK if the operation succeeded, or an implementation-defined Status
+  // value if there was an error. The implementation must NOT return
+  // FAILED_PRECONDITION or INTERNAL, which are reserved by pw_rpc.
+  virtual Status SendAndReleaseBuffer(std::span<const std::byte> buffer) = 0;
+
+  void DiscardBuffer(std::span<const std::byte> buffer) {
+    SendAndReleaseBuffer(buffer.first(0));
+  }
 
  private:
   const char* name_;
diff --git a/pw_rpc/public/pw_rpc/internal/channel.h b/pw_rpc/public/pw_rpc/internal/channel.h
index 6533eda..3e3350e 100644
--- a/pw_rpc/public/pw_rpc/internal/channel.h
+++ b/pw_rpc/public/pw_rpc/internal/channel.h
@@ -80,8 +80,8 @@
   Status Send(OutputBuffer& output, const internal::Packet& packet);
 
   void Release(OutputBuffer& buffer) {
+    output().DiscardBuffer(buffer.buffer_);
     buffer.buffer_ = {};
-    output().SendAndReleaseBuffer(0);
   }
 };
 
diff --git a/pw_rpc/pw_rpc_private/internal_test_utils.h b/pw_rpc/pw_rpc_private/internal_test_utils.h
index faa4833..cf1899e 100644
--- a/pw_rpc/pw_rpc_private/internal_test_utils.h
+++ b/pw_rpc/pw_rpc_private/internal_test_utils.h
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <span>
 
+#include "pw_assert/light.h"
 #include "pw_rpc/client.h"
 #include "pw_rpc/internal/channel.h"
 #include "pw_rpc/internal/method.h"
@@ -39,13 +40,15 @@
 
   std::span<std::byte> AcquireBuffer() override { return buffer_; }
 
-  Status SendAndReleaseBuffer(size_t size) override {
-    if (size == 0u) {
+  Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+    if (buffer.empty()) {
       return Status::Ok();
     }
 
+    PW_ASSERT(buffer.data() == buffer_.data());
+
     packet_count_ += 1;
-    sent_data_ = std::span(buffer_.data(), size);
+    sent_data_ = buffer;
     Result<internal::Packet> result = internal::Packet::FromBuffer(sent_data_);
     EXPECT_EQ(Status::Ok(), result.status());
     sent_packet_ = result.value_or(internal::Packet());
diff --git a/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h b/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h
index 732a2b2..14e831e 100644
--- a/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h
+++ b/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h
@@ -119,7 +119,7 @@
  private:
   ByteSpan AcquireBuffer() override { return packet_buffer_; }
 
-  Status SendAndReleaseBuffer(size_t size) override;
+  Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override;
 
   Vector<ByteSpan>& responses_;
   Vector<ResponseBuffer>& buffers_;
@@ -257,15 +257,16 @@
                >>;
 
 template <size_t output_size>
-Status MessageOutput<output_size>::SendAndReleaseBuffer(size_t size) {
+Status MessageOutput<output_size>::SendAndReleaseBuffer(
+    std::span<const std::byte> buffer) {
   PW_ASSERT(!stream_ended_);
+  PW_ASSERT(buffer.data() == packet_buffer_.data());
 
-  if (size == 0u) {
+  if (buffer.empty()) {
     return Status::Ok();
   }
 
-  Result<internal::Packet> result =
-      internal::Packet::FromBuffer(std::span(packet_buffer_.data(), size));
+  Result<internal::Packet> result = internal::Packet::FromBuffer(buffer);
   PW_ASSERT(result.ok());
 
   last_status_ = result.value().status();
diff --git a/pw_rpc/size_report/server_only.cc b/pw_rpc/size_report/server_only.cc
index a8f8ff2..b57102b 100644
--- a/pw_rpc/size_report/server_only.cc
+++ b/pw_rpc/size_report/server_only.cc
@@ -26,8 +26,9 @@
 
   std::span<std::byte> AcquireBuffer() override { return buffer_; }
 
-  pw::Status SendAndReleaseBuffer(size_t size) override {
-    return pw::sys_io::WriteBytes(std::span(buffer_, size)).status();
+  pw::Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+    PW_DCHECK_PTR_EQ(buffer.data(), buffer_);
+    return pw::sys_io::WriteBytes(buffer).status();
   }
 
  private:
diff --git a/pw_rpc/size_report/server_with_echo_service.cc b/pw_rpc/size_report/server_with_echo_service.cc
index 860ee33..9b2f4cf 100644
--- a/pw_rpc/size_report/server_with_echo_service.cc
+++ b/pw_rpc/size_report/server_with_echo_service.cc
@@ -29,8 +29,9 @@
 
   std::span<std::byte> AcquireBuffer() override { return buffer_; }
 
-  pw::Status SendAndReleaseBuffer(size_t size) override {
-    return pw::sys_io::WriteBytes(std::span(buffer_, size)).status();
+  pw::Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+    PW_DCHECK_PTR_EQ(buffer.data(), buffer_);
+    return pw::sys_io::WriteBytes(buffer).status();
   }
 
  private: