pw_rpc: Support opening a ServerReader/Writer

- Deprecate open() in favor of active(), which isn't ambiguous about
  whether it's a verb or adjective and doesn't conflict with Open().
- Support opening a ServerReader/Writer object server-side, without a
  client calling the RPC. This is helpful for testing and in situations
  when the server needs to send a response to an RPC the client hasn't
  called the RPC yet (e.g. respond to a Reset() RPC after rebooting).

Change-Id: I2c7d3186597f576db2edbead7fd9a8cd407f4983
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/57882
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index c8f7096..69525da 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -51,6 +51,7 @@
         "public/pw_rpc/internal/method.h",
         "public/pw_rpc/internal/method_lookup.h",
         "public/pw_rpc/internal/method_union.h",
+        "public/pw_rpc/internal/open_call.h",
         "server.cc",
         "service.cc",
     ],
@@ -198,6 +199,7 @@
         "nanopb/public/pw_rpc/nanopb/test_method_context.h",
         "nanopb/pw_rpc_nanopb_private/internal_test_utils.h",
         "nanopb/server_reader_writer.cc",
+        "nanopb/server_reader_writer_test.cc",
         "nanopb/stub_generation_test.cc",
     ],
 )
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index bc5f7a1..d0a6346 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -66,6 +66,7 @@
     "public/pw_rpc/internal/method.h",
     "public/pw_rpc/internal/method_lookup.h",
     "public/pw_rpc/internal/method_union.h",
+    "public/pw_rpc/internal/open_call.h",
     "server.cc",
     "service.cc",
   ]
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index 939165a..b7cd614 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -46,10 +46,10 @@
 
 Call::Call(const CallContext& call, MethodType type)
     : call_(call),
-      rpc_state_(kOpen),
+      rpc_state_(kActive),
       type_(type),
-      client_stream_state_(HasClientStream(type) ? kClientStreamOpen
-                                                 : kClientStreamClosed) {
+      client_stream_state_(HasClientStream(type) ? kClientStreamActive
+                                                 : kClientStreamInactive) {
   call_.server().RegisterCall(*this);
 }
 
@@ -62,7 +62,7 @@
   type_ = other.type_;
   client_stream_state_ = other.client_stream_state_;
 
-  if (other.open()) {
+  if (other.active()) {
     other.Close();
     other.call_.server().RegisterCall(*this);
   }
@@ -85,7 +85,7 @@
 
 Status Call::CloseAndSendResponse(std::span<const std::byte> response,
                                   Status status) {
-  if (!open()) {
+  if (!active()) {
     return Status::FailedPrecondition();
   }
 
@@ -107,7 +107,7 @@
 }
 
 std::span<std::byte> Call::AcquirePayloadBuffer() {
-  PW_DCHECK(open());
+  PW_DCHECK(active());
 
   // Only allow having one active buffer at a time.
   if (response_.empty()) {
@@ -118,21 +118,21 @@
 }
 
 Status Call::SendPayloadBufferClientStream(std::span<const std::byte> payload) {
-  PW_DCHECK(open());
+  PW_DCHECK(active());
   return call_.channel().Send(response_, StreamPacket(call_, payload));
 }
 
 void Call::ReleasePayloadBuffer() {
-  PW_DCHECK(open());
+  PW_DCHECK(active());
   call_.channel().Release(response_);
 }
 
 void Call::Close() {
-  PW_DCHECK(open());
+  PW_DCHECK(active());
 
   call_.server().UnregisterCall(*this);
-  rpc_state_ = kClosed;
-  client_stream_state_ = kClientStreamClosed;
+  rpc_state_ = kInactive;
+  client_stream_state_ = kClientStreamInactive;
 }
 
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc
index cee015a..7fb6d5f 100644
--- a/pw_rpc/call_test.cc
+++ b/pw_rpc/call_test.cc
@@ -48,7 +48,7 @@
 
   FakeServerWriter writer(context.get());
 
-  EXPECT_TRUE(writer.open());
+  EXPECT_TRUE(writer.active());
 }
 
 TEST(ServerWriter, Move_ClosesOriginal) {
@@ -58,15 +58,15 @@
   FakeServerWriter writer(std::move(moved));
 
 #ifndef __clang_analyzer__
-  EXPECT_FALSE(moved.open());
+  EXPECT_FALSE(moved.active());
 #endif  // ignore use-after-move
-  EXPECT_TRUE(writer.open());
+  EXPECT_TRUE(writer.active());
 }
 
 TEST(ServerWriter, DefaultConstruct_Closed) {
   FakeServerWriter writer;
 
-  EXPECT_FALSE(writer.open());
+  EXPECT_FALSE(writer.active());
 }
 
 TEST(ServerWriter, Construct_RegistersWithServer) {
@@ -121,9 +121,9 @@
   ServerContextForTest<TestService> context(TestService::method.method());
   FakeServerWriter writer(context.get());
 
-  ASSERT_TRUE(writer.open());
+  ASSERT_TRUE(writer.active());
   EXPECT_EQ(OkStatus(), writer.Finish());
-  EXPECT_FALSE(writer.open());
+  EXPECT_FALSE(writer.active());
   EXPECT_EQ(Status::FailedPrecondition(), writer.Finish());
 }
 
@@ -131,12 +131,12 @@
   ServerContextForTest<TestService> context(TestService::method.method());
   FakeServerWriter writer(context.get());
 
-  ASSERT_TRUE(writer.open());
+  ASSERT_TRUE(writer.active());
   auto buffer = writer.PayloadBuffer();
   buffer[0] = std::byte{0};
   EXPECT_FALSE(writer.output_buffer().empty());
   EXPECT_EQ(OkStatus(), writer.Finish());
-  EXPECT_FALSE(writer.open());
+  EXPECT_FALSE(writer.active());
   EXPECT_TRUE(writer.output_buffer().empty());
 }
 
@@ -198,11 +198,11 @@
   ServerContextForTest<TestService> context(TestService::method.method());
   test::FakeServerReader reader(context.get());
 
-  EXPECT_TRUE(reader.as_responder().open());
+  EXPECT_TRUE(reader.as_responder().active());
   EXPECT_TRUE(reader.as_responder().client_stream_open());
   EXPECT_EQ(OkStatus(), reader.as_responder().CloseAndSendResponse(OkStatus()));
 
-  EXPECT_FALSE(reader.as_responder().open());
+  EXPECT_FALSE(reader.as_responder().active());
   EXPECT_FALSE(reader.as_responder().client_stream_open());
 }
 
@@ -210,11 +210,11 @@
   ServerContextForTest<TestService> context(TestService::method.method());
   test::FakeServerReader reader(context.get());
 
-  EXPECT_TRUE(reader.open());
+  EXPECT_TRUE(reader.active());
   EXPECT_TRUE(reader.as_responder().client_stream_open());
   reader.as_responder().EndClientStream();
 
-  EXPECT_TRUE(reader.open());
+  EXPECT_TRUE(reader.active());
   EXPECT_FALSE(reader.as_responder().client_stream_open());
 }
 
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 7188722..c34563c 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -45,6 +45,18 @@
 synchronously handled unary RPC before it completes. The same RPC may be invoked
 multiple times simultaneously if the invocations are on different channels.
 
+Unrequested responses
+---------------------
+``pw_rpc`` supports sending responses to RPCs that have not yet been invoked by
+a client. This is useful in testing and in situations like an RPC that triggers
+reboot. After the reboot, the device opens the writer object and sends its
+response to the client.
+
+.. admonition:: Under construction
+
+  The ``ReaderWriter::Open()`` API is cumbersome, but a more streamlined API
+  will be added to the generated RPC code.
+
 Creating an RPC
 ===============
 
diff --git a/pw_rpc/nanopb/BUILD.gn b/pw_rpc/nanopb/BUILD.gn
index 217c5ef..70aec83 100644
--- a/pw_rpc/nanopb/BUILD.gn
+++ b/pw_rpc/nanopb/BUILD.gn
@@ -115,6 +115,7 @@
     ":method_lookup_test",
     ":method_test",
     ":method_union_test",
+    ":server_reader_writer_test",
     ":stub_generation_test",
   ]
 }
@@ -188,6 +189,16 @@
   enable_if = dir_pw_third_party_nanopb != ""
 }
 
+pw_test("server_reader_writer_test") {
+  deps = [
+    ":method",
+    ":test_method_context",
+    "..:test_protos.nanopb_rpc",
+  ]
+  sources = [ "server_reader_writer_test.cc" ]
+  enable_if = dir_pw_third_party_nanopb != ""
+}
+
 pw_test("stub_generation_test") {
   deps = [ "..:test_protos.nanopb_rpc" ]
   sources = [ "stub_generation_test.cc" ]
diff --git a/pw_rpc/nanopb/method_union_test.cc b/pw_rpc/nanopb/method_union_test.cc
index 2a82cf3..92d3982 100644
--- a/pw_rpc/nanopb/method_union_test.cc
+++ b/pw_rpc/nanopb/method_union_test.cc
@@ -108,7 +108,7 @@
 
   method.Invoke(context.get(), context.request(request));
 
-  EXPECT_TRUE(last_raw_writer.open());
+  EXPECT_TRUE(last_raw_writer.active());
   EXPECT_EQ(OkStatus(), last_raw_writer.Finish());
   EXPECT_EQ(context.output().sent_packet().type(), PacketType::RESPONSE);
 }
@@ -148,7 +148,7 @@
   method.Invoke(context.get(), context.request(request));
 
   EXPECT_EQ(555, last_request.integer);
-  EXPECT_TRUE(last_writer.open());
+  EXPECT_TRUE(last_writer.active());
 
   EXPECT_EQ(OkStatus(), last_writer.Finish());
   EXPECT_EQ(context.output().sent_packet().type(), PacketType::RESPONSE);
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h
index d04110e..659e272 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/fake_channel_output.h
@@ -29,11 +29,12 @@
 class NanopbFakeChannelOutput final
     : public internal::test::FakeChannelOutputBuffer<kOutputSize> {
  public:
-  template <auto kMethod, uint32_t kMethodId, typename ServiceType>
+  // Creates a NanopbFakeChannelOutput for the specified method.
+  template <typename ServiceType, auto kMethod, uint32_t kMethodId>
   static NanopbFakeChannelOutput Create() {
     return NanopbFakeChannelOutput(
-        internal::MethodLookup::GetNanopbMethod<ServiceType, kMethodId>(),
-        internal::MethodTraits<decltype(kMethod)>::kType);
+        internal::MethodTraits<decltype(kMethod)>::kType,
+        internal::MethodLookup::GetNanopbMethod<ServiceType, kMethodId>());
   }
 
   // Private constructor, do not use. This constructor is exposed so this class
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
index 271c482..303825c 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
@@ -20,6 +20,8 @@
 #include "pw_bytes/span.h"
 #include "pw_rpc/channel.h"
 #include "pw_rpc/internal/call.h"
