pw_rpc: Prepare Call for use by both server & client

- Introduce ServerCall and ClientCall classes for functionality not
  shared between client and server calls.
- Move the on_client_stream_end_ / on_completed_ callbacks to ServerCall
  and ClientCall classes, since these callbacks differ between server
  and client.
- Use CallContext only to initialize calls. Do not store the CallContext
  in the Call class, since the method and service are not needed in the
  call and are not relevant for the client.
- Move prior call cancellation from the Server to the RegisterCall
  function in Endpoint. This correctly cancels prior calls when using
  the Open API, and the code will be shared between server & client.
- Expand and update raw and Nanopb method tests.
- Remove redundant Method argument from the Method Invoker function. The
  method is included in the CallContext.

Change-Id: I6ff46d2a22028d05acd1bbc15e925def1dcbc263
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61796
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
index 5913171..fc5b8b5 100644
--- a/pw_log_rpc/rpc_log_drain_test.cc
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -158,8 +158,9 @@
   rpc::RawServerWriter second_writer =
       rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
           server, drain_id, log_service);
+  ASSERT_FALSE(writer.open());
   ASSERT_TRUE(second_writer.open());
-  EXPECT_EQ(drain.Open(second_writer), Status::AlreadyExists());
+  EXPECT_EQ(drain.Open(second_writer), OkStatus());
 }
 
 }  // namespace
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index 083c106..833146b 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -15,33 +15,25 @@
 #include "pw_rpc/internal/call.h"
 
 #include "pw_assert/check.h"
