pw_rpc: Split public and internal interfaces
- Make an internal version of Server, which will expose functions for
working with Reader/Writer objects
- Make an internal version of Channel, which provides an interface for
sending packets.
- Define the Channel::OutputBuffer class for handling buffers acquired
from a ChannelOutput.
Change-Id: Ieea29c1392726cd2eb15008e19004cde05c7f43b
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/12160
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/BUILD b/pw_rpc/BUILD
index 7cf984f..b2f3f2c 100644
--- a/pw_rpc/BUILD
+++ b/pw_rpc/BUILD
@@ -34,6 +34,8 @@
hdrs = [
"public/pw_rpc/server.h",
"public/pw_rpc/server_context.h",
+ "public/pw_rpc/internal/channel.h",
+ "public/pw_rpc/internal/server.h",
# TODO(hepler): Only building the test version of the server for now.
"test_impl/public_overrides/pw_rpc/internal/method.h",
],
@@ -106,6 +108,15 @@
)
pw_cc_test(
+ name = "channel_test",
+ srcs = ["channel_test.cc"],
+ deps = [
+ ":common",
+ ":test_utils_test_server",
+ ],
+)
+
+pw_cc_test(
name = "packet_test",
srcs = [
"packet_test.cc",
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index 1d3bcde..2cc9a8f 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -55,6 +55,7 @@
sources = [
"base_server_writer.cc",
"public/pw_rpc/internal/base_server_writer.h",
+ "public/pw_rpc/internal/server.h",
"public/pw_rpc/internal/service.h",
"server.cc",
"service.cc",
@@ -94,6 +95,7 @@
public_deps = [
":protos_pwpb",
dir_pw_assert,
+ dir_pw_log,
dir_pw_span,
dir_pw_status,
]
@@ -103,6 +105,7 @@
"packet.cc",
"public/pw_rpc/internal/base_method.h",
"public/pw_rpc/internal/call.h",
+ "public/pw_rpc/internal/channel.h",
"public/pw_rpc/internal/packet.h",
]
friend = [ "./*" ]
@@ -131,9 +134,10 @@
pw_test_group("tests") {
tests = [
":base_server_writer_test",
- "nanopb:method_test",
+ ":channel_test",
":packet_test",
":server_test",
+ "nanopb:method_test",
]
}
@@ -156,6 +160,14 @@
sources = [ "base_server_writer_test.cc" ]
}
+pw_test("channel_test") {
+ deps = [
+ ":common",
+ ":test_utils_test_server",
+ ]
+ sources = [ "channel_test.cc" ]
+}
+
pw_test("packet_test") {
deps = [
":common",
diff --git a/pw_rpc/base_server_writer.cc b/pw_rpc/base_server_writer.cc
index 5156f4d..bfe3fe7 100644
--- a/pw_rpc/base_server_writer.cc
+++ b/pw_rpc/base_server_writer.cc
@@ -14,7 +14,6 @@
#include "pw_rpc/internal/base_server_writer.h"
-#include "pw_assert/assert.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/server.h"
@@ -22,59 +21,47 @@
namespace pw::rpc::internal {
BaseServerWriter& BaseServerWriter::operator=(BaseServerWriter&& other) {
- context_ = std::move(other.context_);
+ call_ = std::move(other.call_);
response_ = std::move(other.response_);
state_ = std::move(other.state_);
+
other.state_ = kClosed;
return *this;
}
-void BaseServerWriter::close() {
- if (open()) {
- // TODO(hepler): Send a control packet indicating that the stream has
- // terminated, and remove this ServerWriter from the Server's list.
-
- state_ = kClosed;
+void BaseServerWriter::Finish() {
+ if (!open()) {
+ return;
}
+
+ // TODO(hepler): Send a control packet indicating that the stream has
+ // terminated.
+
+ state_ = kClosed;
}
-span<std::byte> BaseServerWriter::AcquireBuffer() {
+span<std::byte> BaseServerWriter::AcquirePayloadBuffer() {
if (!open()) {
return {};
}
- PW_DCHECK(response_.empty());
- response_ = context_.channel().AcquireBuffer();
-
- // Reserve space for the RPC packet header.
- return packet().PayloadUsableSpace(response_);
+ response_ = call_.channel().AcquireBuffer();
+ return response_.payload(packet());
}
-Status BaseServerWriter::SendAndReleaseBuffer(span<const std::byte> payload) {
+Status BaseServerWriter::ReleasePayloadBuffer(span<const std::byte> payload) {
if (!open()) {
return Status::FAILED_PRECONDITION;
}
-
- Packet response_packet = packet();
- response_packet.set_payload(payload);
- StatusWithSize encoded = response_packet.Encode(response_);
- response_ = {};
-
- if (!encoded.ok()) {
- context_.channel().SendAndReleaseBuffer(0);
- return Status::INTERNAL;
- }
-
- // TODO(hepler): Should Channel::SendAndReleaseBuffer return Status?
- context_.channel().SendAndReleaseBuffer(encoded.size());
- return Status::OK;
+ return call_.channel().Send(response_, packet(payload));
}
-Packet BaseServerWriter::packet() const {
+Packet BaseServerWriter::packet(span<const std::byte> payload) const {
return Packet(PacketType::RPC,
- context_.channel_id(),
- context_.service().id(),
- method().id());
+ call_.channel().id(),
+ call_.service().id(),
+ method().id(),
+ payload);
}
} // namespace pw::rpc::internal
diff --git a/pw_rpc/base_server_writer_test.cc b/pw_rpc/base_server_writer_test.cc
index 7bf8d7f..c7b0480 100644
--- a/pw_rpc/base_server_writer_test.cc
+++ b/pw_rpc/base_server_writer_test.cc
@@ -63,11 +63,11 @@
constexpr FakeServerWriter() = default;
Status Write(span<const byte> response) {
- span buffer = AcquireBuffer();
+ span buffer = AcquirePayloadBuffer();
std::memcpy(buffer.data(),
response.data(),
std::min(buffer.size(), response.size()));
- return SendAndReleaseBuffer(buffer.first(response.size()));
+ return ReleasePayloadBuffer(buffer.first(response.size()));
}
};
@@ -82,7 +82,7 @@
FakeServerWriter writer(context.get());
ASSERT_TRUE(writer.open());
- writer.close();
+ writer.Finish();
EXPECT_FALSE(writer.open());
}
@@ -107,7 +107,7 @@
ServerContextForTest<TestService> context;
FakeServerWriter writer(context.get());
- writer.close();
+ writer.Finish();
constexpr byte data[] = {byte{0xf0}, byte{0x0d}};
EXPECT_EQ(Status::FAILED_PRECONDITION, writer.Write(data));
diff --git a/pw_rpc/channel.cc b/pw_rpc/channel.cc
index 6095479..5747577 100644
--- a/pw_rpc/channel.cc
+++ b/pw_rpc/channel.cc
@@ -12,6 +12,33 @@
// License for the specific language governing permissions and limitations under
// the License.
-#include "pw_rpc/channel.h"
+#include "pw_rpc/internal/channel.h"
-namespace pw::rpc {} // namespace pw::rpc
+#include "pw_log/log.h"
+#include "pw_rpc/internal/packet.h"
+
+namespace pw::rpc::internal {
+
+using std::byte;
+
+span<byte> Channel::OutputBuffer::payload(const Packet& packet) const {
+ const size_t reserved_size = packet.MinEncodedSizeBytes();
+ return reserved_size <= buffer_.size() ? buffer_.subspan(reserved_size)
+ : span<byte>();
+}
+
+Status Channel::Send(OutputBuffer& buffer, const internal::Packet& packet) {
+ StatusWithSize encoded = packet.Encode(buffer.buffer_);
+ buffer.buffer_ = {};
+
+ if (!encoded.ok()) {
+ PW_LOG_ERROR("Failed to encode response packet to channel buffer");
+ output().SendAndReleaseBuffer(0);
+ return Status::INTERNAL;
+ }
+
+ output().SendAndReleaseBuffer(encoded.size());
+ return Status::OK;
+}
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/channel_test.cc b/pw_rpc/channel_test.cc
new file mode 100644
index 0000000..5f195af
--- /dev/null
+++ b/pw_rpc/channel_test.cc
@@ -0,0 +1,105 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/channel.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc/internal/packet.h"
+#include "pw_rpc_private/test_utils.h"
+
+namespace pw::rpc::internal {
+namespace {
+
+using std::byte;
+
+TEST(ChannelOutput, Name) {
+ class NameTester : public ChannelOutput {
+ public:
+ NameTester(const char* name) : ChannelOutput(name) {}
+ span<std::byte> AcquireBuffer() override { return {}; }
+ void SendAndReleaseBuffer(size_t) override {}
+ };
+
+ EXPECT_STREQ("hello_world", NameTester("hello_world").name());
+ EXPECT_EQ(nullptr, NameTester(nullptr).name());
+}
+
+constexpr Packet kTestPacket(PacketType::RPC, 1, 42, 100);
+const size_t kReservedSize = 2 /* type */ + 2 /* channel */ + 2 /* service */ +
+ 2 /* method */ + 2 /* payload key */ +
+ 2 /* status */;
+
+TEST(Channel, TestPacket_ReservedSizeMatchesMinEncodedSizeBytes) {
+ EXPECT_EQ(kReservedSize, kTestPacket.MinEncodedSizeBytes());
+}
+
+TEST(Channel, OutputBuffer_EmptyBuffer) {
+ TestOutput<0> output;
+ internal::Channel channel(100, &output);
+
+ Channel::OutputBuffer buffer = channel.AcquireBuffer();
+ EXPECT_TRUE(buffer.payload(kTestPacket).empty());
+}
+
+TEST(Channel, OutputBuffer_TooSmall) {
+ TestOutput<kReservedSize - 1> output;
+ internal::Channel channel(100, &output);
+
+ Channel::OutputBuffer output_buffer = channel.AcquireBuffer();
+ EXPECT_TRUE(output_buffer.payload(kTestPacket).empty());
+
+ EXPECT_EQ(Status::INTERNAL, channel.Send(output_buffer, kTestPacket));
+}
+
+TEST(Channel, OutputBuffer_ExactFit) {
+ TestOutput<kReservedSize> output;
+ internal::Channel channel(100, &output);
+
+ Channel::OutputBuffer output_buffer(channel.AcquireBuffer());
+ const span payload = output_buffer.payload(kTestPacket);
+
+ EXPECT_EQ(payload.size(), output.buffer().size() - kReservedSize);
+ EXPECT_EQ(output.buffer().data() + kReservedSize, payload.data());
+
+ EXPECT_EQ(Status::OK, channel.Send(output_buffer, kTestPacket));
+}
+
+TEST(Channel, OutputBuffer_PayloadDoesNotFit_ReportsError) {
+ TestOutput<kReservedSize> output;
+ internal::Channel channel(100, &output);
+
+ Channel::OutputBuffer output_buffer(channel.AcquireBuffer());
+
+ Packet packet = kTestPacket;
+ byte data[1] = {};
+ packet.set_payload(data);
+
+ EXPECT_EQ(Status::INTERNAL, channel.Send(output_buffer, packet));
+}
+
+TEST(Channel, OutputBuffer_ExtraRoom) {
+ TestOutput<kReservedSize * 3> output;
+ internal::Channel channel(100, &output);
+
+ Channel::OutputBuffer output_buffer = channel.AcquireBuffer();
+ const span payload = output_buffer.payload(kTestPacket);
+
+ EXPECT_EQ(payload.size(), output.buffer().size() - kReservedSize);
+ EXPECT_EQ(output.buffer().data() + kReservedSize, payload.data());
+
+ EXPECT_EQ(Status::OK, channel.Send(output_buffer, kTestPacket));
+}
+
+} // namespace
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h b/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h
index 3f74472..f50a997 100644
--- a/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h
+++ b/pw_rpc/nanopb/public_overrides/pw_rpc/internal/method.h
@@ -264,14 +264,13 @@
template <typename T>
Status ServerWriter<T>::Write(const T& response) {
- // TODO(hepler): Need to think about what Status this returns. Should channels
- // return a status for their SendAndReleaseBuffer?
- span<std::byte> buffer = AcquireBuffer();
+ span<std::byte> buffer = AcquirePayloadBuffer();
if (auto result = method().EncodeResponse(&response, buffer); result.ok()) {
- return SendAndReleaseBuffer(buffer.first(result.size()));
+ return ReleasePayloadBuffer(buffer.first(result.size()));
}
+ ReleasePayloadBuffer({});
return Status::INTERNAL;
}
diff --git a/pw_rpc/packet.cc b/pw_rpc/packet.cc
index 7fa7a8d..b6e6701 100644
--- a/pw_rpc/packet.cc
+++ b/pw_rpc/packet.cc
@@ -89,7 +89,7 @@
return StatusWithSize(proto.size());
}
-span<byte> Packet::PayloadUsableSpace(span<byte> buffer) const {
+size_t Packet::MinEncodedSizeBytes() const {
size_t reserved_size = 0;
reserved_size += 1; // channel_id key
@@ -108,8 +108,7 @@
// Payload field takes at least two bytes to encode (varint key + length).
reserved_size += 2;
- return reserved_size <= buffer.size() ? buffer.subspan(reserved_size)
- : span<byte>();
+ return reserved_size;
}
} // namespace pw::rpc::internal
diff --git a/pw_rpc/packet_test.cc b/pw_rpc/packet_test.cc
index 9e2fa44..a8d96a7 100644
--- a/pw_rpc/packet_test.cc
+++ b/pw_rpc/packet_test.cc
@@ -55,10 +55,6 @@
byte{0x00},
};
-constexpr size_t kReservedSize = 2 /* type */ + 2 /* channel */ +
- 2 /* service */ + 2 /* method */ +
- 2 /* payload key */ + 2 /* status */;
-
TEST(Packet, Encode) {
byte buffer[64];
@@ -110,45 +106,18 @@
EXPECT_EQ(decoded.status(), Status::UNAVAILABLE);
}
-TEST(Packet, PayloadUsableSpace_EmptyBuffer) {
- Packet packet(PacketType::RPC, 1, 42, 100, kPayload);
- EXPECT_TRUE(packet.PayloadUsableSpace(span<byte>()).empty());
-}
-
-TEST(Packet, PayloadUsableSpace_TooSmall) {
- Packet packet(PacketType::RPC, 1, 42, 100, kPayload);
-
- byte buffer[10];
- EXPECT_TRUE(packet.PayloadUsableSpace(buffer).empty());
-}
+constexpr size_t kReservedSize = 2 /* type */ + 2 /* channel */ +
+ 2 /* service */ + 2 /* method */ +
+ 2 /* payload key */ + 2 /* status */;
TEST(Packet, PayloadUsableSpace_ExactFit) {
- byte buffer[kReservedSize];
- const span payload =
- Packet(PacketType::RPC, 1, 42, 100).PayloadUsableSpace(buffer);
-
- EXPECT_EQ(payload.size(), sizeof(buffer) - kReservedSize);
- EXPECT_EQ(buffer + kReservedSize, payload.data());
-}
-
-TEST(Packet, PayloadUsableSpace_ExtraRoom) {
- byte buffer[kReservedSize * 3];
- const span payload =
- Packet(PacketType::RPC, 1, 42, 100).PayloadUsableSpace(buffer);
-
- EXPECT_EQ(payload.size(), sizeof(buffer) - kReservedSize);
- EXPECT_EQ(buffer + kReservedSize, payload.data());
+ EXPECT_EQ(kReservedSize,
+ Packet(PacketType::RPC, 1, 42, 100).MinEncodedSizeBytes());
}
TEST(Packet, PayloadUsableSpace_LargerVarints) {
- byte buffer[kReservedSize * 3];
- const span payload =
- Packet(PacketType::RPC, 17000, 200, 200).PayloadUsableSpace(buffer);
-
- constexpr size_t expected_size = kReservedSize + 2 + 1 + 1;
-
- EXPECT_EQ(payload.size(), sizeof(buffer) - expected_size);
- EXPECT_EQ(buffer + expected_size, payload.data());
+ EXPECT_EQ(kReservedSize + 2 + 1 + 1,
+ Packet(PacketType::RPC, 17000, 200, 200).MinEncodedSizeBytes());
}
} // namespace
diff --git a/pw_rpc/public/pw_rpc/channel.h b/pw_rpc/public/pw_rpc/channel.h
index 08ae219..2602bbe 100644
--- a/pw_rpc/public/pw_rpc/channel.h
+++ b/pw_rpc/public/pw_rpc/channel.h
@@ -65,20 +65,18 @@
constexpr uint32_t id() const { return id_; }
constexpr bool assigned() const { return id_ != kUnassignedChannelId; }
- private:
- friend class Server;
- friend class internal::BaseServerWriter;
-
- span<std::byte> AcquireBuffer() const { return output_->AcquireBuffer(); }
- void SendAndReleaseBuffer(size_t size) const {
- output_->SendAndReleaseBuffer(size);
- }
-
+ protected:
constexpr Channel(uint32_t id, ChannelOutput* output)
: id_(id), output_(output) {
PW_CHECK_UINT_NE(id, kUnassignedChannelId);
}
+ ChannelOutput& output() const {
+ PW_CHECK_NOTNULL(output_);
+ return *output_;
+ }
+
+ private:
uint32_t id_;
ChannelOutput* output_;
};
diff --git a/pw_rpc/public/pw_rpc/internal/base_server_writer.h b/pw_rpc/public/pw_rpc/internal/base_server_writer.h
index 8c95816..30d0883 100644
--- a/pw_rpc/public/pw_rpc/internal/base_server_writer.h
+++ b/pw_rpc/public/pw_rpc/internal/base_server_writer.h
@@ -16,8 +16,8 @@
#include <cstddef>
#include <utility>
-#include "pw_rpc/channel.h"
-#include "pw_rpc/server_context.h"
+#include "pw_rpc/internal/call.h"
+#include "pw_rpc/internal/channel.h"
#include "pw_span/span.h"
namespace pw::rpc::internal {
@@ -33,35 +33,36 @@
// cancelling / terminating ongoing streaming RPCs.
class BaseServerWriter {
public:
- constexpr BaseServerWriter(ServerCall& context)
- : context_(context), state_{kOpen} {}
-
- BaseServerWriter(BaseServerWriter&& other) { *this = std::move(other); }
- BaseServerWriter& operator=(BaseServerWriter&& other);
+ constexpr BaseServerWriter(ServerCall& call) : call_(call), state_(kOpen) {}
BaseServerWriter(const BaseServerWriter&) = delete;
+
+ BaseServerWriter(BaseServerWriter&& other) { *this = std::move(other); }
+
BaseServerWriter& operator=(const BaseServerWriter&) = delete;
+ BaseServerWriter& operator=(BaseServerWriter&& other);
+
// True if the ServerWriter is active and ready to send responses.
bool open() const { return state_ == kOpen; }
// Closes the ServerWriter, if it is open.
- void close();
+ void Finish();
protected:
constexpr BaseServerWriter() : state_{kClosed} {}
- const Method& method() const { return context_.method(); }
+ const Method& method() const { return call_.method(); }
- span<std::byte> AcquireBuffer();
+ span<std::byte> AcquirePayloadBuffer();
- Status SendAndReleaseBuffer(span<const std::byte> payload);
+ Status ReleasePayloadBuffer(span<const std::byte> payload);
private:
- Packet packet() const;
+ Packet packet(span<const std::byte> payload = {}) const;
- ServerCall context_;
- span<std::byte> response_;
+ ServerCall call_;
+ Channel::OutputBuffer response_;
enum { kClosed, kOpen } state_;
};
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 580f7fa..d8813ae 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -17,16 +17,16 @@
#include <cstdint>
#include "pw_assert/assert.h"
-#include "pw_rpc/channel.h"
+#include "pw_rpc/internal/channel.h"
namespace pw::rpc {
-class Server;
class ServerContext;
namespace internal {
class Method;
+class Server;
class Service;
// Collects information for an ongoing RPC being processed by the server.
@@ -37,8 +37,6 @@
// interface to the internal::ServerCall.
class ServerCall {
public:
- uint32_t channel_id() const { return channel().id(); }
-
constexpr ServerCall()
: server_(nullptr),
channel_(nullptr),
@@ -47,7 +45,7 @@
constexpr ServerCall(Server& server,
Channel& channel,
- internal::Service& service,
+ Service& service,
const internal::Method& method)
: server_(&server),
channel_(&channel),
@@ -70,7 +68,7 @@
return *channel_;
}
- internal::Service& service() const {
+ Service& service() const {
PW_DCHECK_NOTNULL(service_);
return *service_;
}
@@ -83,7 +81,7 @@
private:
Server* server_;
Channel* channel_;
- internal::Service* service_;
+ Service* service_;
const internal::Method* method_;
};
diff --git a/pw_rpc/public/pw_rpc/internal/channel.h b/pw_rpc/public/pw_rpc/internal/channel.h
new file mode 100644
index 0000000..b3f01f7
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/channel.h
@@ -0,0 +1,69 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_rpc/channel.h"
+#include "pw_span/span.h"
+#include "pw_status/status.h"
+
+namespace pw::rpc::internal {
+
+class Packet;
+
+class Channel : public rpc::Channel {
+ public:
+ Channel() = delete;
+
+ constexpr Channel(uint32_t id, ChannelOutput* output)
+ : rpc::Channel(id, output) {}
+
+ class OutputBuffer {
+ public:
+ constexpr OutputBuffer() = default;
+
+ OutputBuffer(const OutputBuffer&) = delete;
+
+ OutputBuffer(OutputBuffer&& other) { *this = std::move(other); }
+
+ ~OutputBuffer() { PW_DCHECK(buffer_.empty()); }
+
+ OutputBuffer& operator=(const OutputBuffer&) = delete;
+
+ OutputBuffer& operator=(OutputBuffer&& other) {
+ PW_DCHECK(buffer_.empty());
+ buffer_ = other.buffer_;
+ other.buffer_ = {};
+ return *this;
+ }
+
+ // Returns a portion of this OutputBuffer to use as the packet payload.
+ span<std::byte> payload(const Packet& packet) const;
+
+ private:
+ friend class Channel;
+
+ explicit constexpr OutputBuffer(span<std::byte> buffer) : buffer_(buffer) {}
+
+ span<std::byte> buffer_;
+ };
+
+ // Acquires a buffer for the packet.
+ OutputBuffer AcquireBuffer() const {
+ return OutputBuffer(output().AcquireBuffer());
+ }
+
+ Status Send(OutputBuffer& output, const internal::Packet& packet);
+};
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/internal/packet.h b/pw_rpc/public/pw_rpc/internal/packet.h
index 281e17d..3f5e87d 100644
--- a/pw_rpc/public/pw_rpc/internal/packet.h
+++ b/pw_rpc/public/pw_rpc/internal/packet.h
@@ -47,9 +47,9 @@
StatusWithSize Encode(span<std::byte> buffer) const;
// Determines the space required to encode the packet proto fields for a
- // response, and splits the buffer into reserved space and available space for
- // the payload. Returns a subspan of the payload space.
- span<std::byte> PayloadUsableSpace(span<std::byte> buffer) const;
+ // response. This may be used to split the buffer into reserved space and
+ // available space for the payload.
+ size_t MinEncodedSizeBytes() const;
bool is_control() const { return !is_rpc(); }
bool is_rpc() const { return type_ == PacketType::RPC; }
diff --git a/pw_rpc/public/pw_rpc/internal/server.h b/pw_rpc/public/pw_rpc/internal/server.h
new file mode 100644
index 0000000..53cdee0
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/server.h
@@ -0,0 +1,25 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_rpc/server.h"
+
+namespace pw::rpc::internal {
+
+class Server : public rpc::Server {
+ public:
+ Server() = delete;
+};
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/server.h b/pw_rpc/public/pw_rpc/server.h
index cf4c583..409e93c 100644
--- a/pw_rpc/public/pw_rpc/server.h
+++ b/pw_rpc/public/pw_rpc/server.h
@@ -17,22 +17,17 @@
#include "pw_containers/intrusive_list.h"
#include "pw_rpc/channel.h"
+#include "pw_rpc/internal/base_server_writer.h"
+#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/service.h"
namespace pw::rpc {
-namespace internal {
-
-class Method;
-class Packet;
-
-} // namespace internal
class Server {
public:
- constexpr Server(span<Channel> channels) : channels_(channels) {}
-
- Server(const Server& other) = delete;
- Server& operator=(const Server& other) = delete;
+ constexpr Server(span<Channel> channels)
+ : channels_(static_cast<internal::Channel*>(channels.data()),
+ channels.size()) {}
// Registers a service with the server. This should not be called directly
// with an internal::Service; instead, use a generated class which inherits
@@ -51,14 +46,10 @@
internal::Packet& response,
span<std::byte> buffer);
- void SendResponse(const Channel& output,
- const internal::Packet& response,
- span<std::byte> response_buffer) const;
+ internal::Channel* FindChannel(uint32_t id) const;
+ internal::Channel* AssignChannel(uint32_t id, ChannelOutput& interface);
- Channel* FindChannel(uint32_t id) const;
- Channel* AssignChannel(uint32_t id, ChannelOutput& interface);
-
- span<Channel> channels_;
+ span<internal::Channel> channels_;
IntrusiveList<internal::Service> services_;
};
diff --git a/pw_rpc/pw_rpc_private/test_utils.h b/pw_rpc/pw_rpc_private/test_utils.h
index 262a079..57767ca 100644
--- a/pw_rpc/pw_rpc_private/test_utils.h
+++ b/pw_rpc/pw_rpc_private/test_utils.h
@@ -13,13 +13,14 @@
// the License.
#pragma once
+#include <array>
#include <cstddef>
#include <cstdint>
-#include "pw_rpc/channel.h"
+#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
-#include "pw_rpc/server.h"
+#include "pw_rpc/internal/server.h"
#include "pw_span/span.h"
namespace pw::rpc {
@@ -33,13 +34,15 @@
span<std::byte> AcquireBuffer() override { return buffer_; }
void SendAndReleaseBuffer(size_t size) override {
- sent_packet_ = {buffer_, size};
+ sent_packet_ = span(buffer_.data(), size);
}
+ span<const std::byte> buffer() const { return buffer_; }
+
const span<const std::byte>& sent_packet() const { return sent_packet_; }
private:
- std::byte buffer_[buffer_size];
+ std::array<std::byte, buffer_size> buffer_;
span<const std::byte> sent_packet_;
};
@@ -56,7 +59,10 @@
: channel_(Channel::Create<kChannelId>(&output_)),
server_(span(&channel_, 1)),
service_(kServiceId),
- context_(server_, channel_, service_, method) {
+ context_(static_cast<internal::Server&>(server_),
+ static_cast<internal::Channel&>(channel_),
+ service_,
+ method) {
server_.RegisterService(service_);
}
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc
index bc12505..11857c1 100644
--- a/pw_rpc/server.cc
+++ b/pw_rpc/server.cc
@@ -19,13 +19,13 @@
#include "pw_log/log.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
+#include "pw_rpc/internal/server.h"
#include "pw_rpc/server_context.h"
namespace pw::rpc {
using std::byte;
-using internal::Method;
using internal::Packet;
using internal::PacketType;
@@ -46,7 +46,7 @@
Packet response(PacketType::RPC);
- Channel* channel = FindChannel(packet.channel_id());
+ internal::Channel* channel = FindChannel(packet.channel_id());
if (channel == nullptr) {
// If the requested channel doesn't exist, try to dynamically assign one.
channel = AssignChannel(packet.channel_id(), interface);
@@ -54,25 +54,26 @@
// If a channel can't be assigned, send back a response indicating that
// the server cannot process the request. The channel_id in the response
// is not set, to allow clients to detect this error case.
- Channel temp_channel(packet.channel_id(), &interface);
+ internal::Channel temp_channel(packet.channel_id(), &interface);
response.set_status(Status::RESOURCE_EXHAUSTED);
- SendResponse(temp_channel, response, temp_channel.AcquireBuffer());
+ auto response_buffer = temp_channel.AcquireBuffer();
+ temp_channel.Send(response_buffer, response);
return;
}
}
response.set_channel_id(channel->id());
- const span<byte> response_buffer = channel->AcquireBuffer();
+ auto response_buffer = channel->AcquireBuffer();
// Invoke the method with matching service and method IDs, if any.
- InvokeMethod(packet, *channel, response, response_buffer);
- SendResponse(*channel, response, response_buffer);
+ InvokeMethod(packet, *channel, response, response_buffer.payload(response));
+ channel->Send(response_buffer, response);
}
void Server::InvokeMethod(const Packet& request,
Channel& channel,
internal::Packet& response,
- span<std::byte> buffer) {
+ span<std::byte> payload_buffer) {
auto service = std::find_if(services_.begin(), services_.end(), [&](auto& s) {
return s.id() == request.service_id();
});
@@ -97,18 +98,19 @@
response.set_method_id(method->id());
- span<byte> response_buffer = request.PayloadUsableSpace(buffer);
-
- internal::ServerCall call(*this, channel, *service, *method);
+ internal::ServerCall call(static_cast<internal::Server&>(*this),
+ static_cast<internal::Channel&>(channel),
+ *service,
+ *method);
StatusWithSize result =
- method->Invoke(call, request.payload(), response_buffer);
+ method->Invoke(call, request.payload(), payload_buffer);
response.set_status(result.status());
- response.set_payload(response_buffer.first(result.size()));
+ response.set_payload(payload_buffer.first(result.size()));
}
-Channel* Server::FindChannel(uint32_t id) const {
- for (Channel& c : channels_) {
+internal::Channel* Server::FindChannel(uint32_t id) const {
+ for (internal::Channel& c : channels_) {
if (c.id() == id) {
return &c;
}
@@ -116,30 +118,17 @@
return nullptr;
}
-Channel* Server::AssignChannel(uint32_t id, ChannelOutput& interface) {
- Channel* channel = FindChannel(Channel::kUnassignedChannelId);
+internal::Channel* Server::AssignChannel(uint32_t id,
+ ChannelOutput& interface) {
+ internal::Channel* channel = FindChannel(Channel::kUnassignedChannelId);
if (channel == nullptr) {
return nullptr;
}
- *channel = Channel(id, &interface);
+ *channel = internal::Channel(id, &interface);
return channel;
}
-void Server::SendResponse(const Channel& channel,
- const Packet& response,
- span<byte> response_buffer) const {
- StatusWithSize sws = response.Encode(response_buffer);
- if (!sws.ok()) {
- // TODO(frolv): What should be done here?
- channel.SendAndReleaseBuffer(0);
- PW_LOG_ERROR("Failed to encode response packet to channel buffer");
- return;
- }
-
- channel.SendAndReleaseBuffer(sws.size());
-}
-
static_assert(std::is_base_of<internal::BaseMethod, internal::Method>(),
"The Method implementation must be derived from "
"pw::rpc::internal::BaseMethod");
diff --git a/pw_rpc/test_impl/public_overrides/pw_rpc/internal/method.h b/pw_rpc/test_impl/public_overrides/pw_rpc/internal/method.h
index 4c91255..6c3beeb 100644
--- a/pw_rpc/test_impl/public_overrides/pw_rpc/internal/method.h
+++ b/pw_rpc/test_impl/public_overrides/pw_rpc/internal/method.h
@@ -32,7 +32,7 @@
StatusWithSize Invoke(ServerCall& call,
span<const std::byte> request,
span<std::byte> payload_buffer) const {
- last_channel_id_ = call.channel_id();
+ last_channel_id_ = call.channel().id();
last_request_ = request;
last_payload_buffer_ = payload_buffer;