+#include "pw_rpc/internal/method_lookup.h"
+#include "pw_rpc/internal/open_call.h"
 #include "pw_rpc/server.h"
 
 namespace pw::rpc {
@@ -93,6 +95,26 @@
 class NanopbServerReaderWriter
     : private internal::BaseNanopbServerReader<Request> {
  public:
+  // Creates a NanopbServerReaderWriter that is ready to send responses for a
+  // particular RPC. This can be used for testing or to send responses to an RPC
+  // that has not been started by a client.
+  template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+  [[nodiscard]] static NanopbServerReaderWriter Open(Server& server,
+                                                     uint32_t channel_id,
+                                                     ServiceImpl& service) {
+    static_assert(std::is_same_v<Request, internal::Request<kMethod>>,
+                  "The request type of a NanopbServerReaderWriter must match "
+                  "the method.");
+    static_assert(std::is_same_v<Response, internal::Response<kMethod>>,
+                  "The response type of a NanopbServerReaderWriter must match "
+                  "the method.");
+    return {internal::OpenCall<kMethod, MethodType::kBidirectionalStreaming>(
+        server,
+        channel_id,
+        service,
+        internal::MethodLookup::GetNanopbMethod<ServiceImpl, kMethodId>())};
+  }
+
   constexpr NanopbServerReaderWriter()
       : internal::BaseNanopbServerReader<Request>(
             MethodType::kBidirectionalStreaming) {}
@@ -100,7 +122,7 @@
   NanopbServerReaderWriter(NanopbServerReaderWriter&&) = default;
   NanopbServerReaderWriter& operator=(NanopbServerReaderWriter&&) = default;
 
-  using internal::GenericNanopbResponder::open;
+  using internal::GenericNanopbResponder::active;
 
   using internal::GenericNanopbResponder::channel_id;
 
@@ -141,6 +163,26 @@
 template <typename Request, typename Response>
 class NanopbServerReader : private internal::BaseNanopbServerReader<Request> {
  public:
+  // Creates a NanopbServerReader that is ready to send a response to a
+  // particular RPC. This can be used for testing or to finish an RPC that has
+  // not been started by the client.
+  template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+  [[nodiscard]] static NanopbServerReader Open(Server& server,
+                                               uint32_t channel_id,
+                                               ServiceImpl& service) {
+    static_assert(
+        std::is_same_v<Request, internal::Request<kMethod>>,
+        "The request type of a NanopbServerReader must match the method.");
+    static_assert(
+        std::is_same_v<Response, internal::Response<kMethod>>,
+        "The response type of a NanopbServerReader must match the method.");
+    return {internal::OpenCall<kMethod, MethodType::kClientStreaming>(
+        server,
+        channel_id,
+        service,
+        internal::MethodLookup::GetNanopbMethod<ServiceImpl, kMethodId>())};
+  }
+
   // Allow default construction so that users can declare a variable into which
   // to move NanopbServerReaders from RPC calls.
   constexpr NanopbServerReader()
@@ -150,6 +192,7 @@
   NanopbServerReader(NanopbServerReader&&) = default;
   NanopbServerReader& operator=(NanopbServerReader&&) = default;
 
+  using internal::GenericNanopbResponder::active;
   using internal::GenericNanopbResponder::channel_id;
 
   // Functions for setting RPC event callbacks.
@@ -178,6 +221,23 @@
 template <typename Response>
 class NanopbServerWriter : private internal::GenericNanopbResponder {
  public:
+  // Creates a NanopbServerWriter that is ready to send responses for a
+  // particular RPC. This can be used for testing or to send responses to an RPC
+  // that has not been started by a client.
+  template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+  [[nodiscard]] static NanopbServerWriter Open(Server& server,
+                                               uint32_t channel_id,
+                                               ServiceImpl& service) {
+    static_assert(
+        std::is_same_v<Response, internal::Response<kMethod>>,
+        "The response type of a NanopbServerWriter must match the method.");
+    return {internal::OpenCall<kMethod, MethodType::kServerStreaming>(
+        server,
+        channel_id,
+        service,
+        internal::MethodLookup::GetNanopbMethod<ServiceImpl, kMethodId>())};
+  }
+
   // Allow default construction so that users can declare a variable into which
   // to move ServerWriters from RPC calls.
   constexpr NanopbServerWriter()
@@ -186,9 +246,9 @@
   NanopbServerWriter(NanopbServerWriter&&) = default;
   NanopbServerWriter& operator=(NanopbServerWriter&&) = default;
 
-  using internal::GenericNanopbResponder::open;
-
+  using internal::GenericNanopbResponder::active;
   using internal::GenericNanopbResponder::channel_id;
+  using internal::GenericNanopbResponder::open;
 
   // Writes a response struct. Returns the following Status codes:
   //
diff --git a/pw_rpc/nanopb/server_reader_writer.cc b/pw_rpc/nanopb/server_reader_writer.cc
index 331f4db..62ab476 100644
--- a/pw_rpc/nanopb/server_reader_writer.cc
+++ b/pw_rpc/nanopb/server_reader_writer.cc
@@ -20,7 +20,7 @@
 
 Status GenericNanopbResponder::SendClientStreamOrResponse(const void* response,
                                                           Status* status) {
-  if (!open()) {
+  if (!active()) {
     return Status::FailedPrecondition();
   }
 
diff --git a/pw_rpc/nanopb/server_reader_writer_test.cc b/pw_rpc/nanopb/server_reader_writer_test.cc
new file mode 100644
index 0000000..ff8066b
--- /dev/null
+++ b/pw_rpc/nanopb/server_reader_writer_test.cc
@@ -0,0 +1,113 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/raw/server_reader_writer.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc/nanopb/fake_channel_output.h"
+#include "pw_rpc/service.h"
+#include "pw_rpc_test_protos/test.rpc.pb.h"
+
+namespace pw::rpc {
+
+class TestService final : public test::generated::TestService<TestService> {
+ public:
+  Status TestUnaryRpc(ServerContext&,
+                      const pw_rpc_test_TestRequest&,
+                      pw_rpc_test_TestResponse&) {
+    return OkStatus();
+  }
+
+  void TestServerStreamRpc(
+      ServerContext&,
+      const pw_rpc_test_TestRequest&,
+      NanopbServerWriter<pw_rpc_test_TestStreamResponse>&) {}
+
+  void TestClientStreamRpc(
+      ServerContext&,
+      NanopbServerReader<pw_rpc_test_TestRequest,
+                         pw_rpc_test_TestStreamResponse>&) {}
+
+  void TestBidirectionalStreamRpc(
+      ServerContext&,
+      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
+                               pw_rpc_test_TestStreamResponse>&) {}
+};
+
+template <auto kMethod, uint32_t kMethodId>
+struct ReaderWriterTestContext {
+  ReaderWriterTestContext()
+      : output(decltype(output)::
+                   template Create<TestService, kMethod, kMethodId>()),
+        channel(Channel::Create<1>(&output)),
+        server(std::span(&channel, 1)) {}
+
+  TestService service;
+  NanopbFakeChannelOutput<internal::Response<kMethod>, 4, 128> output;
+  Channel channel;
+  Server server;
+};
+
+TEST(NanopbServerWriter, Open_ReturnsUsableWriter) {
+  ReaderWriterTestContext<&TestService::TestServerStreamRpc,
+                          CalculateMethodId("TestServerStreamRpc")>
+      ctx;
+  NanopbServerWriter responder =
+      NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
+          &TestService::TestServerStreamRpc,
+          CalculateMethodId("TestServerStreamRpc")>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  responder.Write({.chunk = {}, .number = 321});
+  responder.Finish();
+
+  EXPECT_EQ(ctx.output.last_response().number, 321u);
+  EXPECT_EQ(ctx.output.last_status(), OkStatus());
+}
+
+TEST(NanopbServerReader, Open_ReturnsUsableReader) {
+  ReaderWriterTestContext<&TestService::TestClientStreamRpc,
+                          CalculateMethodId("TestClientStreamRpc")>
+      ctx;
+  NanopbServerReader responder =
+      NanopbServerReader<pw_rpc_test_TestRequest,
+                         pw_rpc_test_TestStreamResponse>::
+          Open<&TestService::TestClientStreamRpc,
+               CalculateMethodId("TestClientStreamRpc")>(
+              ctx.server, ctx.channel.id(), ctx.service);
+
+  responder.Finish({.chunk = {}, .number = 321});
+
+  EXPECT_EQ(ctx.output.last_response().number, 321u);
+}
+
+TEST(NanopbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
+  ReaderWriterTestContext<&TestService::TestBidirectionalStreamRpc,
+                          CalculateMethodId("TestBidirectionalStreamRpc")>
+      ctx;
+  NanopbServerReaderWriter responder =
+      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
+                               pw_rpc_test_TestStreamResponse>::
+          Open<&TestService::TestBidirectionalStreamRpc,
+               CalculateMethodId("TestBidirectionalStreamRpc")>(
+              ctx.server, ctx.channel.id(), ctx.service);
+
+  responder.Write({.chunk = {}, .number = 321});
+  responder.Finish(Status::NotFound());
+
+  EXPECT_EQ(ctx.output.last_response().number, 321u);
+  EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
+}
+
+}  // namespace pw::rpc
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 4e8ab7b..a9822c8 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -53,13 +53,20 @@
   Call& operator=(const Call&) = delete;
 
   // True if the Call is active and ready to send responses.
-  bool open() const { return rpc_state_ == kOpen; }
+  [[nodiscard]] bool active() const { return rpc_state_ == kActive; }
+
+  // DEPRECATED: open() was renamed to active() because it is clearer and does
+  //     not conflict with Open() in ReaderWriter classes.
+  // TODO(pwbug/472): Remove the open() method.
+  /* [[deprecated("Renamed to active()")]] */ bool open() const {
+    return active();
+  }
 
   uint32_t channel_id() const { return call_.channel().id(); }
   uint32_t service_id() const { return call_.service().id(); }
   uint32_t method_id() const;
 
-  // Closes the Call and sends a RESPONSE packet, if it is open. Returns the
+  // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
   // status from sending the packet, or FAILED_PRECONDITION if the Call is not
   // active.
   Status CloseAndSendResponse(std::span<const std::byte> response,
@@ -83,7 +90,7 @@
   }
 
   void EndClientStream() {
-    client_stream_state_ = kClientStreamClosed;
+    client_stream_state_ = kClientStreamInactive;
 
 #if PW_RPC_CLIENT_STREAM_END_CALLBACK
     if (on_client_stream_end_) {
@@ -95,22 +102,22 @@
   bool has_client_stream() const { return HasClientStream(type_); }
 
   bool client_stream_open() const {
-    return client_stream_state_ == kClientStreamOpen;
+    return client_stream_state_ == kClientStreamActive;
   }
 
  protected:
   // Creates a Call for a closed RPC.
   constexpr Call(MethodType type)
-      : rpc_state_(kClosed),
+      : rpc_state_(kInactive),
         type_(type),
-        client_stream_state_(kClientStreamClosed) {}
+        client_stream_state_(kClientStreamInactive) {}
 
-  // Creates a Call for an open RPC.
+  // Creates a Call for an active RPC.
   Call(const CallContext& call, MethodType type);
 
   // Initialize rpc_state_ to closed since move-assignment will check if the
-  // Call is open before moving into it.
-  Call(Call&& other) : rpc_state_(kClosed) { *this = std::move(other); }
+  // Call is active before moving into it.
+  Call(Call&& other) : rpc_state_(kInactive) { *this = std::move(other); }
 
   Call& operator=(Call&& other);
 
@@ -143,12 +150,12 @@
 
   constexpr const Channel::OutputBuffer& buffer() const { return response_; }
 
-  // Acquires a buffer into which to write a payload. The Call MUST be open when
-  // this is called!
+  // Acquires a buffer into which to write a payload. The Call MUST be active
+  // when this is called!
   std::span<std::byte> AcquirePayloadBuffer();
 
   // Releases the buffer, sending a client stream packet with the specified
-  // payload. The Call MUST be open when this is called!
+  // payload. The Call MUST be active when this is called!
   Status SendPayloadBufferClientStream(std::span<const std::byte> payload);
 
   // Releases the buffer without sending a packet.
@@ -156,7 +163,7 @@
 
  private:
   // Removes the RPC from the server & marks as closed. The responder must be
-  // open when this is called.
+  // active when this is called.
   void Close();
 
   CallContext call_;
@@ -174,9 +181,12 @@
   Function<void()> on_client_stream_end_;
 #endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
 
-  enum : bool { kClosed, kOpen } rpc_state_;
+  enum : bool { kInactive, kActive } rpc_state_;
   MethodType type_;
-  enum : bool { kClientStreamClosed, kClientStreamOpen } client_stream_state_;
+  enum : bool {
+    kClientStreamInactive,
+    kClientStreamActive,
+  } client_stream_state_;
 };
 
 }  // namespace internal
diff --git a/pw_rpc/public/pw_rpc/internal/open_call.h b/pw_rpc/public/pw_rpc/internal/open_call.h
new file mode 100644
index 0000000..2578b03
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/open_call.h
@@ -0,0 +1,57 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_rpc/internal/call_context.h"
+#include "pw_rpc/internal/channel.h"
+#include "pw_rpc/internal/method.h"
+#include "pw_rpc/method_type.h"
+#include "pw_rpc/server.h"
+#include "pw_rpc/service.h"
+
+namespace pw::rpc::internal {
+
+// Creates a call context for a particular RPC. Unlike the CallContext
+// constructor, this function checks the type of RPC at compile time.
+template <auto kMethod, MethodType kExpected, typename MethodImpl>
+CallContext OpenCall(Server& server,
+                     uint32_t channel_id,
+                     Service& service,
+                     const MethodImpl& method) {
+  if constexpr (kExpected == MethodType::kUnary) {
+    static_assert(internal::MethodTraits<decltype(kMethod)>::kType == kExpected,
+                  "ServerResponse objects may only be opened for unary RPCs.");
+  } else if constexpr (kExpected == MethodType::kServerStreaming) {
+    static_assert(
+        MethodTraits<decltype(kMethod)>::kType == kExpected,
+        "ServerWriters may only be opened for server streaming RPCs.");
+  } else if constexpr (kExpected == MethodType::kClientStreaming) {
+    static_assert(
+        MethodTraits<decltype(kMethod)>::kType == kExpected,
+        "ServerReaders may only be opened for client streaming RPCs.");
+  } else if constexpr (kExpected == MethodType::kBidirectionalStreaming) {
+    static_assert(internal::MethodTraits<decltype(kMethod)>::kType == kExpected,
+                  "ServerReaderWriters may only be opened for bidirectional "
+                  "streaming RPCs.");
+  }
+
+  // TODO(hepler): Update the CallContext to store the ID instead, and lookup
+  //     the channel by ID.
+  rpc::Channel* channel = server.GetChannel(channel_id);
+  PW_ASSERT(channel != nullptr);
+
+  return CallContext(server, static_cast<Channel&>(*channel), service, method);
+}
+
+}  // namespace pw::rpc::internal
diff --git a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
index 04dbedf..6d650a4 100644
--- a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
+++ b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
@@ -49,7 +49,7 @@
   FakeServerReaderWriter& operator=(FakeServerReaderWriter&&) = default;
 
   // Pull in protected functions from the hidden Call base as needed.
-  using Call::open;
+  using Call::active;
   using Call::set_on_client_stream_end;
   using Call::set_on_error;
   using Call::set_on_next;
@@ -84,8 +84,8 @@
   FakeServerWriter(FakeServerWriter&&) = default;
 
   // Common reader/writer functions.
+  using FakeServerReaderWriter::active;
   using FakeServerReaderWriter::Finish;
-  using FakeServerReaderWriter::open;
   using FakeServerReaderWriter::set_on_error;
   using FakeServerReaderWriter::Write;
 
@@ -105,8 +105,8 @@
 
   FakeServerReader(FakeServerReader&&) = default;
 
+  using FakeServerReaderWriter::active;
   using FakeServerReaderWriter::as_responder;
-  using FakeServerReaderWriter::open;
 
   // Functions for test use.
   using FakeServerReaderWriter::PayloadBuffer;
diff --git a/pw_rpc/raw/BUILD.bazel b/pw_rpc/raw/BUILD.bazel
index a0f3cdf..8b11c4c 100644
--- a/pw_rpc/raw/BUILD.bazel
+++ b/pw_rpc/raw/BUILD.bazel
@@ -104,6 +104,18 @@
     ],
 )
 
+# TODO(hepler): Make this a pw_cc_test when raw RPC generated headers are supported.
+filegroup(
+    name = "server_reader_writer_test",
+    srcs = ["server_reader_writer_test.cc"],
+    # deps = [
+    #     "..:test_protos.raw_rpc",
+    #     ":test_method_context",
+    #     "//pw_rpc:internal_test_utils",
+    #     ":method",
+    # ],
+)
+
 pw_cc_test(
     name = "stub_generation_test",
     srcs = ["stub_generation_test.cc"],
diff --git a/pw_rpc/raw/BUILD.gn b/pw_rpc/raw/BUILD.gn
index af01d36..6e0febd 100644
--- a/pw_rpc/raw/BUILD.gn
+++ b/pw_rpc/raw/BUILD.gn
@@ -65,6 +65,7 @@
     ":codegen_test",
     ":method_test",
     ":method_union_test",
+    ":server_reader_writer_test",
     ":stub_generation_test",
   ]
 }
@@ -101,6 +102,15 @@
   sources = [ "method_union_test.cc" ]
 }
 
+pw_test("server_reader_writer_test") {
+  deps = [
+    ":method",
+    ":test_method_context",
+    "..:test_protos.raw_rpc",
+  ]
+  sources = [ "server_reader_writer_test.cc" ]
+}
+
 pw_test("stub_generation_test") {
   deps = [ "..:test_protos.raw_rpc" ]
   sources = [ "stub_generation_test.cc" ]
diff --git a/pw_rpc/raw/method_test.cc b/pw_rpc/raw/method_test.cc
index 99cf38d..0f7eaa8 100644
--- a/pw_rpc/raw/method_test.cc
+++ b/pw_rpc/raw/method_test.cc
@@ -188,7 +188,7 @@
   EXPECT_EQ(0u, context.output().packet_count());
   EXPECT_EQ(777, last_request.integer);
   EXPECT_EQ(2u, last_request.status_code);
-  EXPECT_TRUE(last_writer.open());
+  EXPECT_TRUE(last_writer.active());
   EXPECT_EQ(OkStatus(), last_writer.Finish());
 }
 
diff --git a/pw_rpc/raw/method_union_test.cc b/pw_rpc/raw/method_union_test.cc
index bf1268e..6be187f 100644
--- a/pw_rpc/raw/method_union_test.cc
+++ b/pw_rpc/raw/method_union_test.cc
@@ -146,7 +146,7 @@
   EXPECT_EQ(0u, context.output().packet_count());
   EXPECT_EQ(777, last_request.integer);
   EXPECT_EQ(2u, last_request.status_code);
-  EXPECT_TRUE(last_writer.open());
+  EXPECT_TRUE(last_writer.active());
   EXPECT_EQ(OkStatus(), last_writer.Finish());
 }
 
diff --git a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
index 91cae74..0f05454 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
@@ -21,6 +21,7 @@
 #include "pw_rpc/channel.h"
 #include "pw_rpc/internal/call.h"
 #include "pw_rpc/internal/method_lookup.h"
+#include "pw_rpc/internal/open_call.h"
 #include "pw_rpc/server.h"
 
 namespace pw::rpc {
@@ -50,8 +51,21 @@
   RawServerReaderWriter(RawServerReaderWriter&&) = default;
   RawServerReaderWriter& operator=(RawServerReaderWriter&&) = default;
 
-  using internal::Call::open;
+  // Creates a RawServerReaderWriter that is ready to send responses for a
+  // particular RPC. This can be used for testing or to send responses to an RPC
+  // that has not been started by a client.
+  template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+  [[nodiscard]] static RawServerReaderWriter Open(Server& server,
+                                                  uint32_t channel_id,
+                                                  ServiceImpl& service) {
+    return {internal::OpenCall<kMethod, MethodType::kBidirectionalStreaming>(
+        server,
+        channel_id,
+        service,
+        internal::MethodLookup::GetRawMethod<ServiceImpl, kMethodId>())};
+  }
 
+  using internal::Call::active;
   using internal::Call::channel_id;
 
   // Functions for setting the callbacks.
@@ -83,6 +97,7 @@
       : internal::Call(call, type) {}
 
   using internal::Call::CloseAndSendResponse;
+  using internal::Call::open;  // Deprecated; renamed to active()
 
  private:
   friend class internal::RawMethod;
@@ -95,14 +110,27 @@
 // raw client streaming RPC.
 class RawServerReader : private RawServerReaderWriter {
  public:
+  // Creates a RawServerReader that is ready to send a response to a particular
+  // RPC. This can be used for testing or to finish an RPC that has not been
+  // started by the client.
+  template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+  [[nodiscard]] static RawServerReader Open(Server& server,
+                                            uint32_t channel_id,
+                                            ServiceImpl& service) {
+    return {internal::OpenCall<kMethod, MethodType::kClientStreaming>(
+        server,
+        channel_id,
+        service,
+        internal::MethodLookup::GetRawMethod<ServiceImpl, kMethodId>())};
+  }
+
   constexpr RawServerReader()
       : RawServerReaderWriter(MethodType::kClientStreaming) {}
 
   RawServerReader(RawServerReader&&) = default;
   RawServerReader& operator=(RawServerReader&&) = default;
 
-  using RawServerReaderWriter::open;
-
+  using RawServerReaderWriter::active;
   using RawServerReaderWriter::channel_id;
 
   using RawServerReaderWriter::set_on_client_stream_end;
@@ -128,15 +156,29 @@
 // The RawServerWriter is used to send responses in a raw server streaming RPC.
 class RawServerWriter : private RawServerReaderWriter {
  public:
+  // Creates a RawServerWriter that is ready to send responses for a particular
+  // RPC. This can be used for testing or to send responses to an RPC that has
+  // not been started by a client.
+  template <auto kMethod, uint32_t kMethodId, typename ServiceImpl>
+  [[nodiscard]] static RawServerWriter Open(Server& server,
+                                            uint32_t channel_id,
+                                            ServiceImpl& service) {
+    return {internal::OpenCall<kMethod, MethodType::kServerStreaming>(
+        server,
+        channel_id,
+        service,
+        internal::MethodLookup::GetRawMethod<ServiceImpl, kMethodId>())};
+  }
+
   constexpr RawServerWriter()
       : RawServerReaderWriter(MethodType::kServerStreaming) {}
 
   RawServerWriter(RawServerWriter&&) = default;
   RawServerWriter& operator=(RawServerWriter&&) = default;
 
-  using RawServerReaderWriter::open;
-
+  using RawServerReaderWriter::active;
   using RawServerReaderWriter::channel_id;
+  using RawServerReaderWriter::open;
 
   using RawServerReaderWriter::set_on_error;
 
diff --git a/pw_rpc/raw/server_reader_writer.cc b/pw_rpc/raw/server_reader_writer.cc
index 19e0432..fe8672d 100644
--- a/pw_rpc/raw/server_reader_writer.cc
+++ b/pw_rpc/raw/server_reader_writer.cc
@@ -19,7 +19,7 @@
 namespace pw::rpc {
 
 Status RawServerReaderWriter::Write(ConstByteSpan response) {
-  if (!open()) {
+  if (!active()) {
     return Status::FailedPrecondition();
   }
 
diff --git a/pw_rpc/raw/server_reader_writer_test.cc b/pw_rpc/raw/server_reader_writer_test.cc
new file mode 100644
index 0000000..d73fe0c
--- /dev/null
+++ b/pw_rpc/raw/server_reader_writer_test.cc
@@ -0,0 +1,92 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/raw/server_reader_writer.h"
+
+#include "gtest/gtest.h"
+#include "pw_rpc/raw/fake_channel_output.h"
+#include "pw_rpc/service.h"
+#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
+
+namespace pw::rpc {
+
+class TestService final : public test::generated::TestService<TestService> {
+ public:
+  static StatusWithSize TestUnaryRpc(ServerContext&, ConstByteSpan, ByteSpan) {
+    return StatusWithSize(0);
+  }
+
+  void TestServerStreamRpc(ServerContext&, ConstByteSpan, RawServerWriter&) {}
+
+  void TestClientStreamRpc(ServerContext&, RawServerReader&) {}
+
+  void TestBidirectionalStreamRpc(ServerContext&, RawServerReaderWriter&) {}
+};
+
+struct ReaderWriterTestContext {
+  ReaderWriterTestContext(MethodType type)
+      : output(type),
+        channel(Channel::Create<1>(&output)),
+        server(std::span(&channel, 1)) {}
+
+  TestService service;
+  RawFakeChannelOutput<128, 4> output;
+  Channel channel;
+  Server server;
+};
+
+TEST(RawServerWriter, Open_ReturnsUsableWriter) {
+  ReaderWriterTestContext ctx(MethodType::kServerStreaming);
+  RawServerWriter call =
+      RawServerWriter::Open<&TestService::TestServerStreamRpc,
+                            CalculateMethodId("TestServerStreamRpc")>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  EXPECT_EQ(call.channel_id(), ctx.channel.id());
+  call.Write(std::as_bytes(std::span("321")));
+
+  EXPECT_STREQ(reinterpret_cast<const char*>(ctx.output.last_response().data()),
+               "321");
+}
+
+TEST(RawServerReader, Open_ReturnsUsableReader) {
+  ReaderWriterTestContext ctx(MethodType::kClientStreaming);
+  RawServerReader call =
+      RawServerReader::Open<&TestService::TestClientStreamRpc,
+                            CalculateMethodId("TestClientStreamRpc")>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  EXPECT_EQ(call.channel_id(), ctx.channel.id());
+  call.Finish(std::as_bytes(std::span("This is a message")));
+
+  EXPECT_STREQ(reinterpret_cast<const char*>(ctx.output.last_response().data()),
+               "This is a message");
+}
+
+TEST(RawServerReaderWriter, Open_ReturnsUsableReaderWriter) {
+  ReaderWriterTestContext ctx(MethodType::kBidirectionalStreaming);
+  RawServerReaderWriter call =
+      RawServerReaderWriter::Open<&TestService::TestBidirectionalStreamRpc,
+                                  CalculateMethodId(
+                                      "TestBidirectionalStreamRpc")>(
+          ctx.server, ctx.channel.id(), ctx.service);
+
+  EXPECT_EQ(call.channel_id(), ctx.channel.id());
+  call.Write(std::as_bytes(std::span("321")));
+
+  EXPECT_STREQ(reinterpret_cast<const char*>(ctx.output.last_response().data()),
+               "321");
+}
+
+}  // namespace pw::rpc
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index dc23e59..7fe1e6c 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -261,7 +261,7 @@
               service_,
               service_.method(100)),
         responder_(call_) {
-    ASSERT_TRUE(responder_.open());
+    ASSERT_TRUE(responder_.active());
   }
 
   internal::CallContext call_;
@@ -290,7 +290,7 @@
   EXPECT_EQ(OkStatus(),
             server_.ProcessPacket(PacketForRpc(PacketType::CANCEL), output_));
 
-  EXPECT_FALSE(responder_.open());
+  EXPECT_FALSE(responder_.active());
 }
 
 TEST_F(BidiMethod, Cancel_SendsNoResponse) {
@@ -305,7 +305,7 @@
       OkStatus(),
       server_.ProcessPacket(PacketForRpc(PacketType::CLIENT_ERROR), output_));
 
-  EXPECT_FALSE(responder_.open());
+  EXPECT_FALSE(responder_.active());
   EXPECT_EQ(output_.packet_count(), 0u);
 }
 
@@ -337,7 +337,7 @@
 
   EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
   EXPECT_EQ(output_.sent_packet().status(), Status::FailedPrecondition());
-  EXPECT_TRUE(responder_.open());
+  EXPECT_TRUE(responder_.active());
 }
 
 TEST_F(BidiMethod, Cancel_IncorrectService) {
@@ -349,7 +349,7 @@
   EXPECT_EQ(output_.sent_packet().status(), Status::NotFound());
   EXPECT_EQ(output_.sent_packet().service_id(), 43u);
   EXPECT_EQ(output_.sent_packet().method_id(), 100u);
-  EXPECT_TRUE(responder_.open());
+  EXPECT_TRUE(responder_.active());
 }
 
 TEST_F(BidiMethod, Cancel_IncorrectMethod) {
@@ -358,7 +358,7 @@
                                   output_));
   EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
   EXPECT_EQ(output_.sent_packet().status(), Status::NotFound());
-  EXPECT_TRUE(responder_.open());
+  EXPECT_TRUE(responder_.active());
 }
 
 TEST_F(BidiMethod, ClientStream_CallsCallback) {
@@ -411,7 +411,7 @@
               service_,
               service_.method(100)),
         responder_(call_) {
-    ASSERT_TRUE(responder_.open());
+    ASSERT_TRUE(responder_.active());
   }
 
   internal::CallContext call_;