pw_rpc: Support opening a ServerReader/Writer
- Deprecate open() in favor of active(), which isn't ambiguous about
whether it's a verb or adjective and doesn't conflict with Open().
- Support opening a ServerReader/Writer object server-side, without a
client calling the RPC. This is helpful for testing and in situations
when the server needs to send a response to an RPC the client hasn't
called the RPC yet (e.g. respond to a Reset() RPC after rebooting).
Change-Id: I2c7d3186597f576db2edbead7fd9a8cd407f4983
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/57882
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index c8f7096..69525da 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -51,6 +51,7 @@
"public/pw_rpc/internal/method.h",
"public/pw_rpc/internal/method_lookup.h",
"public/pw_rpc/internal/method_union.h",
+ "public/pw_rpc/internal/open_call.h",
"server.cc",
"service.cc",
],
@@ -198,6 +199,7 @@
"nanopb/public/pw_rpc/nanopb/test_method_context.h",
"nanopb/pw_rpc_nanopb_private/internal_test_utils.h",
"nanopb/server_reader_writer.cc",
+ "nanopb/server_reader_writer_test.cc",
"nanopb/stub_generation_test.cc",
],
)
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index bc5f7a1..d0a6346 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -66,6 +66,7 @@
"public/pw_rpc/internal/method.h",
"public/pw_rpc/internal/method_lookup.h",
"public/pw_rpc/internal/method_union.h",
+ "public/pw_rpc/internal/open_call.h",
"server.cc",
"service.cc",
]
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index 939165a..b7cd614 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -46,10 +46,10 @@
Call::Call(const CallContext& call, MethodType type)
: call_(call),
- rpc_state_(kOpen),
+ rpc_state_(kActive),
type_(type),
- client_stream_state_(HasClientStream(type) ? kClientStreamOpen
- : kClientStreamClosed) {
+ client_stream_state_(HasClientStream(type) ? kClientStreamActive
+ : kClientStreamInactive) {
call_.server().RegisterCall(*this);
}
@@ -62,7 +62,7 @@
type_ = other.type_;
client_stream_state_ = other.client_stream_state_;
- if (other.open()) {
+ if (other.active()) {
other.Close();
other.call_.server().RegisterCall(*this);
}
@@ -85,7 +85,7 @@
Status Call::CloseAndSendResponse(std::span<const std::byte> response,
Status status) {
- if (!open()) {
+ if (!active()) {
return Status::FailedPrecondition();
}
@@ -107,7 +107,7 @@
}
std::span<std::byte> Call::AcquirePayloadBuffer() {
- PW_DCHECK(open());
+ PW_DCHECK(active());
// Only allow having one active buffer at a time.
if (response_.empty()) {
@@ -118,21 +118,21 @@
}
Status Call::SendPayloadBufferClientStream(std::span<const std::byte> payload) {
- PW_DCHECK(open());
+ PW_DCHECK(active());
return call_.channel().Send(response_, StreamPacket(call_, payload));
}
void Call::ReleasePayloadBuffer() {
- PW_DCHECK(open());
+ PW_DCHECK(active());
call_.channel().Release(response_);
}
void Call::Close() {
- PW_DCHECK(open());
+ PW_DCHECK(active());
call_.server().UnregisterCall(*this);
- rpc_state_ = kClosed;
- client_stream_state_ = kClientStreamClosed;
+ rpc_state_ = kInactive;
+ client_stream_state_ = kClientStreamInactive;
}
} // namespace pw::rpc::internal
diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc
index cee015a..7fb6d5f 100644
--- a/pw_rpc/call_test.cc
+++ b/pw_rpc/call_test.cc
@@ -48,7 +48,7 @@
FakeServerWriter writer(context.get());
- EXPECT_TRUE(writer.open());
+ EXPECT_TRUE(writer.active());
}
TEST(ServerWriter, Move_ClosesOriginal) {
@@ -58,15 +58,15 @@
FakeServerWriter writer(std::move(moved));
#ifndef __clang_analyzer__
- EXPECT_FALSE(moved.open());
+ EXPECT_FALSE(moved.active());
#endif // ignore use-after-move
- EXPECT_TRUE(writer.open());
+ EXPECT_TRUE(writer.active());
}
TEST(ServerWriter, DefaultConstruct_Closed) {
FakeServerWriter writer;
- EXPECT_FALSE(writer.open());
+ EXPECT_FALSE(writer.active());
}
TEST(ServerWriter, Construct_RegistersWithServer) {
@@ -121,9 +121,9 @@
ServerContextForTest<TestService> context(TestService::method.method());
FakeServerWriter writer(context.get());
- ASSERT_TRUE(writer.open());
+ ASSERT_TRUE(writer.active());
EXPECT_EQ(OkStatus(), writer.Finish());
- EXPECT_FALSE(writer.open());
+ EXPECT_FALSE(writer.active());
EXPECT_EQ(Status::FailedPrecondition(), writer.Finish());
}
@@ -131,12 +131,12 @@
ServerContextForTest<TestService> context(TestService::method.method());
FakeServerWriter writer(context.get());
- ASSERT_TRUE(writer.open());
+ ASSERT_TRUE(writer.active());
auto buffer = writer.PayloadBuffer();
buffer[0] = std::byte{0};
EXPECT_FALSE(writer.output_buffer().empty());
EXPECT_EQ(OkStatus(), writer.Finish());
- EXPECT_FALSE(writer.open());
+ EXPECT_FALSE(writer.active());
EXPECT_TRUE(writer.output_buffer().empty());
}
@@ -198,11 +198,11 @@
ServerContextForTest<TestService> context(TestService::method.method());
test::FakeServerReader reader(context.get());
- EXPECT_TRUE(reader.as_responder().open());
+ EXPECT_TRUE(reader.as_responder().active());
EXPECT_TRUE(reader.as_responder().client_stream_open());
EXPECT_EQ(OkStatus(), reader.as_responder().CloseAndSendResponse(OkStatus()));
- EXPECT_FALSE(reader.as_responder().open());
+ EXPECT_FALSE(reader.as_responder().active());
EXPECT_FALSE(reader.as_responder().client_stream_open());
}
@@ -210,11 +210,11 @@
ServerContextForTest<TestService> context(TestService::method.method());
test::FakeServerReader reader(context.get());
- EXPECT_TRUE(reader.open());
+ EXPECT_TRUE(reader.active());
EXPECT_TRUE(reader.as_responder().client_stream_open());
reader.as_responder().EndClientStream();
- EXPECT_TRUE(reader.open());
+ EXPECT_TRUE(reader.active());
EXPECT_FALSE(reader.as_responder().client_stream_open());
}
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 7188722..c34563c 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -45,6 +45,18 @@
synchronously handled unary RPC before it completes. The same RPC may be invoked
multiple times simultaneously if the invocations are on different channels.
+Unrequested responses
+---------------------
+``pw_rpc`` supports sending responses to RPCs that have not yet been invoked by
+a client. This is useful in testing and in situations like an RPC that triggers
+reboot. After the reboot, the device opens the writer object and sends its
+response to the client.
+
+.. admonition:: Under construction
+
+ The ``ReaderWriter::Open()`` API is cumbersome, but a more streamlined API
+ will be added to the generated RPC code.
+
Creating an RPC
===============
diff --git a/pw_rpc/nanopb/BUILD.gn b/pw_rpc/nanopb/BUILD.gn
index 217c5ef..70aec83 100644
--- a/pw_rpc/nanopb/BUILD.gn
+++ b/pw_rpc/nanopb/BUILD.gn
@@ -115,6 +115,7 @@
":method_lookup_test",
":method_test",
":method_union_test",
+ ":server_reader_writer_test",
":stub_generation_test",
]
}
@@ -188,6 +189,16 @@
enable_if = dir_pw_third_party_nanopb != ""
}
+pw_test("server_reader_writer_test") {
+ deps = [
+ ":method",
+ ":test_method_context",
+ "..:test_protos.nanopb_rpc",
+ ]
+ sources = [ "server_reader_writer_test.cc" ]
+ enable_if = dir_pw_third_party_nanopb != ""
+}
+
pw_test("stub_generation_test") {
deps = [ "..:test_protos.nanopb_rpc" ]
sources = [ "stub_generation_test.cc" ]
diff --git a/pw_rpc/nanopb/method_union_test.cc b/pw_rpc/nanopb/method_union_test.cc
index 2a82cf3..92d3982 100644
--- a/pw_rpc/nanopb/method_union_test.cc
+++ b/pw_rpc/nanopb/method_union_test.cc
@@ -108,7 +108,7 @@
method.Invoke(context.get(), context.request(request));
- EXPECT_TRUE(last_raw_writer.open());
+ EXPECT_TRUE(last_raw_writer.active());
EXPECT_EQ(OkStatus(), last_raw_writer.Finish());
EXPECT_EQ(context.output().sent_packet().type(), PacketType::RESPONSE);
}
@@ -148,7 +148,7 @@
method.Invoke(context.get(), context.request(request));
EXPECT_EQ(555, last_request.integer);
- EXPECT_TRUE(last_writer.open());
+ EXPECT_TRUE(last_writer.active());
EXPECT_EQ(OkStatus(), last_writer.Finish());
EXPECT_EQ(context.output().sent_packet().type(), PacketType::RESPONSE);
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h
index d04110e..659e272 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h
@@ -29,11 +29,12 @@
class NanopbFakeChannelOutput final
: public internal::test::FakeChannelOutputBuffer<kOutputSize> {
public:
- template <auto kMethod, uint32_t kMethodId, typename ServiceType>
+ // Creates a NanopbFakeChannelOutput for the specified method.
+ template <typename ServiceType, auto kMethod, uint32_t kMethodId>
static NanopbFakeChannelOutput Create() {
return NanopbFakeChannelOutput(
- internal::MethodLookup::GetNanopbMethod<ServiceType, kMethodId>(),
- internal::MethodTraits<decltype(kMethod)>::kType);
+ internal::MethodTraits<decltype(kMethod)>::kType,
+ internal::MethodLookup::GetNanopbMethod<ServiceType, kMethodId>());
}
// Private constructor, do not use. This constructor is exposed so this class
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
index 271c482..303825c 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
@@ -20,6 +20,8 @@
#include "pw_bytes/span.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/call.h"
+#include "pw_rpc/internal/method_lookup.h"
+#include "pw_rpc/internal/open_call.h"
#include "pw_rpc/server.h"
namespace pw::rpc {
@@ -93,6 +95,26 @@
class NanopbServerReaderWriter
: private internal::BaseNanopbServerReader<Request> {
public:
+ // Creates a NanopbServerReaderWriter that is ready to send responses for a
+ // particular RPC. This can be used for testing or to send responses to an RPC
+ // that has not been started by a client.
+ template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+ [[nodiscard]] static NanopbServerReaderWriter Open(Server& server,
+ uint32_t channel_id,
+ ServiceImpl& service) {
+ static_assert(std::is_same_v<Request, internal::Request<kMethod>>,
+ "The request type of a NanopbServerReaderWriter must match "
+ "the method.");
+ static_assert(std::is_same_v<Response, internal::Response<kMethod>>,
+ "The response type of a NanopbServerReaderWriter must match "
+ "the method.");
+ return {internal::OpenCall<kMethod, MethodType::kBidirectionalStreaming>(
+ server,
+ channel_id,
+ service,
+ internal::MethodLookup::GetNanopbMethod<ServiceImpl, kMethodId>())};
+ }
+
constexpr NanopbServerReaderWriter()
: internal::BaseNanopbServerReader<Request>(
MethodType::kBidirectionalStreaming) {}
@@ -100,7 +122,7 @@
NanopbServerReaderWriter(NanopbServerReaderWriter&&) = default;
NanopbServerReaderWriter& operator=(NanopbServerReaderWriter&&) = default;
- using internal::GenericNanopbResponder::open;
+ using internal::GenericNanopbResponder::active;
using internal::GenericNanopbResponder::channel_id;
@@ -141,6 +163,26 @@
template <typename Request, typename Response>
class NanopbServerReader : private internal::BaseNanopbServerReader<Request> {
public:
+ // Creates a NanopbServerReader that is ready to send a response to a
+ // particular RPC. This can be used for testing or to finish an RPC that has
+ // not been started by the client.
+ template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+ [[nodiscard]] static NanopbServerReader Open(Server& server,
+ uint32_t channel_id,
+ ServiceImpl& service) {
+ static_assert(
+ std::is_same_v<Request, internal::Request<kMethod>>,
+ "The request type of a NanopbServerReader must match the method.");
+ static_assert(
+ std::is_same_v<Response, internal::Response<kMethod>>,
+ "The response type of a NanopbServerReader must match the method.");
+ return {internal::OpenCall<kMethod, MethodType::kClientStreaming>(
+ server,
+ channel_id,
+ service,
+ internal::MethodLookup::GetNanopbMethod<ServiceImpl, kMethodId>())};
+ }
+
// Allow default construction so that users can declare a variable into which
// to move NanopbServerReaders from RPC calls.
constexpr NanopbServerReader()
@@ -150,6 +192,7 @@
NanopbServerReader(NanopbServerReader&&) = default;
NanopbServerReader& operator=(NanopbServerReader&&) = default;
+ using internal::GenericNanopbResponder::active;
using internal::GenericNanopbResponder::channel_id;
// Functions for setting RPC event callbacks.
@@ -178,6 +221,23 @@
template <typename Response>
class NanopbServerWriter : private internal::GenericNanopbResponder {
public:
+ // Creates a NanopbServerWriter that is ready to send responses for a
+ // particular RPC. This can be used for testing or to send responses to an RPC
+ // that has not been started by a client.
+ template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+ [[nodiscard]] static NanopbServerWriter Open(Server& server,
+ uint32_t channel_id,
+ ServiceImpl& service) {
+ static_assert(
+ std::is_same_v<Response, internal::Response<kMethod>>,
+ "The response type of a NanopbServerWriter must match the method.");
+ return {internal::OpenCall<kMethod, MethodType::kServerStreaming>(
+ server,
+ channel_id,
+ service,
+ internal::MethodLookup::GetNanopbMethod<ServiceImpl, kMethodId>())};
+ }
+
// Allow default construction so that users can declare a variable into which
// to move ServerWriters from RPC calls.
constexpr NanopbServerWriter()
@@ -186,9 +246,9 @@
NanopbServerWriter(NanopbServerWriter&&) = default;
NanopbServerWriter& operator=(NanopbServerWriter&&) = default;
- using internal::GenericNanopbResponder::open;
-
+ using internal::GenericNanopbResponder::active;
using internal::GenericNanopbResponder::channel_id;
+ using internal::GenericNanopbResponder::open;
// Writes a response struct. Returns the following Status codes:
//
diff --git a/pw_rpc/nanopb/server_reader_writer.cc b/pw_rpc/nanopb/server_reader_writer.cc
index 331f4db..62ab476 100644
--- a/pw_rpc/nanopb/server_reader_writer.cc
+++ b/pw_rpc/nanopb/server_reader_writer.cc
@@ -20,7 +20,7 @@
Status GenericNanopbResponder::SendClientStreamOrResponse(const void* response,
Status* status) {
- if (!open()) {
+ if (!active()) {
return Status::FailedPrecondition();
}
diff --git a/pw_rpc/nanopb/server_reader_writer_test.cc b/pw_rpc/nanopb/server_reader_writer_test.cc
new file mode 100644
index 0000000..ff8066b
--- /dev/null
+++ b/pw_rpc/nanopb/server_reader_writer_test.cc
@@ -0,0 +1,113 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/raw/server_reader_writer.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc/nanopb/fake_channel_output.h"
+#include "pw_rpc/service.h"
+#include "pw_rpc_test_protos/test.rpc.pb.h"
+
+namespace pw::rpc {
+
+class TestService final : public test::generated::TestService<TestService> {
+ public:
+ Status TestUnaryRpc(ServerContext&,
+ const pw_rpc_test_TestRequest&,
+ pw_rpc_test_TestResponse&) {
+ return OkStatus();
+ }
+
+ void TestServerStreamRpc(
+ ServerContext&,
+ const pw_rpc_test_TestRequest&,
+ NanopbServerWriter<pw_rpc_test_TestStreamResponse>&) {}
+
+ void TestClientStreamRpc(
+ ServerContext&,
+ NanopbServerReader<pw_rpc_test_TestRequest,
+ pw_rpc_test_TestStreamResponse>&) {}
+
+ void TestBidirectionalStreamRpc(
+ ServerContext&,
+ NanopbServerReaderWriter<pw_rpc_test_TestRequest,
+ pw_rpc_test_TestStreamResponse>&) {}
+};
+
+template <auto kMethod, uint32_t kMethodId>
+struct ReaderWriterTestContext {
+ ReaderWriterTestContext()
+ : output(decltype(output)::
+ template Create<TestService, kMethod, kMethodId>()),
+ channel(Channel::Create<1>(&output)),
+ server(std::span(&channel, 1)) {}
+
+ TestService service;
+ NanopbFakeChannelOutput<internal::Response<kMethod>, 4, 128> output;
+ Channel channel;
+ Server server;
+};
+
+TEST(NanopbServerWriter, Open_ReturnsUsableWriter) {
+ ReaderWriterTestContext<&TestService::TestServerStreamRpc,
+ CalculateMethodId("TestServerStreamRpc")>
+ ctx;
+ NanopbServerWriter responder =
+ NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
+ &TestService::TestServerStreamRpc,
+ CalculateMethodId("TestServerStreamRpc")>(
+ ctx.server, ctx.channel.id(), ctx.service);
+
+ responder.Write({.chunk = {}, .number = 321});
+ responder.Finish();
+
+ EXPECT_EQ(ctx.output.last_response().number, 321u);
+ EXPECT_EQ(ctx.output.last_status(), OkStatus());
+}
+
+TEST(NanopbServerReader, Open_ReturnsUsableReader) {
+ ReaderWriterTestContext<&TestService::TestClientStreamRpc,
+ CalculateMethodId("TestClientStreamRpc")>
+ ctx;
+ NanopbServerReader responder =
+ NanopbServerReader<pw_rpc_test_TestRequest,
+ pw_rpc_test_TestStreamResponse>::
+ Open<&TestService::TestClientStreamRpc,
+ CalculateMethodId("TestClientStreamRpc")>(
+ ctx.server, ctx.channel.id(), ctx.service);
+
+ responder.Finish({.chunk = {}, .number = 321});
+
+ EXPECT_EQ(ctx.output.last_response().number, 321u);
+}
+
+TEST(NanopbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
+ ReaderWriterTestContext<&TestService::TestBidirectionalStreamRpc,
+ CalculateMethodId("TestBidirectionalStreamRpc")>
+ ctx;
+ NanopbServerReaderWriter responder =
+ NanopbServerReaderWriter<pw_rpc_test_TestRequest,
+ pw_rpc_test_TestStreamResponse>::
+ Open<&TestService::TestBidirectionalStreamRpc,
+ CalculateMethodId("TestBidirectionalStreamRpc")>(
+ ctx.server, ctx.channel.id(), ctx.service);
+
+ responder.Write({.chunk = {}, .number = 321});
+ responder.Finish(Status::NotFound());
+
+ EXPECT_EQ(ctx.output.last_response().number, 321u);
+ EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
+}
+
+} // namespace pw::rpc
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 4e8ab7b..a9822c8 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -53,13 +53,20 @@
Call& operator=(const Call&) = delete;
// True if the Call is active and ready to send responses.
- bool open() const { return rpc_state_ == kOpen; }
+ [[nodiscard]] bool active() const { return rpc_state_ == kActive; }
+
+ // DEPRECATED: open() was renamed to active() because it is clearer and does
+ // not conflict with Open() in ReaderWriter classes.
+ // TODO(pwbug/472): Remove the open() method.
+ /* [[deprecated("Renamed to active()")]] */ bool open() const {
+ return active();
+ }
uint32_t channel_id() const { return call_.channel().id(); }
uint32_t service_id() const { return call_.service().id(); }
uint32_t method_id() const;
- // Closes the Call and sends a RESPONSE packet, if it is open. Returns the
+ // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
// status from sending the packet, or FAILED_PRECONDITION if the Call is not
// active.
Status CloseAndSendResponse(std::span<const std::byte> response,
@@ -83,7 +90,7 @@
}
void EndClientStream() {
- client_stream_state_ = kClientStreamClosed;
+ client_stream_state_ = kClientStreamInactive;
#if PW_RPC_CLIENT_STREAM_END_CALLBACK
if (on_client_stream_end_) {
@@ -95,22 +102,22 @@
bool has_client_stream() const { return HasClientStream(type_); }
bool client_stream_open() const {
- return client_stream_state_ == kClientStreamOpen;
+ return client_stream_state_ == kClientStreamActive;
}
protected:
// Creates a Call for a closed RPC.
constexpr Call(MethodType type)
- : rpc_state_(kClosed),
+ : rpc_state_(kInactive),
type_(type),
- client_stream_state_(kClientStreamClosed) {}
+ client_stream_state_(kClientStreamInactive) {}
- // Creates a Call for an open RPC.
+ // Creates a Call for an active RPC.
Call(const CallContext& call, MethodType type);
// Initialize rpc_state_ to closed since move-assignment will check if the
- // Call is open before moving into it.
- Call(Call&& other) : rpc_state_(kClosed) { *this = std::move(other); }
+ // Call is active before moving into it.
+ Call(Call&& other) : rpc_state_(kInactive) { *this = std::move(other); }
Call& operator=(Call&& other);
@@ -143,12 +150,12 @@
constexpr const Channel::OutputBuffer& buffer() const { return response_; }
- // Acquires a buffer into which to write a payload. The Call MUST be open when
- // this is called!
+ // Acquires a buffer into which to write a payload. The Call MUST be active
+ // when this is called!
std::span<std::byte> AcquirePayloadBuffer();
// Releases the buffer, sending a client stream packet with the specified
- // payload. The Call MUST be open when this is called!
+ // payload. The Call MUST be active when this is called!
Status SendPayloadBufferClientStream(std::span<const std::byte> payload);
// Releases the buffer without sending a packet.
@@ -156,7 +163,7 @@
private:
// Removes the RPC from the server & marks as closed. The responder must be
- // open when this is called.
+ // active when this is called.
void Close();
CallContext call_;
@@ -174,9 +181,12 @@
Function<void()> on_client_stream_end_;
#endif // PW_RPC_CLIENT_STREAM_END_CALLBACK
- enum : bool { kClosed, kOpen } rpc_state_;
+ enum : bool { kInactive, kActive } rpc_state_;
MethodType type_;
- enum : bool { kClientStreamClosed, kClientStreamOpen } client_stream_state_;
+ enum : bool {
+ kClientStreamInactive,
+ kClientStreamActive,
+ } client_stream_state_;
};
} // namespace internal
diff --git a/pw_rpc/public/pw_rpc/internal/open_call.h b/pw_rpc/public/pw_rpc/internal/open_call.h
new file mode 100644
index 0000000..2578b03
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/open_call.h
@@ -0,0 +1,57 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_rpc/internal/call_context.h"
+#include "pw_rpc/internal/channel.h"
+#include "pw_rpc/internal/method.h"
+#include "pw_rpc/method_type.h"
+#include "pw_rpc/server.h"
+#include "pw_rpc/service.h"
+
+namespace pw::rpc::internal {
+
+// Creates a call context for a particular RPC. Unlike the CallContext
+// constructor, this function checks the type of RPC at compile time.
+template <auto kMethod, MethodType kExpected, typename MethodImpl>
+CallContext OpenCall(Server& server,
+ uint32_t channel_id,
+ Service& service,
+ const MethodImpl& method) {
+ if constexpr (kExpected == MethodType::kUnary) {
+ static_assert(internal::MethodTraits<decltype(kMethod)>::kType == kExpected,
+ "ServerResponse objects may only be opened for unary RPCs.");
+ } else if constexpr (kExpected == MethodType::kServerStreaming) {
+ static_assert(
+ MethodTraits<decltype(kMethod)>::kType == kExpected,
+ "ServerWriters may only be opened for server streaming RPCs.");
+ } else if constexpr (kExpected == MethodType::kClientStreaming) {
+ static_assert(
+ MethodTraits<decltype(kMethod)>::kType == kExpected,
+ "ServerReaders may only be opened for client streaming RPCs.");
+ } else if constexpr (kExpected == MethodType::kBidirectionalStreaming) {
+ static_assert(internal::MethodTraits<decltype(kMethod)>::kType == kExpected,
+ "ServerReaderWriters may only be opened for bidirectional "
+ "streaming RPCs.");
+ }
+
+ // TODO(hepler): Update the CallContext to store the ID instead, and lookup
+ // the channel by ID.
+ rpc::Channel* channel = server.GetChannel(channel_id);
+ PW_ASSERT(channel != nullptr);
+
+ return CallContext(server, static_cast<Channel&>(*channel), service, method);
+}
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
index 04dbedf..6d650a4 100644
--- a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
+++ b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
@@ -49,7 +49,7 @@
FakeServerReaderWriter& operator=(FakeServerReaderWriter&&) = default;
// Pull in protected functions from the hidden Call base as needed.
- using Call::open;
+ using Call::active;
using Call::set_on_client_stream_end;
using Call::set_on_error;
using Call::set_on_next;
@@ -84,8 +84,8 @@
FakeServerWriter(FakeServerWriter&&) = default;
// Common reader/writer functions.
+ using FakeServerReaderWriter::active;
using FakeServerReaderWriter::Finish;
- using FakeServerReaderWriter::open;
using FakeServerReaderWriter::set_on_error;
using FakeServerReaderWriter::Write;
@@ -105,8 +105,8 @@
FakeServerReader(FakeServerReader&&) = default;
+ using FakeServerReaderWriter::active;
using FakeServerReaderWriter::as_responder;
- using FakeServerReaderWriter::open;
// Functions for test use.
using FakeServerReaderWriter::PayloadBuffer;
diff --git a/pw_rpc/raw/BUILD.bazel b/pw_rpc/raw/BUILD.bazel
index a0f3cdf..8b11c4c 100644
--- a/pw_rpc/raw/BUILD.bazel
+++ b/pw_rpc/raw/BUILD.bazel
@@ -104,6 +104,18 @@
],
)
+# TODO(hepler): Make this a pw_cc_test when raw RPC generated headers are supported.
+filegroup(
+ name = "server_reader_writer_test",
+ srcs = ["server_reader_writer_test.cc"],
+ # deps = [
+ # "..:test_protos.raw_rpc",
+ # ":test_method_context",
+ # "//pw_rpc:internal_test_utils",
+ # ":method",
+ # ],
+)
+
pw_cc_test(
name = "stub_generation_test",
srcs = ["stub_generation_test.cc"],
diff --git a/pw_rpc/raw/BUILD.gn b/pw_rpc/raw/BUILD.gn
index af01d36..6e0febd 100644
--- a/pw_rpc/raw/BUILD.gn
+++ b/pw_rpc/raw/BUILD.gn
@@ -65,6 +65,7 @@
":codegen_test",
":method_test",
":method_union_test",
+ ":server_reader_writer_test",
":stub_generation_test",
]
}
@@ -101,6 +102,15 @@
sources = [ "method_union_test.cc" ]
}
+pw_test("server_reader_writer_test") {
+ deps = [
+ ":method",
+ ":test_method_context",
+ "..:test_protos.raw_rpc",
+ ]
+ sources = [ "server_reader_writer_test.cc" ]
+}
+
pw_test("stub_generation_test") {
deps = [ "..:test_protos.raw_rpc" ]
sources = [ "stub_generation_test.cc" ]
diff --git a/pw_rpc/raw/method_test.cc b/pw_rpc/raw/method_test.cc
index 99cf38d..0f7eaa8 100644
--- a/pw_rpc/raw/method_test.cc
+++ b/pw_rpc/raw/method_test.cc
@@ -188,7 +188,7 @@
EXPECT_EQ(0u, context.output().packet_count());
EXPECT_EQ(777, last_request.integer);
EXPECT_EQ(2u, last_request.status_code);
- EXPECT_TRUE(last_writer.open());
+ EXPECT_TRUE(last_writer.active());
EXPECT_EQ(OkStatus(), last_writer.Finish());
}
diff --git a/pw_rpc/raw/method_union_test.cc b/pw_rpc/raw/method_union_test.cc
index bf1268e..6be187f 100644
--- a/pw_rpc/raw/method_union_test.cc
+++ b/pw_rpc/raw/method_union_test.cc
@@ -146,7 +146,7 @@
EXPECT_EQ(0u, context.output().packet_count());
EXPECT_EQ(777, last_request.integer);
EXPECT_EQ(2u, last_request.status_code);
- EXPECT_TRUE(last_writer.open());
+ EXPECT_TRUE(last_writer.active());
EXPECT_EQ(OkStatus(), last_writer.Finish());
}
diff --git a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
index 91cae74..0f05454 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
@@ -21,6 +21,7 @@
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/call.h"
#include "pw_rpc/internal/method_lookup.h"
+#include "pw_rpc/internal/open_call.h"
#include "pw_rpc/server.h"
namespace pw::rpc {
@@ -50,8 +51,21 @@
RawServerReaderWriter(RawServerReaderWriter&&) = default;
RawServerReaderWriter& operator=(RawServerReaderWriter&&) = default;
- using internal::Call::open;
+ // Creates a RawServerReaderWriter that is ready to send responses for a
+ // particular RPC. This can be used for testing or to send responses to an RPC
+ // that has not been started by a client.
+ template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+ [[nodiscard]] static RawServerReaderWriter Open(Server& server,
+ uint32_t channel_id,
+ ServiceImpl& service) {
+ return {internal::OpenCall<kMethod, MethodType::kBidirectionalStreaming>(
+ server,
+ channel_id,
+ service,
+ internal::MethodLookup::GetRawMethod<ServiceImpl, kMethodId>())};
+ }
+ using internal::Call::active;
using internal::Call::channel_id;
// Functions for setting the callbacks.
@@ -83,6 +97,7 @@
: internal::Call(call, type) {}
using internal::Call::CloseAndSendResponse;
+ using internal::Call::open; // Deprecated; renamed to active()
private:
friend class internal::RawMethod;
@@ -95,14 +110,27 @@
// raw client streaming RPC.
class RawServerReader : private RawServerReaderWriter {
public:
+ // Creates a RawServerReader that is ready to send a response to a particular
+ // RPC. This can be used for testing or to finish an RPC that has not been
+ // started by the client.
+ template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+ [[nodiscard]] static RawServerReader Open(Server& server,
+ uint32_t channel_id,
+ ServiceImpl& service) {
+ return {internal::OpenCall<kMethod, MethodType::kClientStreaming>(
+ server,
+ channel_id,
+ service,
+ internal::MethodLookup::GetRawMethod<ServiceImpl, kMethodId>())};
+ }
+
constexpr RawServerReader()
: RawServerReaderWriter(MethodType::kClientStreaming) {}
RawServerReader(RawServerReader&&) = default;
RawServerReader& operator=(RawServerReader&&) = default;
- using RawServerReaderWriter::open;
-
+ using RawServerReaderWriter::active;
using RawServerReaderWriter::channel_id;
using RawServerReaderWriter::set_on_client_stream_end;
@@ -128,15 +156,29 @@
// The RawServerWriter is used to send responses in a raw server streaming RPC.
class RawServerWriter : private RawServerReaderWriter {
public:
+ // Creates a RawServerWriter that is ready to send responses for a particular
+ // RPC. This can be used for testing or to send responses to an RPC that has
+ // not been started by a client.
+ template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+ [[nodiscard]] static RawServerWriter Open(Server& server,
+ uint32_t channel_id,
+ ServiceImpl& service) {
+ return {internal::OpenCall<kMethod, MethodType::kServerStreaming>(
+ server,
+ channel_id,
+ service,
+ internal::MethodLookup::GetRawMethod<ServiceImpl, kMethodId>())};
+ }
+
constexpr RawServerWriter()
: RawServerReaderWriter(MethodType::kServerStreaming) {}
RawServerWriter(RawServerWriter&&) = default;
RawServerWriter& operator=(RawServerWriter&&) = default;
- using RawServerReaderWriter::open;
-
+ using RawServerReaderWriter::active;
using RawServerReaderWriter::channel_id;
+ using RawServerReaderWriter::open;
using RawServerReaderWriter::set_on_error;
diff --git a/pw_rpc/raw/server_reader_writer.cc b/pw_rpc/raw/server_reader_writer.cc
index 19e0432..fe8672d 100644
--- a/pw_rpc/raw/server_reader_writer.cc
+++ b/pw_rpc/raw/server_reader_writer.cc
@@ -19,7 +19,7 @@
namespace pw::rpc {
Status RawServerReaderWriter::Write(ConstByteSpan response) {
- if (!open()) {
+ if (!active()) {
return Status::FailedPrecondition();
}
diff --git a/pw_rpc/raw/server_reader_writer_test.cc b/pw_rpc/raw/server_reader_writer_test.cc
new file mode 100644
index 0000000..d73fe0c
--- /dev/null
+++ b/pw_rpc/raw/server_reader_writer_test.cc
@@ -0,0 +1,92 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/raw/server_reader_writer.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc/raw/fake_channel_output.h"
+#include "pw_rpc/service.h"
+#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
+
+namespace pw::rpc {
+
+class TestService final : public test::generated::TestService<TestService> {
+ public:
+ static StatusWithSize TestUnaryRpc(ServerContext&, ConstByteSpan, ByteSpan) {
+ return StatusWithSize(0);
+ }
+
+ void TestServerStreamRpc(ServerContext&, ConstByteSpan, RawServerWriter&) {}
+
+ void TestClientStreamRpc(ServerContext&, RawServerReader&) {}
+
+ void TestBidirectionalStreamRpc(ServerContext&, RawServerReaderWriter&) {}
+};
+
+struct ReaderWriterTestContext {
+ ReaderWriterTestContext(MethodType type)
+ : output(type),
+ channel(Channel::Create<1>(&output)),
+ server(std::span(&channel, 1)) {}
+
+ TestService service;
+ RawFakeChannelOutput<128, 4> output;
+ Channel channel;
+ Server server;
+};
+
+TEST(RawServerWriter, Open_ReturnsUsableWriter) {
+ ReaderWriterTestContext ctx(MethodType::kServerStreaming);
+ RawServerWriter call =
+ RawServerWriter::Open<&TestService::TestServerStreamRpc,
+ CalculateMethodId("TestServerStreamRpc")>(
+ ctx.server, ctx.channel.id(), ctx.service);
+
+ EXPECT_EQ(call.channel_id(), ctx.channel.id());
+ call.Write(std::as_bytes(std::span("321")));
+
+ EXPECT_STREQ(reinterpret_cast<const char*>(ctx.output.last_response().data()),
+ "321");
+}
+
+TEST(RawServerReader, Open_ReturnsUsableReader) {
+ ReaderWriterTestContext ctx(MethodType::kClientStreaming);
+ RawServerReader call =
+ RawServerReader::Open<&TestService::TestClientStreamRpc,
+ CalculateMethodId("TestClientStreamRpc")>(
+ ctx.server, ctx.channel.id(), ctx.service);
+
+ EXPECT_EQ(call.channel_id(), ctx.channel.id());
+ call.Finish(std::as_bytes(std::span("This is a message")));
+
+ EXPECT_STREQ(reinterpret_cast<const char*>(ctx.output.last_response().data()),
+ "This is a message");
+}
+
+TEST(RawServerReaderWriter, Open_ReturnsUsableReaderWriter) {
+ ReaderWriterTestContext ctx(MethodType::kBidirectionalStreaming);
+ RawServerReaderWriter call =
+ RawServerReaderWriter::Open<&TestService::TestBidirectionalStreamRpc,
+ CalculateMethodId(
+ "TestBidirectionalStreamRpc")>(
+ ctx.server, ctx.channel.id(), ctx.service);
+
+ EXPECT_EQ(call.channel_id(), ctx.channel.id());
+ call.Write(std::as_bytes(std::span("321")));
+
+ EXPECT_STREQ(reinterpret_cast<const char*>(ctx.output.last_response().data()),
+ "321");
+}
+
+} // namespace pw::rpc
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index dc23e59..7fe1e6c 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -261,7 +261,7 @@
service_,
service_.method(100)),
responder_(call_) {
- ASSERT_TRUE(responder_.open());
+ ASSERT_TRUE(responder_.active());
}
internal::CallContext call_;
@@ -290,7 +290,7 @@
EXPECT_EQ(OkStatus(),
server_.ProcessPacket(PacketForRpc(PacketType::CANCEL), output_));
- EXPECT_FALSE(responder_.open());
+ EXPECT_FALSE(responder_.active());
}
TEST_F(BidiMethod, Cancel_SendsNoResponse) {
@@ -305,7 +305,7 @@
OkStatus(),
server_.ProcessPacket(PacketForRpc(PacketType::CLIENT_ERROR), output_));
- EXPECT_FALSE(responder_.open());
+ EXPECT_FALSE(responder_.active());
EXPECT_EQ(output_.packet_count(), 0u);
}
@@ -337,7 +337,7 @@
EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
EXPECT_EQ(output_.sent_packet().status(), Status::FailedPrecondition());
- EXPECT_TRUE(responder_.open());
+ EXPECT_TRUE(responder_.active());
}
TEST_F(BidiMethod, Cancel_IncorrectService) {
@@ -349,7 +349,7 @@
EXPECT_EQ(output_.sent_packet().status(), Status::NotFound());
EXPECT_EQ(output_.sent_packet().service_id(), 43u);
EXPECT_EQ(output_.sent_packet().method_id(), 100u);
- EXPECT_TRUE(responder_.open());
+ EXPECT_TRUE(responder_.active());
}
TEST_F(BidiMethod, Cancel_IncorrectMethod) {
@@ -358,7 +358,7 @@
output_));
EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
EXPECT_EQ(output_.sent_packet().status(), Status::NotFound());
- EXPECT_TRUE(responder_.open());
+ EXPECT_TRUE(responder_.active());
}
TEST_F(BidiMethod, ClientStream_CallsCallback) {
@@ -411,7 +411,7 @@
service_,
service_.method(100)),
responder_(call_) {
- ASSERT_TRUE(responder_.open());
+ ASSERT_TRUE(responder_.active());
}
internal::CallContext call_;