+#include "pw_rpc/internal/endpoint.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/server.h"
 
 namespace pw::rpc::internal {
-namespace {
-
-Packet StreamPacket(const CallContext& call,
-                    std::span<const std::byte> payload) {
-  return Packet(PacketType::SERVER_STREAM,
-                call.channel().id(),
-                call.service().id(),
-                call.method().id(),
-                payload);
-}
-
-}  // namespace
 
 Call::Call(const CallContext& call, MethodType type)
-    : call_(call),
+    : endpoint_(&call.endpoint()),
+      channel_(&call.channel()),
+      service_id_(call.service().id()),
+      method_id_(call.method().id()),
       rpc_state_(kActive),
       type_(type),
       client_stream_state_(HasClientStream(type) ? kClientStreamActive
                                                  : kClientStreamInactive) {
-  call_.server().RegisterCall(*this);
+  endpoint().RegisterCall(*this);
 }
 
-Call& Call::operator=(Call&& other) {
+void Call::MoveFrom(Call& other) {
   // If this RPC was running, complete it before moving in the other RPC.
   CloseAndSendResponse(OkStatus()).IgnoreError();
 
@@ -50,27 +42,25 @@
   type_ = other.type_;
   client_stream_state_ = other.client_stream_state_;
 
+  endpoint_ = other.endpoint_;
+  channel_ = other.channel_;
+  service_id_ = other.service_id_;
+  method_id_ = other.method_id_;
+
   if (other.active()) {
     other.Close();
-    other.call_.server().RegisterCall(*this);
+
+    // This call is known to be unique since the other call was just closed.
+    endpoint().RegisterUniqueCall(*this);
   }
 
   // Move the rest of the member variables.
-  call_ = std::move(other.call_);
   response_ = std::move(other.response_);
 
   on_error_ = std::move(other.on_error_);
   on_next_ = std::move(other.on_next_);
-
-#if PW_RPC_CLIENT_STREAM_END_CALLBACK
-  on_client_stream_end_ = std::move(other.on_client_stream_end_);
-#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
-
-  return *this;
 }
 
-uint32_t Call::method_id() const { return call_.method().id(); }
-
 Status Call::CloseAndSendFinalPacket(PacketType type,
                                      std::span<const std::byte> payload,
                                      Status status) {
@@ -82,12 +72,12 @@
 
   // Acquire a buffer to use for the outgoing packet if none is available.
   if (response_.empty()) {
-    response_ = call_.channel().AcquireBuffer();
+    response_ = channel().AcquireBuffer();
   }
 
   // Send a packet indicating that the RPC has terminated and optionally
   // containing the final payload.
-  packet_status = call_.channel().Send(
+  packet_status = channel().Send(
       response_,
       Packet(type, channel_id(), service_id(), method_id(), payload, status));
 
@@ -101,28 +91,46 @@
 
   // Only allow having one active buffer at a time.
   if (response_.empty()) {
-    response_ = call_.channel().AcquireBuffer();
+    response_ = channel().AcquireBuffer();
   }
 
-  return response_.payload(StreamPacket(call_, {}));
+  return response_.payload(StreamPacket({}));
 }
 
 Status Call::SendPayloadBufferClientStream(std::span<const std::byte> payload) {
   PW_DCHECK(active());
-  return call_.channel().Send(response_, StreamPacket(call_, payload));
+  return channel().Send(response_, StreamPacket(payload));
 }
 
 void Call::ReleasePayloadBuffer() {
   PW_DCHECK(active());
-  call_.channel().Release(response_);
+  channel().Release(response_);
 }
 
 void Call::Close() {
   PW_DCHECK(active());
 
-  call_.server().UnregisterCall(*this);
+  endpoint().UnregisterCall(*this);
   rpc_state_ = kInactive;
   client_stream_state_ = kClientStreamInactive;
 }
 
+ServerCall& ServerCall::operator=(ServerCall&& other) {
+  MoveFrom(other);
+
+#if PW_RPC_CLIENT_STREAM_END_CALLBACK
+  on_client_stream_end_ = std::move(other.on_client_stream_end_);
+#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
+
+  return *this;
+}
+
+ClientCall& ClientCall::operator=(ClientCall&& other) {
+  MoveFrom(other);
+
+  on_completed_ = std::move(other.on_completed_);
+
+  return *this;
+}
+
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc
index 7fb6d5f..c09f5de 100644
--- a/pw_rpc/call_test.cc
+++ b/pw_rpc/call_test.cc
@@ -168,42 +168,43 @@
 
 TEST(ServerWriter, DefaultConstructor_NoClientStream) {
   FakeServerWriter writer;
-  EXPECT_FALSE(writer.as_responder().has_client_stream());
-  EXPECT_FALSE(writer.as_responder().client_stream_open());
+  EXPECT_FALSE(writer.as_server_call().has_client_stream());
+  EXPECT_FALSE(writer.as_server_call().client_stream_open());
 }
 
 TEST(ServerWriter, Open_NoClientStream) {
   ServerContextForTest<TestService> context(TestService::method.method());
   FakeServerWriter writer(context.get());
 
-  EXPECT_FALSE(writer.as_responder().has_client_stream());
-  EXPECT_FALSE(writer.as_responder().client_stream_open());
+  EXPECT_FALSE(writer.as_server_call().has_client_stream());
+  EXPECT_FALSE(writer.as_server_call().client_stream_open());
 }
 
 TEST(ServerReader, DefaultConstructor_ClientStreamClosed) {
   test::FakeServerReader reader;
-  EXPECT_TRUE(reader.as_responder().has_client_stream());
-  EXPECT_FALSE(reader.as_responder().client_stream_open());
+  EXPECT_TRUE(reader.as_server_call().has_client_stream());
+  EXPECT_FALSE(reader.as_server_call().client_stream_open());
 }
 
 TEST(ServerReader, Open_ClientStreamStartsOpen) {
   ServerContextForTest<TestService> context(TestService::method.method());
   test::FakeServerReader reader(context.get());
 
-  EXPECT_TRUE(reader.as_responder().has_client_stream());
-  EXPECT_TRUE(reader.as_responder().client_stream_open());
+  EXPECT_TRUE(reader.as_server_call().has_client_stream());
+  EXPECT_TRUE(reader.as_server_call().client_stream_open());
 }
 
 TEST(ServerReader, Close_ClosesClientStream) {
   ServerContextForTest<TestService> context(TestService::method.method());
   test::FakeServerReader reader(context.get());
 
-  EXPECT_TRUE(reader.as_responder().active());
-  EXPECT_TRUE(reader.as_responder().client_stream_open());
-  EXPECT_EQ(OkStatus(), reader.as_responder().CloseAndSendResponse(OkStatus()));
+  EXPECT_TRUE(reader.as_server_call().active());
+  EXPECT_TRUE(reader.as_server_call().client_stream_open());
+  EXPECT_EQ(OkStatus(),
+            reader.as_server_call().CloseAndSendResponse(OkStatus()));
 
-  EXPECT_FALSE(reader.as_responder().active());
-  EXPECT_FALSE(reader.as_responder().client_stream_open());
+  EXPECT_FALSE(reader.as_server_call().active());
+  EXPECT_FALSE(reader.as_server_call().client_stream_open());
 }
 
 TEST(ServerReader, HandleClientStream_OnlyClosesClientStream) {
@@ -211,11 +212,11 @@
   test::FakeServerReader reader(context.get());
 
   EXPECT_TRUE(reader.active());
-  EXPECT_TRUE(reader.as_responder().client_stream_open());
-  reader.as_responder().EndClientStream();
+  EXPECT_TRUE(reader.as_server_call().client_stream_open());
+  reader.as_server_call().EndClientStream();
 
   EXPECT_TRUE(reader.active());
-  EXPECT_FALSE(reader.as_responder().client_stream_open());
+  EXPECT_FALSE(reader.as_server_call().client_stream_open());
 }
 
 TEST(ServerReaderWriter, Move_MaintainsClientStream) {
@@ -223,11 +224,11 @@
   test::FakeServerReaderWriter reader_writer(context.get());
   test::FakeServerReaderWriter destination;
 
-  EXPECT_FALSE(destination.as_responder().client_stream_open());
+  EXPECT_FALSE(destination.as_server_call().client_stream_open());
 
   destination = std::move(reader_writer);
-  EXPECT_TRUE(destination.as_responder().has_client_stream());
-  EXPECT_TRUE(destination.as_responder().client_stream_open());
+  EXPECT_TRUE(destination.as_server_call().has_client_stream());
+  EXPECT_TRUE(destination.as_server_call().client_stream_open());
 }
 
 TEST(ServerReaderWriter, Move_MovesCallbacks) {
@@ -243,9 +244,9 @@
 #endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
 
   test::FakeServerReaderWriter destination(std::move(reader_writer));
-  destination.as_responder().HandleClientStream({});
-  destination.as_responder().EndClientStream();
-  destination.as_responder().HandleError(Status::Unknown());
+  destination.as_server_call().HandleClientStream({});
+  destination.as_server_call().EndClientStream();
+  destination.as_server_call().HandleError(Status::Unknown());
 
   EXPECT_EQ(calls, 2 + PW_RPC_CLIENT_STREAM_END_CALLBACK);
 }
diff --git a/pw_rpc/endpoint.cc b/pw_rpc/endpoint.cc
index a3886f5..fa765e3 100644
--- a/pw_rpc/endpoint.cc
+++ b/pw_rpc/endpoint.cc
@@ -54,15 +54,15 @@
   return result;
 }
 
-Call* Endpoint::FindCall(const Packet& packet) {
-  for (Call& call : calls_) {
-    if (packet.channel_id() == call.channel_id() &&
-        packet.service_id() == call.service_id() &&
-        packet.method_id() == call.method_id()) {
-      return &call;
-    }
+void Endpoint::RegisterCall(Call& call) {
+  Call* const existing_call =
+      FindCallById(call.channel_id(), call.service_id(), call.method_id());
+
+  if (existing_call != nullptr) {
+    existing_call->HandleError(Status::Cancelled());
   }
-  return nullptr;
+
+  RegisterUniqueCall(call);
 }
 
 Channel* Endpoint::GetInternalChannel(uint32_t id) const {
@@ -85,4 +85,16 @@
   return channel;
 }
 
+Call* Endpoint::FindCallById(uint32_t channel_id,
+                             uint32_t service_id,
+                             uint32_t method_id) {
+  for (Call& call : calls_) {
+    if (channel_id == call.channel_id() && service_id == call.service_id() &&
+        method_id == call.method_id()) {
+      return &call;
+    }
+  }
+  return nullptr;
+}
+
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/method_test.cc b/pw_rpc/method_test.cc
index a639260..77b07d2 100644
--- a/pw_rpc/method_test.cc
+++ b/pw_rpc/method_test.cc
@@ -48,15 +48,23 @@
 TEST(Method, Id) { EXPECT_EQ(kTestMethod.id(), 1234u); }
 
 TEST(Method, Invoke) {
-  Channel channel(123, nullptr);
+  class NullChannelOutput final : public ChannelOutput {
+   public:
+    constexpr NullChannelOutput() : ChannelOutput("NullChannelOutput") {}
+
+    ByteSpan AcquireBuffer() override { return {}; }
+    Status SendAndReleaseBuffer(ConstByteSpan) override { return OkStatus(); }
+  } channel_output;
+
+  Channel channel(123, &channel_output);
   Server server(std::span(static_cast<rpc::Channel*>(&channel), 1));
   TestService service;
 
-  CallContext call(server, channel, service, kTestMethod);
+  const CallContext context(server, channel, service, kTestMethod);
   Packet empty_packet;
 
   EXPECT_EQ(kTestMethod.invocations(), 0u);
-  kTestMethod.Invoke(call, empty_packet);
+  kTestMethod.Invoke(context, empty_packet);
   EXPECT_EQ(kTestMethod.invocations(), 1u);
 }
 
diff --git a/pw_rpc/nanopb/method.cc b/pw_rpc/nanopb/method.cc
index a3e4014..a763a25 100644
--- a/pw_rpc/nanopb/method.cc
+++ b/pw_rpc/nanopb/method.cc
@@ -25,30 +25,30 @@
 
 namespace internal {
 
-void NanopbMethod::CallSynchronousUnary(CallContext& call,
+void NanopbMethod::CallSynchronousUnary(const CallContext& context,
                                         const Packet& request,
                                         void* request_struct,
                                         void* response_struct) const {
-  if (!DecodeRequest(call.channel(), request, request_struct)) {
+  if (!DecodeRequest(context.channel(), request, request_struct)) {
     return;
   }
 
-  GenericNanopbResponder responder(call, MethodType::kUnary);
-  const Status status =
-      function_.synchronous_unary(call, request_struct, response_struct);
+  GenericNanopbResponder responder(context, MethodType::kUnary);
+  const Status status = function_.synchronous_unary(
+      context.service(), request_struct, response_struct);
   responder.SendResponse(response_struct, status).IgnoreError();
 }
 
-void NanopbMethod::CallUnaryRequest(CallContext& call,
+void NanopbMethod::CallUnaryRequest(const CallContext& context,
                                     MethodType type,
                                     const Packet& request,
                                     void* request_struct) const {
-  if (!DecodeRequest(call.channel(), request, request_struct)) {
+  if (!DecodeRequest(context.channel(), request, request_struct)) {
     return;
   }
 
-  GenericNanopbResponder server_writer(call, type);
-  function_.unary_request(call, request_struct, server_writer);
+  GenericNanopbResponder server_writer(context, type);
+  function_.unary_request(context.service(), request_struct, server_writer);
 }
 
 bool NanopbMethod::DecodeRequest(Channel& channel,
diff --git a/pw_rpc/nanopb/method_test.cc b/pw_rpc/nanopb/method_test.cc
index c00370c..7e768fa 100644
--- a/pw_rpc/nanopb/method_test.cc
+++ b/pw_rpc/nanopb/method_test.cc
@@ -135,16 +135,18 @@
 NanopbServerReaderWriter<pw_rpc_test_TestRequest, pw_rpc_test_TestResponse>
     last_reader_writer;
 
+Status DoNothing(ServerContext&, const pw_rpc_test_Empty&, pw_rpc_test_Empty&) {
+  return Status::Unknown();
+}
+
 void AddFive(ServerContext&,
              const pw_rpc_test_TestRequest& request,
              NanopbServerResponder<pw_rpc_test_TestResponse>& responder) {
   last_request = request;
-  responder.Finish({.value = static_cast<int32_t>(request.integer + 5)},
-                   Status::Unauthenticated());
-}
-
-Status DoNothing(ServerContext&, const pw_rpc_test_Empty&, pw_rpc_test_Empty&) {
-  return Status::Unknown();
+  ASSERT_EQ(
+      OkStatus(),
+      responder.Finish({.value = static_cast<int32_t>(request.integer + 5)},
+                       Status::Unauthenticated()));
 }
 
 void StartStream(ServerContext&,
@@ -186,23 +188,23 @@
           pw_rpc_test_TestResponse_fields)};
 };
 
-constexpr const NanopbMethod& kDoNothing =
+constexpr const NanopbMethod& kSyncUnary =
     std::get<0>(FakeService::kMethods).nanopb_method();
-constexpr const NanopbMethod& kAddFive =
+constexpr const NanopbMethod& kAsyncUnary =
     std::get<1>(FakeService::kMethods).nanopb_method();
-constexpr const NanopbMethod& kStartStream =
+constexpr const NanopbMethod& kServerStream =
     std::get<2>(FakeService::kMethods).nanopb_method();
 constexpr const NanopbMethod& kClientStream =
     std::get<3>(FakeService::kMethods).nanopb_method();
 constexpr const NanopbMethod& kBidirectionalStream =
     std::get<4>(FakeService::kMethods).nanopb_method();
 
-TEST(NanopbMethod, UnaryRpc_SendsResponse) {
+TEST(NanopbMethod, AsyncUnaryRpc_SendsResponse) {
   PW_ENCODE_PB(
       pw_rpc_test_TestRequest, request, .integer = 123, .status_code = 0);
 
-  ServerContextForTest<FakeService> context(kAddFive);
-  kAddFive.Invoke(context.get(), context.request(request));
+  ServerContextForTest<FakeService> context(kAsyncUnary);
+  kAsyncUnary.Invoke(context.get(), context.request(request));
 
   const Packet& response = context.output().sent_packet();
   EXPECT_EQ(response.status(), Status::Unauthenticated());
@@ -218,37 +220,37 @@
   EXPECT_EQ(123, last_request.integer);
 }
 
-TEST(NanopbMethod, UnaryRpc_InvalidPayload_SendsError) {
+TEST(NanopbMethod, SyncUnaryRpc_InvalidPayload_SendsError) {
   std::array<byte, 8> bad_payload{byte{0xFF}, byte{0xAA}, byte{0xDD}};
 
-  ServerContextForTest<FakeService> context(kDoNothing);
-  kDoNothing.Invoke(context.get(), context.request(bad_payload));
+  ServerContextForTest<FakeService> context(kSyncUnary);
+  kSyncUnary.Invoke(context.get(), context.request(bad_payload));
 
   const Packet& packet = context.output().sent_packet();
   EXPECT_EQ(PacketType::SERVER_ERROR, packet.type());
   EXPECT_EQ(Status::DataLoss(), packet.status());
   EXPECT_EQ(context.service_id(), packet.service_id());
-  EXPECT_EQ(kDoNothing.id(), packet.method_id());
+  EXPECT_EQ(kSyncUnary.id(), packet.method_id());
 }
 
-TEST(NanopbMethod, UnaryRpc_BufferTooSmallForResponse_SendsInternalError) {
+TEST(NanopbMethod, AsyncUnaryRpc_BufferTooSmallForResponse_SendsInternalError) {
   constexpr int64_t value = 0x7FFFFFFF'FFFFFF00ll;
   PW_ENCODE_PB(
       pw_rpc_test_TestRequest, request, .integer = value, .status_code = 0);
 
   // Output buffer is too small for the response, but can fit an error packet.
-  ServerContextForTest<FakeService, 22> context(kAddFive);
+  ServerContextForTest<FakeService, 22> context(kAsyncUnary);
   ASSERT_LT(
       context.output().buffer_size(),
       context.request(request).MinEncodedSizeBytes() + request.size() + 1);
 
-  kAddFive.Invoke(context.get(), context.request(request));
+  kAsyncUnary.Invoke(context.get(), context.request(request));
 
   const Packet& packet = context.output().sent_packet();
   EXPECT_EQ(PacketType::SERVER_ERROR, packet.type());
   EXPECT_EQ(Status::Internal(), packet.status());
   EXPECT_EQ(context.service_id(), packet.service_id());
-  EXPECT_EQ(kAddFive.id(), packet.method_id());
+  EXPECT_EQ(kAsyncUnary.id(), packet.method_id());
 
   EXPECT_EQ(value, last_request.integer);
 }
@@ -257,18 +259,18 @@
   PW_ENCODE_PB(
       pw_rpc_test_TestRequest, request, .integer = 555, .status_code = 0);
 
-  ServerContextForTest<FakeService> context(kStartStream);
+  ServerContextForTest<FakeService> context(kServerStream);
 
-  kStartStream.Invoke(context.get(), context.request(request));
+  kServerStream.Invoke(context.get(), context.request(request));
 
   EXPECT_EQ(0u, context.output().packet_count());
   EXPECT_EQ(555, last_request.integer);
 }
 
 TEST(NanopbMethod, ServerWriter_SendsResponse) {
-  ServerContextForTest<FakeService> context(kStartStream);
+  ServerContextForTest<FakeService> context(kServerStream);
 
-  kStartStream.Invoke(context.get(), context.request({}));
+  kServerStream.Invoke(context.get(), context.request({}));
 
   EXPECT_EQ(OkStatus(), last_writer.Write({.value = 100}));
 
@@ -285,18 +287,18 @@
 }
 
 TEST(NanopbMethod, ServerWriter_WriteWhenClosed_ReturnsFailedPrecondition) {
-  ServerContextForTest<FakeService> context(kStartStream);
+  ServerContextForTest<FakeService> context(kServerStream);
 
-  kStartStream.Invoke(context.get(), context.request({}));
+  kServerStream.Invoke(context.get(), context.request({}));
 
   EXPECT_EQ(OkStatus(), last_writer.Finish());
   EXPECT_TRUE(last_writer.Write({.value = 100}).IsFailedPrecondition());
 }
 
 TEST(NanopbMethod, ServerWriter_WriteAfterMoved_ReturnsFailedPrecondition) {
-  ServerContextForTest<FakeService> context(kStartStream);
+  ServerContextForTest<FakeService> context(kServerStream);
 
-  kStartStream.Invoke(context.get(), context.request({}));
+  kServerStream.Invoke(context.get(), context.request({}));
   NanopbServerWriter<pw_rpc_test_TestResponse> new_writer =
       std::move(last_writer);
 
@@ -315,7 +317,8 @@
       0 /* payload (when empty) */ + 0 /* status (when OK)*/;
 
   // Make the buffer barely fit a packet with no payload.
-  ServerContextForTest<FakeService, kNoPayloadPacketSize> context(kStartStream);
+  ServerContextForTest<FakeService, kNoPayloadPacketSize> context(
+      kServerStream);
 
   // Verify that the encoded size of a packet with an empty payload is correct.
   std::array<byte, 128> encoded_response = {};
@@ -323,7 +326,7 @@
   ASSERT_EQ(OkStatus(), encoded.status());
   ASSERT_EQ(kNoPayloadPacketSize, encoded.value().size());
 
-  kStartStream.Invoke(context.get(), context.request({}));
+  kServerStream.Invoke(context.get(), context.request({}));
 
   EXPECT_EQ(OkStatus(), last_writer.Write({}));  // Barely fits
   EXPECT_EQ(Status::Internal(), last_writer.Write({.value = 1}));  // Too big
@@ -332,10 +335,10 @@
 TEST(NanopbMethod, ServerReader_HandlesRequests) {
   ServerContextForTest<FakeService> context(kClientStream);
 
-  kBidirectionalStream.Invoke(context.get(), context.request({}));
+  kClientStream.Invoke(context.get(), context.request({}));
 
   pw_rpc_test_TestRequest request_struct{};
-  last_reader_writer.set_on_next(
+  last_reader.set_on_next(
       [&request_struct](const pw_rpc_test_TestRequest& req) {
         request_struct = req;
       });
@@ -383,15 +386,15 @@
       });
 
   PW_ENCODE_PB(
-      pw_rpc_test_TestRequest, request, .integer = 1 << 30, .status_code = 9);
+      pw_rpc_test_TestRequest, request, .integer = 1 << 29, .status_code = 8);
   std::array<byte, 128> encoded_request = {};
   auto encoded = context.client_stream(request).Encode(encoded_request);
   ASSERT_EQ(OkStatus(), encoded.status());
   ASSERT_EQ(OkStatus(),
             context.server().ProcessPacket(*encoded, context.output()));
 
-  EXPECT_EQ(request_struct.integer, 1 << 30);
-  EXPECT_EQ(request_struct.status_code, 9u);
+  EXPECT_EQ(request_struct.integer, 1 << 29);
+  EXPECT_EQ(request_struct.status_code, 8u);
 }
 
 }  // namespace
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h
index 9c99a23..3fa75ee 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/internal/method.h
@@ -20,7 +20,6 @@
 #include <type_traits>
 
 #include "pw_function/function.h"
-#include "pw_rpc/internal/call.h"
 #include "pw_rpc/internal/config.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/method_type.h"
@@ -183,9 +182,9 @@
     // In optimized builds, the compiler inlines the user-defined function into
     // this wrapper, elminating any overhead.
     constexpr SynchronousUnaryFunction wrapper =
-        [](CallContext& call, const void* req, void* resp) {
+        [](Service& service, const void* req, void* resp) {
           return CallMethodImplFunction<kMethod>(
-              call,
+              service,
               *static_cast<const Request<kMethod>*>(req),
               *static_cast<Response<kMethod>*>(resp));
         };
@@ -211,9 +210,9 @@
     // In optimized builds, the compiler inlines the user-defined function into
     // this wrapper, elminating any overhead.
     constexpr UnaryRequestFunction wrapper =
-        [](CallContext& call, const void* req, GenericNanopbResponder& resp) {
+        [](Service& service, const void* req, GenericNanopbResponder& resp) {
           return CallMethodImplFunction<kMethod>(
-              call,
+              service,
               *static_cast<const Request<kMethod>*>(req),
               static_cast<NanopbServerResponder<Response<kMethod>>&>(resp));
         };
@@ -236,9 +235,9 @@
     // templated NanopbServerWriter class. This wrapper is stored generically in
     // the Function union, defined below.
     constexpr UnaryRequestFunction wrapper =
-        [](CallContext& call, const void* req, GenericNanopbResponder& writer) {
+        [](Service& service, const void* req, GenericNanopbResponder& writer) {
           return CallMethodImplFunction<kMethod>(
-              call,
+              service,
               *static_cast<const Request<kMethod>*>(req),
               static_cast<NanopbServerWriter<Response<kMethod>>&>(writer));
         };
@@ -256,11 +255,11 @@
       uint32_t id,
       NanopbMessageDescriptor request,
       NanopbMessageDescriptor response) {
-    constexpr StreamRequestFunction wrapper = [](CallContext& call,
+    constexpr StreamRequestFunction wrapper = [](Service& service,
                                                  GenericNanopbResponder&
                                                      reader) {
       return CallMethodImplFunction<kMethod>(
-          call,
+          service,
           static_cast<NanopbServerReader<Request<kMethod>, Response<kMethod>>&>(
               reader));
     };
@@ -278,9 +277,9 @@
       NanopbMessageDescriptor request,
       NanopbMessageDescriptor response) {
     constexpr StreamRequestFunction wrapper =
-        [](CallContext& call, GenericNanopbResponder& reader_writer) {
+        [](Service& service, GenericNanopbResponder& reader_writer) {
           return CallMethodImplFunction<kMethod>(
-              call,
+              service,
               static_cast<NanopbServerReaderWriter<Request<kMethod>,
                                                    Response<kMethod>>&>(
                   reader_writer));
@@ -303,18 +302,18 @@
 
  private:
   // Generic function signature for synchronous unary RPCs.
-  using SynchronousUnaryFunction = Status (*)(CallContext&,
+  using SynchronousUnaryFunction = Status (*)(Service&,
                                               const void* request,
                                               void* response);
 
   // Generic function signature for asynchronous unary and server streaming
   // RPCs.
-  using UnaryRequestFunction = void (*)(CallContext&,
+  using UnaryRequestFunction = void (*)(Service&,
                                         const void* request,
                                         GenericNanopbResponder& writer);
 
   // Generic function signature for client and bidirectional streaming RPCs.
-  using StreamRequestFunction = void (*)(CallContext&,
+  using StreamRequestFunction = void (*)(Service&,
                                          GenericNanopbResponder& reader_writer);
 
   // The Function union stores a pointer to a generic version of the
@@ -340,12 +339,12 @@
                          NanopbMessageDescriptor response)
       : Method(id, invoker), function_(function), serde_(request, response) {}
 
-  void CallSynchronousUnary(CallContext& call,
+  void CallSynchronousUnary(const CallContext& context,
                             const Packet& request,
                             void* request_struct,
                             void* response_struct) const;
 
-  void CallUnaryRequest(CallContext& call,
+  void CallUnaryRequest(const CallContext& context,
                         MethodType type,
                         const Packet& request,
                         void* request_struct) const;
@@ -354,8 +353,7 @@
   // structs by size, with maximum alignment, to avoid generating unnecessary
   // copies of this function for each request/response type.
   template <size_t kRequestSize, size_t kResponseSize>
-  static void SynchronousUnaryInvoker(const Method& method,
-                                      CallContext& call,
+  static void SynchronousUnaryInvoker(const CallContext& context,
                                       const Packet& request) {
     _PW_RPC_NANOPB_STRUCT_STORAGE_CLASS
     std::aligned_storage_t<kRequestSize, alignof(std::max_align_t)>
@@ -364,59 +362,59 @@
     std::aligned_storage_t<kResponseSize, alignof(std::max_align_t)>
         response_struct{};
 
-    static_cast<const NanopbMethod&>(method).CallSynchronousUnary(
-        call, request, &request_struct, &response_struct);
+    static_cast<const NanopbMethod&>(context.method())
+        .CallSynchronousUnary(
+            context, request, &request_struct, &response_struct);
   }
 
   // Invoker function for asynchronous unary RPCs. Allocates space for a request
   // struct. Ignores the payload buffer since resposnes are sent through the
   // NanopbServerResponder.
   template <size_t kRequestSize>
-  static void AsynchronousUnaryInvoker(const Method& method,
-                                       CallContext& call,
+  static void AsynchronousUnaryInvoker(const CallContext& context,
                                        const Packet& request) {
     _PW_RPC_NANOPB_STRUCT_STORAGE_CLASS
     std::aligned_storage_t<kRequestSize, alignof(std::max_align_t)>
         request_struct{};
 
-    static_cast<const NanopbMethod&>(method).CallUnaryRequest(
-        call, MethodType::kUnary, request, &request_struct);
+    static_cast<const NanopbMethod&>(context.method())
+        .CallUnaryRequest(
+            context, MethodType::kUnary, request, &request_struct);
   }
 
   // Invoker function for server streaming RPCs. Allocates space for a request
   // struct. Ignores the payload buffer since resposnes are sent through the
   // NanopbServerWriter.
   template <size_t kRequestSize>
-  static void ServerStreamingInvoker(const Method& method,
-                                     CallContext& call,
+  static void ServerStreamingInvoker(const CallContext& context,
                                      const Packet& request) {
     _PW_RPC_NANOPB_STRUCT_STORAGE_CLASS
     std::aligned_storage_t<kRequestSize, alignof(std::max_align_t)>
         request_struct{};
 
-    static_cast<const NanopbMethod&>(method).CallUnaryRequest(
-        call, MethodType::kServerStreaming, request, &request_struct);
+    static_cast<const NanopbMethod&>(context.method())
+        .CallUnaryRequest(
+            context, MethodType::kServerStreaming, request, &request_struct);
   }
 
   // Invoker function for client streaming RPCs.
   template <typename Request>
-  static void ClientStreamingInvoker(const Method& method,
-                                     CallContext& call,
+  static void ClientStreamingInvoker(const CallContext& context,
                                      const Packet&) {
-    BaseNanopbServerReader<Request> reader(call, MethodType::kClientStreaming);
-    static_cast<const NanopbMethod&>(method).function_.stream_request(call,
-                                                                      reader);
+    BaseNanopbServerReader<Request> reader(context,
+                                           MethodType::kClientStreaming);
+    static_cast<const NanopbMethod&>(context.method())
+        .function_.stream_request(context.service(), reader);
   }
 
   // Invoker function for bidirectional streaming RPCs.
   template <typename Request>
-  static void BidirectionalStreamingInvoker(const Method& method,
-                                            CallContext& call,
+  static void BidirectionalStreamingInvoker(const CallContext& context,
                                             const Packet&) {
     BaseNanopbServerReader<Request> reader_writer(
-        call, MethodType::kBidirectionalStreaming);
-    static_cast<const NanopbMethod&>(method).function_.stream_request(
-        call, reader_writer);
+        context, MethodType::kBidirectionalStreaming);
+    static_cast<const NanopbMethod&>(context.method())
+        .function_.stream_request(context.service(), reader_writer);
   }
 
   // Decodes a request protobuf with Nanopb to the provided buffer. Sends an
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
index 2e8cbee..6432e07 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
@@ -23,6 +23,7 @@
 #include "pw_rpc/internal/method_info.h"
 #include "pw_rpc/internal/method_lookup.h"
 #include "pw_rpc/internal/open_call.h"
+#include "pw_rpc/nanopb/internal/common.h"
 #include "pw_rpc/server.h"
 
 namespace pw::rpc {
@@ -39,12 +40,15 @@
 }  // namespace test
 
 // Non-templated base so the methods are instantiated only once.
-class GenericNanopbResponder : public internal::Call {
+class GenericNanopbResponder : public internal::ServerCall {
  public:
-  constexpr GenericNanopbResponder(MethodType type) : internal::Call(type) {}
+  constexpr GenericNanopbResponder(MethodType type)
+      : internal::ServerCall(type), serde_(nullptr) {}
 
-  GenericNanopbResponder(const CallContext& call, MethodType type)
-      : internal::Call(call, type) {}
+  GenericNanopbResponder(const CallContext& context, MethodType type);
+
+  GenericNanopbResponder(GenericNanopbResponder&&) = default;
+  GenericNanopbResponder& operator=(GenericNanopbResponder&&) = default;
 
   Status SendResponse(const void* response, Status status) {
     return SendClientStreamOrResponse(response, &status);
@@ -55,10 +59,14 @@
     return SendClientStreamOrResponse(response, nullptr);
   }
 
-  void DecodeRequest(ConstByteSpan payload, void* request_struct) const;
+  void DecodeRequest(ConstByteSpan payload, void* request_struct) const {
+    serde_->DecodeRequest(payload, request_struct);
+  }
 
  private:
   Status SendClientStreamOrResponse(const void* response, const Status* status);
+
+  const NanopbMethodSerde* serde_;
 };
 
 // The BaseNanopbServerReader serves as the base for the ServerReader and
@@ -67,8 +75,8 @@
 template <typename Request>
 class BaseNanopbServerReader : public GenericNanopbResponder {
  public:
-  BaseNanopbServerReader(const internal::CallContext& call, MethodType type)
-      : GenericNanopbResponder(call, type) {}
+  BaseNanopbServerReader(const internal::CallContext& context, MethodType type)
+      : GenericNanopbResponder(context, type) {}
 
  protected:
   constexpr BaseNanopbServerReader(MethodType type)
@@ -156,9 +164,9 @@
   template <typename, typename, uint32_t>
   friend class internal::test::InvocationContext;
 
-  NanopbServerReaderWriter(const internal::CallContext& call)
+  NanopbServerReaderWriter(const internal::CallContext& context)
       : internal::BaseNanopbServerReader<Request>(
-            call, MethodType::kBidirectionalStreaming) {}
+            context, MethodType::kBidirectionalStreaming) {}
 };
 
 // The NanopbServerReader is used to receive messages and send a response in a
@@ -216,9 +224,9 @@
   template <typename, typename, uint32_t>
   friend class internal::test::InvocationContext;
 
-  NanopbServerReader(const internal::CallContext& call)
+  NanopbServerReader(const internal::CallContext& context)
       : internal::BaseNanopbServerReader<Request>(
-            call, MethodType::kClientStreaming) {}
+            context, MethodType::kClientStreaming) {}
 };
 
 // The NanopbServerWriter is used to send responses in a Nanopb server streaming
@@ -279,8 +287,9 @@
   template <typename, typename, uint32_t>
   friend class internal::test::InvocationContext;
 
-  NanopbServerWriter(const internal::CallContext& call)
-      : internal::GenericNanopbResponder(call, MethodType::kServerStreaming) {}
+  NanopbServerWriter(const internal::CallContext& context)
+      : internal::GenericNanopbResponder(context,
+                                         MethodType::kServerStreaming) {}
 };
 
 template <typename Response>
@@ -334,8 +343,8 @@
   template <typename, typename, uint32_t>
   friend class internal::test::InvocationContext;
 
-  NanopbServerResponder(const internal::CallContext& call)
-      : internal::GenericNanopbResponder(call, MethodType::kUnary) {}
+  NanopbServerResponder(const internal::CallContext& context)
+      : internal::GenericNanopbResponder(context, MethodType::kUnary) {}
 };
 // TODO(hepler): "pw::rpc::ServerWriter" should not be specific to Nanopb.
 template <typename T>
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/test_method_context.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/test_method_context.h
index 51269a0..b7ef8a4 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/test_method_context.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/test_method_context.h
@@ -164,7 +164,7 @@
 
       Response& response = Base::output().AllocateResponse();
       return CallMethodImplFunction<kMethod>(
-          Base::call_context(), request, response);
+          Base::service(), request, response);
     } else {
       Base::template call<kMethod, NanopbServerResponder<Response>>(request);
     }
diff --git a/pw_rpc/nanopb/server_reader_writer.cc b/pw_rpc/nanopb/server_reader_writer.cc
index c533e73..c3a7cbe26 100644
--- a/pw_rpc/nanopb/server_reader_writer.cc
+++ b/pw_rpc/nanopb/server_reader_writer.cc
@@ -18,6 +18,12 @@
 
 namespace pw::rpc::internal {
 
+GenericNanopbResponder::GenericNanopbResponder(const CallContext& context,
+                                               MethodType type)
+    : internal::ServerCall(context, type),
+      serde_(&static_cast<const internal::NanopbMethod&>(context.method())
+                  .serde()) {}
+
 Status GenericNanopbResponder::SendClientStreamOrResponse(
     const void* response, const Status* status) {
   if (!active()) {
@@ -28,9 +34,7 @@
 
   // Cast the method to a NanopbMethod. Access the Nanopb
   // serializer/deserializer object and encode the response with it.
-  StatusWithSize result = static_cast<const internal::NanopbMethod&>(method())
-                              .serde()
-                              .EncodeResponse(response, payload_buffer);
+  StatusWithSize result = serde_->EncodeResponse(response, payload_buffer);
 
   if (!result.ok()) {
     return CloseAndSendServerError(Status::Internal());
@@ -44,10 +48,4 @@
   return SendPayloadBufferClientStream(payload_buffer);
 }
 
-void GenericNanopbResponder::DecodeRequest(ConstByteSpan payload,
-                                           void* request_struct) const {
-  static_cast<const internal::NanopbMethod&>(method()).serde().DecodeRequest(
-      payload, request_struct);
-}
-
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 065b56d..5a5e1f2 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -34,6 +34,7 @@
 
 namespace internal {
 
+class Endpoint;
 class Packet;
 
 // Internal RPC Call class. The Call is used to respond to any type of RPC.
@@ -49,9 +50,13 @@
  public:
   Call(const Call&) = delete;
 
+  // Move support is provided to derived classes through the MoveFrom function.
+  Call(Call&&) = delete;
+
   ~Call() { CloseAndSendResponse(OkStatus()).IgnoreError(); }
 
   Call& operator=(const Call&) = delete;
+  Call& operator=(Call&&) = delete;
 
   // True if the Call is active and ready to send responses.
   [[nodiscard]] bool active() const { return rpc_state_ == kActive; }
@@ -63,9 +68,9 @@
     return active();
   }
 
-  uint32_t channel_id() const { return call_.channel().id(); }
-  uint32_t service_id() const { return call_.service().id(); }
-  uint32_t method_id() const;
+  uint32_t channel_id() const { return channel().id(); }
+  uint32_t service_id() const { return service_id_; }
+  uint32_t method_id() const { return method_id_; }
 
   // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
   // status from sending the packet, or FAILED_PRECONDITION if the Call is not
@@ -96,16 +101,6 @@
     }
   }
 
-  void EndClientStream() {
-    client_stream_state_ = kClientStreamInactive;
-
-#if PW_RPC_CLIENT_STREAM_END_CALLBACK
-    if (on_client_stream_end_) {
-      on_client_stream_end_();
-    }
-#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
-  }
-
   bool has_client_stream() const { return HasClientStream(type_); }
 
   bool client_stream_open() const {
@@ -115,22 +110,26 @@
  protected:
   // Creates a Call for a closed RPC.
   constexpr Call(MethodType type)
-      : rpc_state_(kInactive),
+      : endpoint_(nullptr),
+        channel_(nullptr),
+        service_id_(0),
+        method_id_(0),
+        rpc_state_(kInactive),
         type_(type),
         client_stream_state_(kClientStreamInactive) {}
 
   // Creates a Call for an active RPC.
   Call(const CallContext& call, MethodType type);
 
-  // Initialize rpc_state_ to closed since move-assignment will check if the
-  // Call is active before moving into it.
-  Call(Call&& other) : rpc_state_(kInactive) { *this = std::move(other); }
+  // Constructor for use when move-initializing a call. Initialize rpc_state_ to
+  // closed since move-assignment will check if the Call is active before moving
+  // into it.
+  Call() : rpc_state_(kInactive) {}
 
-  Call& operator=(Call&& other);
+  void MoveFrom(Call& other);
 
-  const Method& method() const { return call_.method(); }
-
-  const Channel& channel() const { return call_.channel(); }
+  Endpoint& endpoint() const { return *endpoint_; }
+  Channel& channel() const { return *channel_; }
 
   void set_on_error(Function<void(Status)> on_error) {
     on_error_ = std::move(on_error);
@@ -140,19 +139,8 @@
     on_next_ = std::move(on_next);
   }
 
-  // set_on_client_stream_end is templated so that it can be conditionally
-  // disabled with a helpful static_assert message.
-  template <typename UnusedType = void>
-  void set_on_client_stream_end(
-      [[maybe_unused]] Function<void()> on_client_stream_end) {
-    static_assert(
-        cfg::kClientStreamEndCallbackEnabled<UnusedType>,
-        "The client stream end callback is disabled, so "
-        "set_on_client_stream_end cannot be called. To enable the client end "
-        "callback, set PW_RPC_CLIENT_STREAM_END_CALLBACK to 1.");
-#if PW_RPC_CLIENT_STREAM_END_CALLBACK
-    on_client_stream_end_ = std::move(on_client_stream_end);
-#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
+  void MarkClientStreamCompleted() {
+    client_stream_state_ = kClientStreamInactive;
   }
 
   constexpr const Channel::OutputBuffer& buffer() const { return response_; }
@@ -169,6 +157,14 @@
   void ReleasePayloadBuffer();
 
  private:
+  Packet StreamPacket(std::span<const std::byte> payload) const {
+    return Packet(PacketType::SERVER_STREAM,
+                  channel_id(),
+                  service_id(),
+                  method_id(),
+                  payload);
+  }
+
   Status CloseAndSendFinalPacket(PacketType type,
                                  std::span<const std::byte> response,
                                  Status status);
@@ -177,7 +173,11 @@
   // active when this is called.
   void Close();
 
-  CallContext call_;
+  internal::Endpoint* endpoint_;
+  internal::Channel* channel_;
+  uint32_t service_id_;
+  uint32_t method_id_;
+
   Channel::OutputBuffer response_;
 
   // Called when the RPC is terminated due to an error.
@@ -187,11 +187,6 @@
   // The raw payload buffer is passed to the callback.
   Function<void(std::span<const std::byte> payload)> on_next_;
 
-#if PW_RPC_CLIENT_STREAM_END_CALLBACK
-  // Called when a client stream completes.
-  Function<void()> on_client_stream_end_;
-#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
-
   enum : bool { kInactive, kActive } rpc_state_;
   MethodType type_;
   enum : bool {
@@ -200,5 +195,61 @@
   } client_stream_state_;
 };
 
+// A Call object, as used by an RPC server.
+class ServerCall : public Call {
+ public:
+  void EndClientStream() {
+    MarkClientStreamCompleted();
+
+#if PW_RPC_CLIENT_STREAM_END_CALLBACK
+    if (on_client_stream_end_) {
+      on_client_stream_end_();
+    }
+#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
+  }
+
+ protected:
+  ServerCall(ServerCall&& other) : Call() { *this = std::move(other); }
+
+  ServerCall& operator=(ServerCall&& other);
+
+  constexpr ServerCall(MethodType type) : Call(type) {}
+
+  ServerCall(const CallContext& call, MethodType type) : Call(call, type) {}
+
+  // set_on_client_stream_end is templated so that it can be conditionally
+  // disabled with a helpful static_assert message.
+  template <typename UnusedType = void>
+  void set_on_client_stream_end(
+      [[maybe_unused]] Function<void()> on_client_stream_end) {
+    static_assert(
+        cfg::kClientStreamEndCallbackEnabled<UnusedType>,
+        "The client stream end callback is disabled, so "
+        "set_on_client_stream_end cannot be called. To enable the client end "
+        "callback, set PW_RPC_CLIENT_STREAM_END_CALLBACK to 1.");
+#if PW_RPC_CLIENT_STREAM_END_CALLBACK
+    on_client_stream_end_ = std::move(on_client_stream_end);
+#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
+  }
+
+ private:
+#if PW_RPC_CLIENT_STREAM_END_CALLBACK
+  // Called when a client stream completes.
+  Function<void()> on_client_stream_end_;
+#endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
+};
+
+// A Call object, as used by an RPC client.
+// TODO(hepler): Refactor client calls to use this class.
+class ClientCall : public Call {
+ protected:
+  ClientCall(ClientCall&& other) { *this = std::move(other); }
+
+  ClientCall& operator=(ClientCall&& other);
+
+ private:
+  Function<void(Status)> on_completed_;
+};
+
 }  // namespace internal
 }  // namespace pw::rpc
diff --git a/pw_rpc/public/pw_rpc/internal/call_context.h b/pw_rpc/public/pw_rpc/internal/call_context.h
index 3a709d5..98effcc 100644
--- a/pw_rpc/public/pw_rpc/internal/call_context.h
+++ b/pw_rpc/public/pw_rpc/internal/call_context.h
@@ -16,73 +16,44 @@
 #include <cstddef>
 #include <cstdint>
 
-#include "pw_assert/assert.h"
 #include "pw_rpc/internal/channel.h"
 
 namespace pw::rpc {
 
 class ServerContext;
 class Service;
-class Server;
 
 namespace internal {
 
+class Endpoint;
 class Method;
 
-// Collects information for an ongoing RPC being processed by the server.
 // The Server creates a CallContext object to represent a method invocation. The
-// CallContext is copied into a ServerReader/Writer for streaming RPCs.
-//
-// CallContext is an internal class. ServerContext is the public server-side
-// interface to the internal::CallContext.
+// CallContext is used to initialize a call object for the RPC.
 class CallContext {
  public:
-  constexpr CallContext()
-      : server_(nullptr),
-        channel_(nullptr),
-        service_(nullptr),
-        method_(nullptr) {}
-
-  constexpr CallContext(Server& server,
+  constexpr CallContext(Endpoint& endpoint,
                         Channel& channel,
                         Service& service,
                         const internal::Method& method)
-      : server_(&server),
-        channel_(&channel),
-        service_(&service),
-        method_(&method) {}
+      : endpoint_(endpoint),
+        channel_(channel),
+        service_(service),
+        method_(method) {}
 
-  constexpr CallContext(const CallContext&) = default;
-  constexpr CallContext& operator=(const CallContext&) = default;
+  Endpoint& endpoint() const { return endpoint_; }
 
-  // Access the ServerContext for this call. Defined in pw_rpc/server_context.h.
-  ServerContext& context();
+  Channel& channel() const { return channel_; }
 
-  Server& server() const {
-    PW_DASSERT(server_ != nullptr);
-    return *server_;
-  }
+  Service& service() const { return service_; }
 
-  Channel& channel() const {
-    PW_DASSERT(channel_ != nullptr);
-    return *channel_;
-  }
-
-  Service& service() const {
-    PW_DASSERT(service_ != nullptr);
-    return *service_;
-  }
-
-  const internal::Method& method() const {
-    PW_DASSERT(method_ != nullptr);
-    return *method_;
-  }
+  const internal::Method& method() const { return method_; }
 
  private:
-  Server* server_;
-  Channel* channel_;
-  Service* service_;
-  const internal::Method* method_;
+  Endpoint& endpoint_;
+  Channel& channel_;
+  Service& service_;
+  const internal::Method& method_;
 };
 
 }  // namespace internal
diff --git a/pw_rpc/public/pw_rpc/internal/endpoint.h b/pw_rpc/public/pw_rpc/internal/endpoint.h
index 76b2fd6..8cb9633 100644
--- a/pw_rpc/public/pw_rpc/internal/endpoint.h
+++ b/pw_rpc/public/pw_rpc/internal/endpoint.h
@@ -51,14 +51,10 @@
 
   // Finds a call object for an ongoing call associated with this packet, if
   // any. Returns nullptr if no matching call exists.
-  Call* FindCall(const Packet& packet);
-
-  // Adds a call to the internal call registry. This immediately registers the
-  // call and does NOT check if the call is unique.
-  void RegisterCall(Call& call) { calls_.push_front(call); }
-
-  // Removes the provided call from the call registry.
-  void UnregisterCall(const Call& call) { calls_.remove(call); }
+  Call* FindCall(const Packet& packet) {
+    return FindCallById(
+        packet.channel_id(), packet.service_id(), packet.method_id());
+  }
 
   // Finds an internal:::Channel with this ID or nullptr if none matches.
   Channel* GetInternalChannel(uint32_t id) const;
@@ -69,6 +65,24 @@
   Channel* AssignChannel(uint32_t id, ChannelOutput& interface);
 
  private:
+  // Give Call access to the register/unregister functions.
+  friend class Call;
+
+  // Adds a call to the internal call registry. If a matching call already
+  // exists, it is cancelled locally (on_error called, no packet sent).
+  void RegisterCall(Call& call);
+
+  // Registers a call that is known to be unique. The calls list is NOT checked
+  // for existing calls.
+  void RegisterUniqueCall(Call& call) { calls_.push_front(call); }
+
+  // Removes the provided call from the call registry.
+  void UnregisterCall(const Call& call) { calls_.remove(call); }
+
+  Call* FindCallById(uint32_t channel_id,
+                     uint32_t service_id,
+                     uint32_t method_id);
+
   std::span<Channel> channels_;
   IntrusiveList<Call> calls_;
 };
diff --git a/pw_rpc/public/pw_rpc/internal/method.h b/pw_rpc/public/pw_rpc/internal/method.h
index e3c5bbe..e5db6df 100644
--- a/pw_rpc/public/pw_rpc/internal/method.h
+++ b/pw_rpc/public/pw_rpc/internal/method.h
@@ -18,8 +18,13 @@
 #include <utility>
 
 #include "pw_rpc/internal/call_context.h"
+#include "pw_rpc/server_context.h"
 
-namespace pw::rpc::internal {
+namespace pw::rpc {
+
+class Service;
+
+namespace internal {
 
 class Packet;
 
@@ -61,16 +66,14 @@
   // calls the invoker function, which handles the RPC request and response
   // according to the RPC type and protobuf implementation and calls the
   // user-defined RPC function.
-  void Invoke(CallContext& call, const Packet& request) const {
-    return invoker_(*this, call, request);
+  void Invoke(const CallContext& context, const Packet& request) const {
+    return invoker_(context, request);
   }
 
  protected:
-  using Invoker = void (&)(const Method&, CallContext&, const Packet&);
+  using Invoker = void (&)(const CallContext&, const Packet&);
 
-  static constexpr void InvalidInvoker(const Method&,
-                                       CallContext&,
-                                       const Packet&) {}
+  static constexpr void InvalidInvoker(const CallContext&, const Packet&) {}
 
   constexpr Method(uint32_t id, Invoker invoker) : id_(id), invoker_(invoker) {}
 
@@ -111,18 +114,17 @@
 template <auto kMethod>
 using Response = typename MethodTraits<decltype(kMethod)>::Response;
 
-// Function that calls a user-defined method implementation function from a
-// CallContext object.
+// Function that calls a user-defined RPC function on the given Service.
 template <auto kMethod, typename... Args>
-constexpr auto CallMethodImplFunction(CallContext& call, Args&&... args) {
+constexpr auto CallMethodImplFunction(Service& service, Args&&... args) {
   // If the method impl is a member function, deduce the type of the
   // user-defined service from it, then call the method on the service.
   if constexpr (std::is_member_function_pointer_v<decltype(kMethod)>) {
     return (static_cast<typename MethodTraits<decltype(kMethod)>::Service&>(
-                call.service()).*
-            kMethod)(call.context(), std::forward<Args>(args)...);
+                service).*
+            kMethod)(GlobalServerContextStub(), std::forward<Args>(args)...);
   } else {
-    return kMethod(call.context(), std::forward<Args>(args)...);
+    return kMethod(GlobalServerContextStub(), std::forward<Args>(args)...);
   }
 }
 
@@ -136,4 +138,5 @@
 using GeneratedService =
     decltype(BaseFromMember(&Service::_PwRpcInternalGeneratedBase));
 
-}  // namespace pw::rpc::internal
+}  // namespace internal
+}  // namespace pw::rpc
diff --git a/pw_rpc/public/pw_rpc/internal/test_method.h b/pw_rpc/public/pw_rpc/internal/test_method.h
index 05bd6cd..9cb3dca 100644
--- a/pw_rpc/public/pw_rpc/internal/test_method.h
+++ b/pw_rpc/public/pw_rpc/internal/test_method.h
@@ -17,9 +17,11 @@
 #include <cstring>
 #include <span>
 
+#include "pw_rpc/internal/call.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/internal/method_union.h"
 #include "pw_rpc/internal/packet.h"
+#include "pw_rpc/method_type.h"
 #include "pw_rpc/server_context.h"
 #include "pw_status/status_with_size.h"
 
@@ -29,8 +31,8 @@
 // channel ID, request, and payload buffer, and optionally provides a response.
 class TestMethod : public Method {
  public:
-  constexpr TestMethod(uint32_t id)
-      : Method(id, InvokeForTest), last_channel_id_(0), invocations_(0) {}
+  constexpr TestMethod(uint32_t id, MethodType type = MethodType::kUnary)
+      : Method(id, GetInvoker(type)), last_channel_id_(0), invocations_(0) {}
 
   uint32_t last_channel_id() const { return last_channel_id_; }
   const Packet& last_request() const { return last_request_; }
@@ -40,13 +42,34 @@
   void set_status(Status status) { response_status_ = status; }
 
  private:
-  static void InvokeForTest(const Method& method,
-                            CallContext& call,
-                            const Packet& request) {
-    const auto& test_method = static_cast<const TestMethod&>(method);
-    test_method.last_channel_id_ = call.channel().id();
+  class FakeServerCall : public ServerCall {
+   public:
+    FakeServerCall(const CallContext& context, MethodType type)
+        : ServerCall(context, type) {}
+  };
+
+  template <MethodType kType>
+  static void InvokeForTest(const CallContext& context, const Packet& request) {
+    const auto& test_method = static_cast<const TestMethod&>(context.method());
+    test_method.last_channel_id_ = context.channel().id();
     test_method.last_request_ = request;
     test_method.invocations_ += 1;
+
+    // Create a call object so it registers / unregisters with the server.
+    FakeServerCall fake_call(context, kType);
+  }
+
+  static constexpr Invoker GetInvoker(MethodType type) {
+    switch (type) {
+      case MethodType::kUnary:
+        return InvokeForTest<MethodType::kUnary>;
+      case MethodType::kServerStreaming:
+        return InvokeForTest<MethodType::kServerStreaming>;
+      case MethodType::kClientStreaming:
+        return InvokeForTest<MethodType::kClientStreaming>;
+      case MethodType::kBidirectionalStreaming:
+        return InvokeForTest<MethodType::kBidirectionalStreaming>;
+    };
   }
 
   // Make these mutable so they can be set in the Invoke method, which is const.
diff --git a/pw_rpc/public/pw_rpc/internal/test_method_context.h b/pw_rpc/public/pw_rpc/internal/test_method_context.h
index b6a8c5f..14b30a7 100644
--- a/pw_rpc/public/pw_rpc/internal/test_method_context.h
+++ b/pw_rpc/public/pw_rpc/internal/test_method_context.h
@@ -147,7 +147,7 @@
     output_.clear();
     T responder = GetResponder<T>();
     CallMethodImplFunction<kMethod>(
-        call_context(), std::forward<RequestArg>(request)..., responder);
+        service(), std::forward<RequestArg>(request)..., responder);
   }
 
   template <typename T>
@@ -155,7 +155,7 @@
     return T(call_context());
   }
 
-  internal::CallContext& call_context() { return context_; }
+  const internal::CallContext& call_context() const { return context_; }
 
  private:
   static constexpr size_t kNoPayloadPacketSizeBytes =
@@ -166,7 +166,7 @@
   rpc::Channel channel_;
   rpc::Server server_;
   Service service_;
-  internal::CallContext context_;
+  const internal::CallContext context_;
 };
 
 }  // namespace pw::rpc::internal::test
diff --git a/pw_rpc/public/pw_rpc/internal/test_utils.h b/pw_rpc/public/pw_rpc/internal/test_utils.h
index 868e140..3223667 100644
--- a/pw_rpc/public/pw_rpc/internal/test_utils.h
+++ b/pw_rpc/public/pw_rpc/internal/test_utils.h
@@ -136,7 +136,7 @@
                             payload);
   }
 
-  internal::CallContext& get() { return context_; }
+  const internal::CallContext& get() { return context_; }
   auto& output() { return output_; }
   TestServer& server() { return static_cast<TestServer&>(server_); }
 
@@ -146,7 +146,7 @@
   rpc::Server server_;
   Service service_;
 
-  internal::CallContext context_;
+  const internal::CallContext context_;
 };
 
 template <size_t kOutputBufferSize = 128,
diff --git a/pw_rpc/public/pw_rpc/server.h b/pw_rpc/public/pw_rpc/server.h
index f3c3344..e8eabd3 100644
--- a/pw_rpc/public/pw_rpc/server.h
+++ b/pw_rpc/public/pw_rpc/server.h
@@ -55,11 +55,11 @@
 
   void HandleClientStreamPacket(const internal::Packet& packet,
                                 internal::Channel& channel,
-                                internal::Call* call) const;
+                                internal::ServerCall* call) const;
 
   void HandleCancelPacket(const internal::Packet& packet,
                           internal::Channel& channel,
-                          internal::Call* call) const;
+                          internal::ServerCall* call) const;
 
   IntrusiveList<Service> services_;
 };
diff --git a/pw_rpc/public/pw_rpc/server_context.h b/pw_rpc/public/pw_rpc/server_context.h
index 0578931..4970629 100644
--- a/pw_rpc/public/pw_rpc/server_context.h
+++ b/pw_rpc/public/pw_rpc/server_context.h
@@ -16,34 +16,43 @@
 #include <cstddef>
 #include <cstdint>
 
-#include "pw_rpc/internal/call_context.h"
-
 namespace pw::rpc {
 
+class ServerContext;
+
+namespace internal {
+
+ServerContext& GlobalServerContextStub();
+
+}  // namespace internal
+
 // The ServerContext class is DEPRECATED and will be removed from pw_rpc. All
-// information in the ServerContext is accessible through the
+// information formerly in the ServerContext is accessible through the
 // ServerReader/Writer object.
 //
 // The only case where the information in a ServerContext is not available is
 // synchronous unary RPCs. If information like channel_id() is needed in a unary
 // RPC, just use an asynchronous unary RPC.
-class ServerContext : private internal::CallContext {
+class ServerContext {
  public:
-  constexpr ServerContext() = delete;
-
   constexpr ServerContext(const ServerContext&) = delete;
   constexpr ServerContext& operator=(const ServerContext&) = delete;
 
   constexpr ServerContext(ServerContext&&) = delete;
   constexpr ServerContext& operator=(ServerContext&&) = delete;
 
-  friend class internal::CallContext;  // Allow down-casting from CallContext.
+ private:
+  constexpr ServerContext() = default;
+
+  // Allow GlobalServerContextStub() to create a global instance.
+  friend ServerContext& internal::GlobalServerContextStub();
 };
 
 namespace internal {
 
-inline ServerContext& CallContext::context() {
-  return static_cast<ServerContext&>(*this);
+inline ServerContext& GlobalServerContextStub() {
+  static ServerContext global_server_context_stub;
+  return global_server_context_stub;
 }
 
 }  // namespace internal
diff --git a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
index 6d650a4..581af1e 100644
--- a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
+++ b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h
@@ -35,24 +35,24 @@
 //
 // Call's public API is intended for rpc::Server, so hide the public methods
 // with private inheritance.
-class FakeServerReaderWriter : private internal::Call {
+class FakeServerReaderWriter : private internal::ServerCall {
  public:
   constexpr FakeServerReaderWriter()
       : FakeServerReaderWriter(MethodType::kBidirectionalStreaming) {}
 
   // On a real reader/writer, this constructor would not be exposed.
-  FakeServerReaderWriter(CallContext& context,
+  FakeServerReaderWriter(const CallContext& context,
                          MethodType type = MethodType::kBidirectionalStreaming)
-      : Call(context, type) {}
+      : ServerCall(context, type) {}
 
   FakeServerReaderWriter(FakeServerReaderWriter&&) = default;
   FakeServerReaderWriter& operator=(FakeServerReaderWriter&&) = default;
 
   // Pull in protected functions from the hidden Call base as needed.
   using Call::active;
-  using Call::set_on_client_stream_end;
   using Call::set_on_error;
   using Call::set_on_next;
+  using ServerCall::set_on_client_stream_end;
 
   Status Finish(Status status = OkStatus()) {
     return CloseAndSendResponse(status);
@@ -67,19 +67,20 @@
   }
 
   // Expose a few additional methods for test use.
-  Call& as_responder() { return *this; }
+  ServerCall& as_server_call() { return *this; }
   ByteSpan PayloadBuffer() { return AcquirePayloadBuffer(); }
   const Channel::OutputBuffer& output_buffer() { return buffer(); }
 
  protected:
-  constexpr FakeServerReaderWriter(MethodType type) : internal::Call(type) {}
+  constexpr FakeServerReaderWriter(MethodType type)
+      : internal::ServerCall(type) {}
 };
 
 class FakeServerWriter : private FakeServerReaderWriter {
  public:
   constexpr FakeServerWriter()
       : FakeServerReaderWriter(MethodType::kServerStreaming) {}
-  FakeServerWriter(CallContext& context)
+  FakeServerWriter(const CallContext& context)
       : FakeServerReaderWriter(context, MethodType::kServerStreaming) {}
   FakeServerWriter(FakeServerWriter&&) = default;
 
@@ -90,7 +91,7 @@
   using FakeServerReaderWriter::Write;
 
   // Functions for test use.
-  using FakeServerReaderWriter::as_responder;
+  using FakeServerReaderWriter::as_server_call;
   using FakeServerReaderWriter::output_buffer;
   using FakeServerReaderWriter::PayloadBuffer;
 };
@@ -100,13 +101,13 @@
   constexpr FakeServerReader()
       : FakeServerReaderWriter(MethodType::kClientStreaming) {}
 
-  FakeServerReader(CallContext& context)
+  FakeServerReader(const CallContext& context)
       : FakeServerReaderWriter(context, MethodType::kClientStreaming) {}
 
   FakeServerReader(FakeServerReader&&) = default;
 
   using FakeServerReaderWriter::active;
-  using FakeServerReaderWriter::as_responder;
+  using FakeServerReaderWriter::as_server_call;
 
   // Functions for test use.
   using FakeServerReaderWriter::PayloadBuffer;
diff --git a/pw_rpc/raw/method.cc b/pw_rpc/raw/method.cc
index c2bf186..9909a1c 100644
--- a/pw_rpc/raw/method.cc
+++ b/pw_rpc/raw/method.cc
@@ -21,49 +21,48 @@
 
 namespace pw::rpc::internal {
 
-void RawMethod::SynchronousUnaryInvoker(const Method& method,
-                                        CallContext& call,
+void RawMethod::SynchronousUnaryInvoker(const CallContext& context,
                                         const Packet& request) {
-  RawServerResponder responder(call);
+  RawServerResponder responder(context);
   std::span payload_buffer = responder.AcquirePayloadBuffer();
 
   StatusWithSize sws =
-      static_cast<const RawMethod&>(method).function_.synchronous_unary(
-          call, request.payload(), payload_buffer);
+      static_cast<const RawMethod&>(context.method())
+          .function_.synchronous_unary(
+              context.service(), request.payload(), payload_buffer);
 
   responder.Finish(payload_buffer.first(sws.size()), sws.status())
       .IgnoreError();
 }
 
-void RawMethod::AsynchronousUnaryInvoker(const Method& method,
-                                         CallContext& call,
+void RawMethod::AsynchronousUnaryInvoker(const CallContext& context,
                                          const Packet& request) {
-  RawServerResponder responder(call);
-  static_cast<const RawMethod&>(method).function_.asynchronous_unary(
-      call, request.payload(), responder);
+  RawServerResponder responder(context);
+  static_cast<const RawMethod&>(context.method())
+      .function_.asynchronous_unary(
+          context.service(), request.payload(), responder);
 }
 
-void RawMethod::ServerStreamingInvoker(const Method& method,
-                                       CallContext& call,
+void RawMethod::ServerStreamingInvoker(const CallContext& context,
                                        const Packet& request) {
-  RawServerWriter server_writer(call);
-  static_cast<const RawMethod&>(method).function_.server_streaming(
-      call, request.payload(), server_writer);
+  RawServerWriter server_writer(context);
+  static_cast<const RawMethod&>(context.method())
+      .function_.server_streaming(
+          context.service(), request.payload(), server_writer);
 }
 
-void RawMethod::ClientStreamingInvoker(const Method& method,
-                                       CallContext& call,
+void RawMethod::ClientStreamingInvoker(const CallContext& context,
                                        const Packet&) {
-  RawServerReader reader(call);
-  static_cast<const RawMethod&>(method).function_.stream_request(call, reader);
+  RawServerReader reader(context);
+  static_cast<const RawMethod&>(context.method())
+      .function_.stream_request(context.service(), reader);
 }
 
-void RawMethod::BidirectionalStreamingInvoker(const Method& method,
-                                              CallContext& call,
+void RawMethod::BidirectionalStreamingInvoker(const CallContext& context,
                                               const Packet&) {
-  RawServerReaderWriter reader_writer(call);
-  static_cast<const RawMethod&>(method).function_.stream_request(call,
-                                                                 reader_writer);
+  RawServerReaderWriter reader_writer(context);
+  static_cast<const RawMethod&>(context.method())
+      .function_.stream_request(context.service(), reader_writer);
 }
 
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/raw/method_test.cc b/pw_rpc/raw/method_test.cc
index 7b37e9d..6279305 100644
--- a/pw_rpc/raw/method_test.cc
+++ b/pw_rpc/raw/method_test.cc
@@ -107,6 +107,8 @@
 } last_request;
 
 RawServerWriter last_writer;
+RawServerReader last_reader;
+RawServerReaderWriter last_reader_writer;
 
 void DecodeRawTestRequest(ConstByteSpan request) {
   protobuf::Decoder decoder(request);
@@ -126,16 +128,23 @@
   }
 };
 
-StatusWithSize AddFive(ServerContext&,
-                       ConstByteSpan request,
-                       ByteSpan response) {
+StatusWithSize DoNothing(ServerContext&, ConstByteSpan, ByteSpan) {
+  return StatusWithSize::Unknown();
+}
+
+void AddFive(ServerContext&,
+             ConstByteSpan request,
+             RawServerResponder& responder) {
   DecodeRawTestRequest(request);
 
+  std::array<std::byte, 32> response;
   TestResponse::MemoryEncoder test_response(response);
   EXPECT_EQ(OkStatus(), test_response.WriteValue(last_request.integer + 5));
   ConstByteSpan payload(test_response);
 
-  return StatusWithSize::Unauthenticated(payload.size());
+  ASSERT_EQ(OkStatus(),
+            responder.Finish(std::span(response).first(payload.size()),
+                             Status::Unauthenticated()));
 }
 
 void StartStream(ServerContext&,
@@ -145,26 +154,47 @@
   last_writer = std::move(writer);
 }
 
+void ClientStream(ServerContext&, RawServerReader& reader) {
+  last_reader = std::move(reader);
+}
+
+void BidirectionalStream(ServerContext&, RawServerReaderWriter& reader_writer) {
+  last_reader_writer = std::move(reader_writer);
+}
+
 class FakeService : public Service {
  public:
   FakeService(uint32_t id) : Service(id, kMethods) {}
 
-  static constexpr std::array<RawMethodUnion, 2> kMethods = {
-      RawMethod::SynchronousUnary<AddFive>(10u),
-      RawMethod::ServerStreaming<StartStream>(11u),
+  static constexpr std::array<RawMethodUnion, 5> kMethods = {
+      RawMethod::SynchronousUnary<DoNothing>(10u),
+      RawMethod::AsynchronousUnary<AddFive>(11u),
+      RawMethod::ServerStreaming<StartStream>(12u),
+      RawMethod::ClientStreaming<ClientStream>(13u),
+      RawMethod::BidirectionalStreaming<BidirectionalStream>(14u),
   };
 };
 
-TEST(RawMethod, UnaryRpc_SendsResponse) {
+constexpr const RawMethod& kSyncUnary =
+    std::get<0>(FakeService::kMethods).raw_method();
+constexpr const RawMethod& kAsyncUnary =
+    std::get<1>(FakeService::kMethods).raw_method();
+constexpr const RawMethod& kServerStream =
+    std::get<2>(FakeService::kMethods).raw_method();
+constexpr const RawMethod& kClientStream =
+    std::get<3>(FakeService::kMethods).raw_method();
+constexpr const RawMethod& kBidirectionalStream =
+    std::get<4>(FakeService::kMethods).raw_method();
+
+TEST(RawMethod, AsyncUnaryRpc_SendsResponse) {
   std::byte buffer[16];
   stream::MemoryWriter writer(buffer);
   TestRequest::StreamEncoder test_request(writer, ByteSpan());
   ASSERT_EQ(OkStatus(), test_request.WriteInteger(456));
   ASSERT_EQ(OkStatus(), test_request.WriteStatusCode(7));
 
-  const RawMethod& method = std::get<0>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService> context(method);
-  method.Invoke(context.get(), context.request(writer.WrittenData()));
+  ServerContextForTest<FakeService> context(kAsyncUnary);
+  kAsyncUnary.Invoke(context.get(), context.request(writer.WrittenData()));
 
   EXPECT_EQ(last_request.integer, 456);
   EXPECT_EQ(last_request.status_code, 7u);
@@ -179,6 +209,18 @@
   EXPECT_EQ(value, 461);
 }
 
+TEST(RawMethod, SyncUnaryRpc_SendsResponse) {
+  ServerContextForTest<FakeService> context(kSyncUnary);
+
+  kSyncUnary.Invoke(context.get(), context.request({}));
+
+  const Packet& packet = context.output().sent_packet();
+  EXPECT_EQ(PacketType::RESPONSE, packet.type());
+  EXPECT_EQ(Status::Unknown(), packet.status());
+  EXPECT_EQ(context.service_id(), packet.service_id());
+  EXPECT_EQ(kSyncUnary.id(), packet.method_id());
+}
+
 TEST(RawMethod, ServerStreamingRpc_SendsNothingWhenInitiallyCalled) {
   std::byte buffer[16];
   stream::MemoryWriter writer(buffer);
@@ -186,10 +228,8 @@
   ASSERT_EQ(OkStatus(), test_request.WriteInteger(777));
   ASSERT_EQ(OkStatus(), test_request.WriteStatusCode(2));
 
-  const RawMethod& method = std::get<1>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService> context(method);
-
-  method.Invoke(context.get(), context.request(writer.WrittenData()));
+  ServerContextForTest<FakeService> context(kServerStream);
+  kServerStream.Invoke(context.get(), context.request(writer.WrittenData()));
 
   EXPECT_EQ(0u, context.output().packet_count());
   EXPECT_EQ(777, last_request.integer);
@@ -198,11 +238,46 @@
   EXPECT_EQ(OkStatus(), last_writer.Finish());
 }
 
-TEST(RawServerWriter, Write_SendsPreviouslyAcquiredBuffer) {
-  const RawMethod& method = std::get<1>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService> context(method);
+TEST(RawMethod, ServerReader_HandlesRequests) {
+  ServerContextForTest<FakeService> context(kClientStream);
+  kClientStream.Invoke(context.get(), context.request({}));
 
-  method.Invoke(context.get(), context.request({}));
+  ConstByteSpan request;
+  last_reader.set_on_next([&request](ConstByteSpan req) { request = req; });
+
+  constexpr const char kRequestValue[] = "This is a request payload!!!";
+  std::array<std::byte, 128> encoded_request = {};
+  auto encoded = context.client_stream(std::as_bytes(std::span(kRequestValue)))
+                     .Encode(encoded_request);
+  ASSERT_EQ(OkStatus(), encoded.status());
+  ASSERT_EQ(OkStatus(),
+            context.server().ProcessPacket(*encoded, context.output()));
+
+  EXPECT_STREQ(reinterpret_cast<const char*>(request.data()), kRequestValue);
+}
+
+TEST(RawMethod, ServerReaderWriter_WritesResponses) {
+  ServerContextForTest<FakeService> context(kBidirectionalStream);
+  kBidirectionalStream.Invoke(context.get(), context.request({}));
+
+  constexpr const char kRequestValue[] = "O_o";
+  const auto kRequestBytes = std::as_bytes(std::span(kRequestValue));
+  EXPECT_EQ(OkStatus(), last_reader_writer.Write(kRequestBytes));
+
+  std::array<std::byte, 128> encoded_response = {};
+  auto encoded = context.server_stream(kRequestBytes).Encode(encoded_response);
+  ASSERT_EQ(OkStatus(), encoded.status());
+
+  ASSERT_EQ(encoded.value().size(), context.output().sent_data().size());
+  EXPECT_EQ(0,
+            std::memcmp(encoded.value().data(),
+                        context.output().sent_data().data(),
+                        encoded.value().size()));
+}
+
+TEST(RawServerWriter, Write_SendsPreviouslyAcquiredBuffer) {
+  ServerContextForTest<FakeService> context(kServerStream);
+  kServerStream.Invoke(context.get(), context.request({}));
 
   auto buffer = last_writer.PayloadBuffer();
 
@@ -221,10 +296,8 @@
 }
 
 TEST(RawServerWriter, Write_SendsExternalBuffer) {
-  const RawMethod& method = std::get<1>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService> context(method);
-
-  method.Invoke(context.get(), context.request({}));
+  ServerContextForTest<FakeService> context(kServerStream);
+  kServerStream.Invoke(context.get(), context.request({}));
 
   constexpr auto data = bytes::Array<0x0d, 0x06, 0xf0, 0x0d>();
   EXPECT_EQ(last_writer.Write(data), OkStatus());
@@ -239,10 +312,8 @@
 }
 
 TEST(RawServerWriter, Write_EmptyBuffer) {
-  const RawMethod& method = std::get<1>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService> context(method);
-
-  method.Invoke(context.get(), context.request({}));
+  ServerContextForTest<FakeService> context(kServerStream);
+  kServerStream.Invoke(context.get(), context.request({}));
 
   ASSERT_EQ(last_writer.Write({}), OkStatus());
 
@@ -256,10 +327,8 @@
 }
 
 TEST(RawServerWriter, Write_Closed_ReturnsFailedPrecondition) {
-  const RawMethod& method = std::get<1>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService> context(method);
-
-  method.Invoke(context.get(), context.request({}));
+  ServerContextForTest<FakeService> context(kServerStream);
+  kServerStream.Invoke(context.get(), context.request({}));
 
   EXPECT_EQ(OkStatus(), last_writer.Finish());
   constexpr auto data = bytes::Array<0x0d, 0x06, 0xf0, 0x0d>();
@@ -267,10 +336,8 @@
 }
 
 TEST(RawServerWriter, Write_BufferTooSmall_ReturnsOutOfRange) {
-  const RawMethod& method = std::get<1>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService, 16> context(method);
-
-  method.Invoke(context.get(), context.request({}));
+  ServerContextForTest<FakeService, 16> context(kServerStream);
+  kServerStream.Invoke(context.get(), context.request({}));
 
   constexpr auto data =
       bytes::Array<0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>();
@@ -279,10 +346,8 @@
 
 TEST(RawServerWriter,
      Destructor_ReleasesAcquiredBufferWithoutSendingAndCloses) {
-  const RawMethod& method = std::get<1>(FakeService::kMethods).raw_method();
-  ServerContextForTest<FakeService> context(method);
-
-  method.Invoke(context.get(), context.request({}));
+  ServerContextForTest<FakeService> context(kServerStream);
+  kServerStream.Invoke(context.get(), context.request({}));
 
   {
     RawServerWriter writer = std::move(last_writer);
diff --git a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
index 71677c5..b05a880 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/internal/method.h
@@ -14,10 +14,10 @@
 #pragma once
 
 #include "pw_bytes/span.h"
-#include "pw_rpc/internal/call.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/method_type.h"
 #include "pw_rpc/raw/server_reader_writer.h"
+#include "pw_rpc/service.h"
 #include "pw_status/status_with_size.h"
 
 namespace pw::rpc::internal {
@@ -38,8 +38,8 @@
   template <auto kMethod>
   static constexpr RawMethod SynchronousUnary(uint32_t id) {
     constexpr SynchronousUnaryFunction wrapper =
-        [](CallContext& call, ConstByteSpan req, ByteSpan res) {
-          return CallMethodImplFunction<kMethod>(call, req, res);
+        [](Service& service, ConstByteSpan req, ByteSpan res) {
+          return CallMethodImplFunction<kMethod>(service, req, res);
         };
     return RawMethod(
         id, SynchronousUnaryInvoker, Function{.synchronous_unary = wrapper});
@@ -48,10 +48,8 @@
   template <auto kMethod>
   static constexpr RawMethod AsynchronousUnary(uint32_t id) {
     constexpr AsynchronousUnaryFunction wrapper =
-        [](CallContext& call,
-           ConstByteSpan req,
-           RawServerResponder& responder) {
-          return CallMethodImplFunction<kMethod>(call, req, responder);
+        [](Service& service, ConstByteSpan req, RawServerResponder& responder) {
+          return CallMethodImplFunction<kMethod>(service, req, responder);
         };
     return RawMethod(
         id, AsynchronousUnaryInvoker, Function{.asynchronous_unary = wrapper});
@@ -60,8 +58,8 @@
   template <auto kMethod>
   static constexpr RawMethod ServerStreaming(uint32_t id) {
     constexpr ServerStreamingFunction wrapper =
-        [](CallContext& call, ConstByteSpan request, RawServerWriter& writer) {
-          return CallMethodImplFunction<kMethod>(call, request, writer);
+        [](Service& service, ConstByteSpan request, RawServerWriter& writer) {
+          return CallMethodImplFunction<kMethod>(service, request, writer);
         };
     return RawMethod(
         id, ServerStreamingInvoker, Function{.server_streaming = wrapper});
@@ -70,9 +68,9 @@
   template <auto kMethod>
   static constexpr RawMethod ClientStreaming(uint32_t id) {
     constexpr StreamRequestFunction wrapper =
-        [](CallContext& call, RawServerReaderWriter& reader) {
+        [](Service& service, RawServerReaderWriter& reader) {
           return CallMethodImplFunction<kMethod>(
-              call, static_cast<RawServerReader&>(reader));
+              service, static_cast<RawServerReader&>(reader));
         };
     return RawMethod(
         id, ClientStreamingInvoker, Function{.stream_request = wrapper});
@@ -81,8 +79,8 @@
   template <auto kMethod>
   static constexpr RawMethod BidirectionalStreaming(uint32_t id) {
     constexpr StreamRequestFunction wrapper =
-        [](CallContext& call, RawServerReaderWriter& reader_writer) {
-          return CallMethodImplFunction<kMethod>(call, reader_writer);
+        [](Service& service, RawServerReaderWriter& reader_writer) {
+          return CallMethodImplFunction<kMethod>(service, reader_writer);
         };
     return RawMethod(
         id, BidirectionalStreamingInvoker, Function{.stream_request = wrapper});
@@ -92,20 +90,20 @@
   static constexpr RawMethod Invalid() { return {0, InvalidInvoker, {}}; }
 
  private:
-  // Generic versions of the user-defined functions.
-  using SynchronousUnaryFunction = StatusWithSize (*)(CallContext&,
+  // Wraps the user-defined functions.
+  using SynchronousUnaryFunction = StatusWithSize (*)(Service&,
                                                       ConstByteSpan,
                                                       ByteSpan);
 
-  using AsynchronousUnaryFunction = void (*)(CallContext&,
+  using AsynchronousUnaryFunction = void (*)(Service&,
                                              ConstByteSpan,
                                              RawServerResponder&);
 
-  using ServerStreamingFunction = void (*)(CallContext&,
+  using ServerStreamingFunction = void (*)(Service&,
                                            ConstByteSpan,
                                            RawServerWriter&);
 
-  using StreamRequestFunction = void (*)(CallContext&, RawServerReaderWriter&);
+  using StreamRequestFunction = void (*)(Service&, RawServerReaderWriter&);
 
   union Function {
     SynchronousUnaryFunction synchronous_unary;
@@ -117,24 +115,18 @@
   constexpr RawMethod(uint32_t id, Invoker invoker, Function function)
       : Method(id, invoker), function_(function) {}
 
-  static void SynchronousUnaryInvoker(const Method& method,
-                                      CallContext& call,
+  static void SynchronousUnaryInvoker(const CallContext& context,
                                       const Packet& request);
 
-  static void AsynchronousUnaryInvoker(const Method& method,
-                                       CallContext& call,
+  static void AsynchronousUnaryInvoker(const CallContext& context,
                                        const Packet& request);
 
-  static void ServerStreamingInvoker(const Method& method,
-                                     CallContext& call,
+  static void ServerStreamingInvoker(const CallContext& context,
                                      const Packet& request);
 
-  static void ClientStreamingInvoker(const Method& method,
-                                     CallContext& call,
-                                     const Packet&);
+  static void ClientStreamingInvoker(const CallContext& context, const Packet&);
 
-  static void BidirectionalStreamingInvoker(const Method& method,
-                                            CallContext& call,
+  static void BidirectionalStreamingInvoker(const CallContext& context,
                                             const Packet&);
 
   // Stores the user-defined RPC.
diff --git a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
index 4abed67..f951f43 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/server_reader_writer.h
@@ -44,7 +44,7 @@
 
 // The RawServerReaderWriter is used to send and receive messages in a raw
 // bidirectional streaming RPC.
-class RawServerReaderWriter : private internal::Call {
+class RawServerReaderWriter : private internal::ServerCall {
  public:
   constexpr RawServerReaderWriter()
       : RawServerReaderWriter(MethodType::kBidirectionalStreaming) {}
@@ -72,9 +72,9 @@
   using internal::Call::channel_id;
 
   // Functions for setting the callbacks.
-  using internal::Call::set_on_client_stream_end;
   using internal::Call::set_on_error;
   using internal::Call::set_on_next;
+  using internal::ServerCall::set_on_client_stream_end;
 
   // Returns a buffer in which a response payload can be built.
   ByteSpan PayloadBuffer() { return AcquirePayloadBuffer(); }
@@ -93,11 +93,12 @@
 
  protected:
   // Constructor for derived classes to use.
-  constexpr RawServerReaderWriter(MethodType type) : internal::Call(type) {}
+  constexpr RawServerReaderWriter(MethodType type)
+      : internal::ServerCall(type) {}
 
   RawServerReaderWriter(const internal::CallContext& call,
                         MethodType type = MethodType::kBidirectionalStreaming)
-      : internal::Call(call, type) {}
+      : internal::ServerCall(call, type) {}
 
   using internal::Call::CloseAndSendResponse;
   using internal::Call::open;  // Deprecated; renamed to active()
diff --git a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
index afe9edc..2e5f9a3 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
@@ -138,8 +138,8 @@
       Base::output().clear();
 
       ByteSpan& response = Base::output().AllocateResponse();
-      auto sws = CallMethodImplFunction<kMethod>(
-          Base::call_context(), request, response);
+      auto sws =
+          CallMethodImplFunction<kMethod>(Base::service(), request, response);
       response = response.first(sws.size());
       return sws;
     } else {
diff --git a/pw_rpc/raw/server_reader_writer_test.cc b/pw_rpc/raw/server_reader_writer_test.cc
index 472f378..56068dd 100644
--- a/pw_rpc/raw/server_reader_writer_test.cc
+++ b/pw_rpc/raw/server_reader_writer_test.cc
@@ -64,6 +64,21 @@
                "hello from pw_rpc");
 }
 
+TEST(RawServerResponder, Open_MultipleTimes_CancelsPrevious) {
+  ReaderWriterTestContext ctx(MethodType::kUnary);
+
+  RawServerResponder one = RawServerResponder::Open<TestService::TestUnaryRpc>(
+      ctx.server, ctx.channel.id(), ctx.service);
+
+  ASSERT_TRUE(one.active());
+
+  RawServerResponder two = RawServerResponder::Open<TestService::TestUnaryRpc>(
+      ctx.server, ctx.channel.id(), ctx.service);
+
+  ASSERT_FALSE(one.active());
+  ASSERT_TRUE(two.active());
+}
+
 TEST(RawServerWriter, Open_ReturnsUsableWriter) {
   ReaderWriterTestContext ctx(MethodType::kServerStreaming);
   RawServerWriter call =
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc
index 7424288..b67742c 100644
--- a/pw_rpc/server.cc
+++ b/pw_rpc/server.cc
@@ -33,8 +33,9 @@
 
 Status Server::ProcessPacket(std::span<const byte> data,
                              ChannelOutput& interface) {
-  internal::Call* call;
-  Result<Packet> result = Endpoint::ProcessPacket(data, Packet::kServer, call);
+  internal::Call* base;
+  Result<Packet> result = Endpoint::ProcessPacket(data, Packet::kServer, base);
+  internal::ServerCall* const call = static_cast<internal::ServerCall*>(base);
 
   if (!result.ok()) {
     return result.status();
@@ -72,12 +73,9 @@
 
   switch (packet.type()) {
     case PacketType::REQUEST: {
-      // If the REQUEST is for an ongoing RPC, cancel it, then call it again.
-      if (call != nullptr) {
-        call->HandleError(Status::Cancelled());
-      }
-
-      internal::CallContext context(*this, *channel, *service, *method);
+      // If the REQUEST is for an ongoing RPC, the existing call will be
+      // cancelled when the new call object is created.
+      const internal::CallContext context(*this, *channel, *service, *method);
       method->Invoke(context, packet);
       break;
     }
@@ -120,7 +118,7 @@
 
 void Server::HandleClientStreamPacket(const internal::Packet& packet,
                                       internal::Channel& channel,
-                                      internal::Call* call) const {
+                                      internal::ServerCall* call) const {
   if (call == nullptr) {
     PW_LOG_DEBUG(
         "Received client stream packet for method that is not pending");
@@ -150,7 +148,7 @@
 
 void Server::HandleCancelPacket(const Packet& packet,
                                 internal::Channel& channel,
-                                internal::Call* call) const {
+                                internal::ServerCall* call) const {
   if (call == nullptr) {
     channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
         .IgnoreError();  // TODO(pwbug/387): Handle Status properly
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index 7fe1e6c..7fca018 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -41,7 +41,7 @@
   TestService(uint32_t service_id)
       : Service(service_id, methods_),
         methods_{
-            TestMethod(100),
+            TestMethod(100, MethodType::kBidirectionalStreaming),
             TestMethod(200),
         } {}
 
@@ -256,23 +256,22 @@
 class BidiMethod : public BasicServer {
  protected:
   BidiMethod()
-      : call_(server_,
-              static_cast<internal::Channel&>(channels_[0]),
-              service_,
-              service_.method(100)),
-        responder_(call_) {
+      : responder_(
+            internal::CallContext(server_,
+                                  static_cast<internal::Channel&>(channels_[0]),
+                                  service_,
+                                  service_.method(100))) {
     ASSERT_TRUE(responder_.active());
   }
 
-  internal::CallContext call_;
   internal::test::FakeServerReaderWriter responder_;
 };
 
 TEST_F(BidiMethod, DuplicateCall_CancelsExistingThenCallsAgain) {
-  bool cancelled = false;
+  int cancelled = 0;
   responder_.set_on_error([&cancelled](Status error) {
     if (error.IsCancelled()) {
-      cancelled = true;
+      cancelled += 1;
     }
   });
 
@@ -282,7 +281,7 @@
   EXPECT_EQ(OkStatus(),
             server_.ProcessPacket(PacketForRpc(PacketType::REQUEST), output_));
 
-  EXPECT_TRUE(cancelled);
+  EXPECT_EQ(cancelled, 1);
   EXPECT_EQ(method.invocations(), 1u);
 }
 
diff --git a/pw_rpc/service_test.cc b/pw_rpc/service_test.cc
index fb27848..174de72 100644
--- a/pw_rpc/service_test.cc
+++ b/pw_rpc/service_test.cc
@@ -28,9 +28,7 @@
 
 namespace {
 
-void InvokeIt(const internal::Method&,
-              internal::CallContext&,
-              const internal::Packet&) {}
+void InvokeIt(const internal::CallContext&, const internal::Packet&) {}
 
 class ServiceTestMethod : public internal::Method {
  public: