pw_rpc: Update ChannelOutput release API
This changes ChannelOutput::SendAndReleaseBuffer to take a span instead
of a size. The previous API restricted a ChannelOutput implementation to
using a single buffer, which is limiting for larger systems. With the
new API, a ChannelOutput can hand out multiple buffers simultaneously.
Change-Id: Ie4c55bf66063493bf280353795c0295ceb7e450a
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/24444
Reviewed-by: Wyatt Hepler <hepler@google.com>
diff --git a/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h b/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h
index 4f0e401..8289784 100644
--- a/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h
+++ b/pw_hdlc_lite/public/pw_hdlc_lite/rpc_channel.h
@@ -16,6 +16,7 @@
#include <array>
#include <span>
+#include "pw_assert/light.h"
#include "pw_hdlc_lite/encoder.h"
#include "pw_rpc/channel.h"
#include "pw_stream/stream.h"
@@ -24,6 +25,9 @@
// Custom HDLC ChannelOutput class to write and read data through serial using
// the HDLC-Lite protocol.
+//
+// WARNING: This ChannelOutput is not thread-safe.
+// TODO(frolv): Update this to use OS locking primitives.
class RpcChannelOutput : public rpc::ChannelOutput {
public:
// The RpcChannelOutput class does not own the buffer it uses to store the
@@ -40,9 +44,12 @@
std::span<std::byte> AcquireBuffer() override { return buffer_; }
- Status SendAndReleaseBuffer(size_t size) override {
- return hdlc_lite::WriteInformationFrame(
- address_, buffer_.first(size), writer_);
+ Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+ PW_DASSERT(buffer.data() == buffer_.data());
+ if (buffer.empty()) {
+ return Status::Ok();
+ }
+ return hdlc_lite::WriteInformationFrame(address_, buffer, writer_);
}
private:
@@ -52,6 +59,9 @@
};
// RpcChannelOutput with its own buffer.
+//
+// WARNING: This ChannelOutput is not thread-safe.
+// TODO(frolv): Update this to use OS locking primitives.
template <size_t buffer_size>
class RpcChannelOutputBuffer : public rpc::ChannelOutput {
public:
@@ -62,9 +72,12 @@
std::span<std::byte> AcquireBuffer() override { return buffer_; }
- Status SendAndReleaseBuffer(size_t size) override {
- return hdlc_lite::WriteInformationFrame(
- address_, std::span(buffer_.data(), size), writer_);
+ Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+ PW_DASSERT(buffer.data() == buffer_.data());
+ if (buffer.empty()) {
+ return Status::Ok();
+ }
+ return hdlc_lite::WriteInformationFrame(address_, buffer, writer_);
}
private:
diff --git a/pw_hdlc_lite/rpc_channel_test.cc b/pw_hdlc_lite/rpc_channel_test.cc
index 7e0d0dd..8e9c7db 100644
--- a/pw_hdlc_lite/rpc_channel_test.cc
+++ b/pw_hdlc_lite/rpc_channel_test.cc
@@ -56,12 +56,14 @@
memory_writer, channel_output_buffer, kAddress, "RpcChannelOutput");
constexpr byte test_data = byte{'A'};
- std::memcpy(output.AcquireBuffer().data(), &test_data, sizeof(test_data));
+ auto buffer = output.AcquireBuffer();
+ std::memcpy(buffer.data(), &test_data, sizeof(test_data));
constexpr auto expected = bytes::Concat(
kFlag, kAddress, kControl, 'A', uint32_t{0xA63E2FA5}, kFlag);
- EXPECT_EQ(Status::Ok(), output.SendAndReleaseBuffer(sizeof(test_data)));
+ EXPECT_EQ(Status::Ok(),
+ output.SendAndReleaseBuffer(buffer.first(sizeof(test_data))));
ASSERT_EQ(memory_writer.bytes_written(), expected.size());
EXPECT_EQ(
@@ -78,8 +80,8 @@
memory_writer, channel_output_buffer, kAddress, "RpcChannelOutput");
constexpr auto test_data = bytes::Array<0x7D>();
- std::memcpy(
- output.AcquireBuffer().data(), test_data.data(), test_data.size());
+ auto buffer = output.AcquireBuffer();
+ std::memcpy(buffer.data(), test_data.data(), test_data.size());
constexpr auto expected = bytes::Concat(kFlag,
kAddress,
@@ -88,7 +90,8 @@
byte{0x7d} ^ byte{0x20},
uint32_t{0x89515322},
kFlag);
- EXPECT_EQ(Status::Ok(), output.SendAndReleaseBuffer(test_data.size()));
+ EXPECT_EQ(Status::Ok(),
+ output.SendAndReleaseBuffer(buffer.first(test_data.size())));
ASSERT_EQ(memory_writer.bytes_written(), 10u);
EXPECT_EQ(
@@ -104,12 +107,14 @@
memory_writer, kAddress, "RpcChannelOutput");
constexpr byte test_data = byte{'A'};
- std::memcpy(output.AcquireBuffer().data(), &test_data, sizeof(test_data));
+ auto buffer = output.AcquireBuffer();
+ std::memcpy(buffer.data(), &test_data, sizeof(test_data));
constexpr auto expected = bytes::Concat(
kFlag, kAddress, kControl, 'A', uint32_t{0xA63E2FA5}, kFlag);
- EXPECT_EQ(Status::Ok(), output.SendAndReleaseBuffer(sizeof(test_data)));
+ EXPECT_EQ(Status::Ok(),
+ output.SendAndReleaseBuffer(buffer.first(sizeof(test_data))));
ASSERT_EQ(memory_writer.bytes_written(), expected.size());
EXPECT_EQ(
diff --git a/pw_rpc/channel.cc b/pw_rpc/channel.cc
index fb5eab2..2c67c39 100644
--- a/pw_rpc/channel.cc
+++ b/pw_rpc/channel.cc
@@ -29,15 +29,16 @@
Status Channel::Send(OutputBuffer& buffer, const internal::Packet& packet) {
Result encoded = packet.Encode(buffer.buffer_);
- buffer.buffer_ = {};
if (!encoded.ok()) {
PW_LOG_ERROR("Failed to encode response packet to channel buffer");
- output().SendAndReleaseBuffer(0);
+ output().DiscardBuffer(buffer.buffer_);
+ buffer.buffer_ = {};
return Status::Internal();
}
- return output().SendAndReleaseBuffer(encoded.value().size());
+ buffer.buffer_ = {};
+ return output().SendAndReleaseBuffer(encoded.value());
}
} // namespace pw::rpc::internal
diff --git a/pw_rpc/channel_test.cc b/pw_rpc/channel_test.cc
index cdddc14..a40e591 100644
--- a/pw_rpc/channel_test.cc
+++ b/pw_rpc/channel_test.cc
@@ -28,7 +28,9 @@
public:
NameTester(const char* name) : ChannelOutput(name) {}
std::span<std::byte> AcquireBuffer() override { return {}; }
- Status SendAndReleaseBuffer(size_t) override { return Status::Ok(); }
+ Status SendAndReleaseBuffer(std::span<const std::byte>) override {
+ return Status::Ok();
+ }
};
EXPECT_STREQ("hello_world", NameTester("hello_world").name());
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h b/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h
index 6ff26d9..7909429 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb_test_method_context.h
@@ -16,7 +16,7 @@
#include <tuple>
#include <utility>
-#include "pw_assert/assert.h"
+#include "pw_assert/light.h"
#include "pw_containers/vector.h"
#include "pw_preprocessor/arguments.h"
#include "pw_rpc/channel.h"
@@ -111,7 +111,7 @@
private:
std::span<std::byte> AcquireBuffer() override { return buffer_; }
- Status SendAndReleaseBuffer(size_t size) override;
+ Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override;
const internal::NanopbMethod& method_;
Vector<Response>& responses_;
@@ -179,7 +179,7 @@
// Gives access to the RPC's response.
const Response& response() const {
- PW_CHECK_UINT_GT(ctx_.responses.size(), 0);
+ PW_ASSERT(ctx_.responses.size() > 0u);
return ctx_.responses.back();
}
};
@@ -232,7 +232,7 @@
// The status of the stream. Only valid if done() is true.
Status status() const {
- PW_CHECK(done());
+ PW_ASSERT(done());
return ctx_.output.last_status();
}
};
@@ -256,16 +256,17 @@
}
template <typename Response>
-Status MessageOutput<Response>::SendAndReleaseBuffer(size_t size) {
- PW_CHECK(!stream_ended_);
+Status MessageOutput<Response>::SendAndReleaseBuffer(
+ std::span<const std::byte> buffer) {
+ PW_ASSERT(!stream_ended_);
+ PW_ASSERT(buffer.data() == buffer_.data());
- if (size == 0u) {
+ if (buffer.empty()) {
return Status::Ok();
}
- Result<internal::Packet> result =
- internal::Packet::FromBuffer(std::span(buffer_.data(), size));
- PW_CHECK(result.ok());
+ Result<internal::Packet> result = internal::Packet::FromBuffer(buffer);
+ PW_ASSERT(result.ok());
last_status_ = result.value().status();
@@ -274,7 +275,7 @@
// If we run out of space, the back message is always the most recent.
responses_.emplace_back();
responses_.back() = {};
- PW_CHECK(
+ PW_ASSERT(
method_.DecodeResponse(result.value().payload(), &responses_.back()));
total_responses_ += 1;
break;
diff --git a/pw_rpc/public/pw_rpc/channel.h b/pw_rpc/public/pw_rpc/channel.h
index 094d269..f773fe1 100644
--- a/pw_rpc/public/pw_rpc/channel.h
+++ b/pw_rpc/public/pw_rpc/channel.h
@@ -38,14 +38,22 @@
constexpr const char* name() const { return name_; }
- // Acquire a buffer into which to write an outgoing RPC packet.
+ // Acquire a buffer into which to write an outgoing RPC packet. The
+ // implementation is expected to handle synchronization if necessary.
virtual std::span<std::byte> AcquireBuffer() = 0;
- // Sends the contents of the buffer from AcquireBuffer(). Returns OK if the
- // operation succeeded, on an implementation-defined Status value if there was
- // an error. The implementation must NOT return FAILED_PRECONDITION or
- // INTERNAL, which are reserved by pw_rpc.
- virtual Status SendAndReleaseBuffer(size_t size) = 0;
+ // Sends the contents of a buffer previously obtained from AcquireBuffer().
+ // This may be called with an empty span, in which case the buffer should be
+ // released without sending any data.
+ //
+ // Returns OK if the operation succeeded, or an implementation-defined Status
+ // value if there was an error. The implementation must NOT return
+ // FAILED_PRECONDITION or INTERNAL, which are reserved by pw_rpc.
+ virtual Status SendAndReleaseBuffer(std::span<const std::byte> buffer) = 0;
+
+ void DiscardBuffer(std::span<const std::byte> buffer) {
+ SendAndReleaseBuffer(buffer.first(0));
+ }
private:
const char* name_;
diff --git a/pw_rpc/public/pw_rpc/internal/channel.h b/pw_rpc/public/pw_rpc/internal/channel.h
index 6533eda..3e3350e 100644
--- a/pw_rpc/public/pw_rpc/internal/channel.h
+++ b/pw_rpc/public/pw_rpc/internal/channel.h
@@ -80,8 +80,8 @@
Status Send(OutputBuffer& output, const internal::Packet& packet);
void Release(OutputBuffer& buffer) {
+ output().DiscardBuffer(buffer.buffer_);
buffer.buffer_ = {};
- output().SendAndReleaseBuffer(0);
}
};
diff --git a/pw_rpc/pw_rpc_private/internal_test_utils.h b/pw_rpc/pw_rpc_private/internal_test_utils.h
index faa4833..cf1899e 100644
--- a/pw_rpc/pw_rpc_private/internal_test_utils.h
+++ b/pw_rpc/pw_rpc_private/internal_test_utils.h
@@ -21,6 +21,7 @@
#include <cstdint>
#include <span>
+#include "pw_assert/light.h"
#include "pw_rpc/client.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/method.h"
@@ -39,13 +40,15 @@
std::span<std::byte> AcquireBuffer() override { return buffer_; }
- Status SendAndReleaseBuffer(size_t size) override {
- if (size == 0u) {
+ Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+ if (buffer.empty()) {
return Status::Ok();
}
+ PW_ASSERT(buffer.data() == buffer_.data());
+
packet_count_ += 1;
- sent_data_ = std::span(buffer_.data(), size);
+ sent_data_ = buffer;
Result<internal::Packet> result = internal::Packet::FromBuffer(sent_data_);
EXPECT_EQ(Status::Ok(), result.status());
sent_packet_ = result.value_or(internal::Packet());
diff --git a/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h b/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h
index 732a2b2..14e831e 100644
--- a/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h
+++ b/pw_rpc/raw/public/pw_rpc/raw_test_method_context.h
@@ -119,7 +119,7 @@
private:
ByteSpan AcquireBuffer() override { return packet_buffer_; }
- Status SendAndReleaseBuffer(size_t size) override;
+ Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override;
Vector<ByteSpan>& responses_;
Vector<ResponseBuffer>& buffers_;
@@ -257,15 +257,16 @@
>>;
template <size_t output_size>
-Status MessageOutput<output_size>::SendAndReleaseBuffer(size_t size) {
+Status MessageOutput<output_size>::SendAndReleaseBuffer(
+ std::span<const std::byte> buffer) {
PW_ASSERT(!stream_ended_);
+ PW_ASSERT(buffer.data() == packet_buffer_.data());
- if (size == 0u) {
+ if (buffer.empty()) {
return Status::Ok();
}
- Result<internal::Packet> result =
- internal::Packet::FromBuffer(std::span(packet_buffer_.data(), size));
+ Result<internal::Packet> result = internal::Packet::FromBuffer(buffer);
PW_ASSERT(result.ok());
last_status_ = result.value().status();
diff --git a/pw_rpc/size_report/server_only.cc b/pw_rpc/size_report/server_only.cc
index a8f8ff2..b57102b 100644
--- a/pw_rpc/size_report/server_only.cc
+++ b/pw_rpc/size_report/server_only.cc
@@ -26,8 +26,9 @@
std::span<std::byte> AcquireBuffer() override { return buffer_; }
- pw::Status SendAndReleaseBuffer(size_t size) override {
- return pw::sys_io::WriteBytes(std::span(buffer_, size)).status();
+ pw::Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+ PW_DCHECK_PTR_EQ(buffer.data(), buffer_);
+ return pw::sys_io::WriteBytes(buffer).status();
}
private:
diff --git a/pw_rpc/size_report/server_with_echo_service.cc b/pw_rpc/size_report/server_with_echo_service.cc
index 860ee33..9b2f4cf 100644
--- a/pw_rpc/size_report/server_with_echo_service.cc
+++ b/pw_rpc/size_report/server_with_echo_service.cc
@@ -29,8 +29,9 @@
std::span<std::byte> AcquireBuffer() override { return buffer_; }
- pw::Status SendAndReleaseBuffer(size_t size) override {
- return pw::sys_io::WriteBytes(std::span(buffer_, size)).status();
+ pw::Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override {
+ PW_DCHECK_PTR_EQ(buffer.data(), buffer_);
+ return pw::sys_io::WriteBytes(buffer).status();
}
private: