pw_rpc: Common Endpoint base for Server/Client

- Create pw::rpc::internal::Endpoint class that serves as the common
  base for the Server and Client. It manages the internal
  lists of active calls and available RPC channels.
- Refactor the pw::rpc::Server to derive from Endpoint. The Client will
  be refactored to use Endpoint in a subsequent change.
- No longer send responses to corrupt packets. If packets are being
  corrupted, a response isn't likely to be helpful.
- Remove the unnecessary internal::Server class.

Change-Id: Ib02922907ee61a4e590412aa464127590bde6441
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60185
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index a326e70..c8f7096 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -43,13 +43,14 @@
     name = "server",
     srcs = [
         "call.cc",
+        "endpoint.cc",
         "public/pw_rpc/internal/call.h",
         "public/pw_rpc/internal/call_context.h",
+        "public/pw_rpc/internal/endpoint.h",
         "public/pw_rpc/internal/hash.h",
         "public/pw_rpc/internal/method.h",
         "public/pw_rpc/internal/method_lookup.h",
         "public/pw_rpc/internal/method_union.h",
-        "public/pw_rpc/internal/server.h",
         "server.cc",
         "service.cc",
     ],
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index dd6489b..bc5f7a1 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -58,13 +58,14 @@
   ]
   sources = [
     "call.cc",
+    "endpoint.cc",
     "public/pw_rpc/internal/call.h",
     "public/pw_rpc/internal/call_context.h",
+    "public/pw_rpc/internal/endpoint.h",
     "public/pw_rpc/internal/hash.h",
     "public/pw_rpc/internal/method.h",
     "public/pw_rpc/internal/method_lookup.h",
     "public/pw_rpc/internal/method_union.h",
-    "public/pw_rpc/internal/server.h",
     "server.cc",
     "service.cc",
   ]
diff --git a/pw_rpc/CMakeLists.txt b/pw_rpc/CMakeLists.txt
index 5b9edf5..9e10afd 100644
--- a/pw_rpc/CMakeLists.txt
+++ b/pw_rpc/CMakeLists.txt
@@ -25,6 +25,7 @@
 pw_add_module_library(pw_rpc.server
   SOURCES
     call.cc
+    endpoint.cc
     server.cc
     service.cc
   PUBLIC_DEPS
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index 8b13b87..939165a 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -17,7 +17,7 @@
 #include "pw_assert/check.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
+#include "pw_rpc/server.h"
 
 namespace pw::rpc::internal {
 namespace {
@@ -50,7 +50,7 @@
       type_(type),
       client_stream_state_(HasClientStream(type) ? kClientStreamOpen
                                                  : kClientStreamClosed) {
-  call_.server().RegisterResponder(*this);
+  call_.server().RegisterCall(*this);
 }
 
 Call& Call::operator=(Call&& other) {
@@ -64,7 +64,7 @@
 
   if (other.open()) {
     other.Close();
-    other.call_.server().RegisterResponder(*this);
+    other.call_.server().RegisterCall(*this);
   }
 
   // Move the rest of the member variables.
@@ -130,7 +130,7 @@
 void Call::Close() {
   PW_DCHECK(open());
 
-  call_.server().RemoveResponder(*this);
+  call_.server().UnregisterCall(*this);
   rpc_state_ = kClosed;
   client_stream_state_ = kClientStreamClosed;
 }
diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc
index 4a058e7..cee015a 100644
--- a/pw_rpc/call_test.cc
+++ b/pw_rpc/call_test.cc
@@ -38,6 +38,8 @@
 namespace internal {
 namespace {
 
+constexpr Packet kPacket(PacketType::REQUEST, 99, 16, 8);
+
 using pw::rpc::internal::test::FakeServerWriter;
 using std::byte;
 
@@ -71,20 +73,16 @@
   ServerContextForTest<TestService> context(TestService::method.method());
   FakeServerWriter writer(context.get());
 
-  auto& writers = context.server().writers();
-  EXPECT_FALSE(writers.empty());
-  auto it = std::find_if(writers.begin(), writers.end(), [&](auto& w) {
-    return &w == &writer.as_responder();
-  });
-  ASSERT_NE(it, writers.end());
+  Call* call = context.server().FindCall(kPacket);
+  ASSERT_NE(call, nullptr);
+  EXPECT_EQ(static_cast<void*>(call), static_cast<void*>(&writer));
 }
 
 TEST(ServerWriter, Destruct_RemovesFromServer) {
   ServerContextForTest<TestService> context(TestService::method.method());
   { FakeServerWriter writer(context.get()); }
 
-  auto& writers = context.server().writers();
-  EXPECT_TRUE(writers.empty());
+  EXPECT_EQ(context.server().FindCall(kPacket), nullptr);
 }
 
 TEST(ServerWriter, Finish_RemovesFromServer) {
@@ -93,8 +91,7 @@
 
   EXPECT_EQ(OkStatus(), writer.Finish());
 
-  auto& writers = context.server().writers();
-  EXPECT_TRUE(writers.empty());
+  EXPECT_EQ(context.server().FindCall(kPacket), nullptr);
 }
 
 TEST(ServerWriter, Finish_SendsResponse) {
diff --git a/pw_rpc/endpoint.cc b/pw_rpc/endpoint.cc
new file mode 100644
index 0000000..a3886f5
--- /dev/null
+++ b/pw_rpc/endpoint.cc
@@ -0,0 +1,88 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/internal/endpoint.h"
+
+#include "pw_log/log.h"
+
+namespace pw::rpc::internal {
+
+Endpoint::~Endpoint() {
+  // Since the calls remove themselves from the Endpoint in
+  // CloseAndSendResponse(), close responders until no responders remain.
+  while (!calls_.empty()) {
+    calls_.front().CloseAndSendResponse(OkStatus()).IgnoreError();
+  }
+}
+
+Result<Packet> Endpoint::ProcessPacket(std::span<const std::byte> data,
+                                       Packet::Destination destination,
+                                       Call*& ongoing_call) {
+  Result<Packet> result = Packet::FromBuffer(data);
+
+  if (!result.ok()) {
+    PW_LOG_WARN("Failed to decode pw_rpc packet");
+    return Status::DataLoss();
+  }
+
+  Packet& packet = *result;
+
+  if (packet.channel_id() == Channel::kUnassignedChannelId ||
+      packet.service_id() == 0 || packet.method_id() == 0) {
+    PW_LOG_WARN("Received malformed pw_rpc packet");
+    return Status::DataLoss();
+  }
+
+  if (packet.destination() != destination) {
+    return Status::InvalidArgument();
+  }
+
+  // Find an existing reader/writer for this RPC, if any.
+  ongoing_call = FindCall(packet);
+
+  return result;
+}
+
+Call* Endpoint::FindCall(const Packet& packet) {
+  for (Call& call : calls_) {
+    if (packet.channel_id() == call.channel_id() &&
+        packet.service_id() == call.service_id() &&
+        packet.method_id() == call.method_id()) {
+      return &call;
+    }
+  }
+  return nullptr;
+}
+
+Channel* Endpoint::GetInternalChannel(uint32_t id) const {
+  for (Channel& c : channels_) {
+    if (c.id() == id) {
+      return &c;
+    }
+  }
+  return nullptr;
+}
+
+Channel* Endpoint::AssignChannel(uint32_t id, ChannelOutput& interface) {
+  internal::Channel* channel =
+      GetInternalChannel(Channel::kUnassignedChannelId);
+  if (channel == nullptr) {
+    return nullptr;
+  }
+
+  *channel = Channel(id, &interface);
+  return channel;
+}
+
+}  // namespace pw::rpc::internal
diff --git a/pw_rpc/method_test.cc b/pw_rpc/method_test.cc
index e87c74d..a639260 100644
--- a/pw_rpc/method_test.cc
+++ b/pw_rpc/method_test.cc
@@ -18,7 +18,6 @@
 
 #include "gtest/gtest.h"
 #include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
 #include "pw_rpc/internal/test_method.h"
 #include "pw_rpc/method_type.h"
 #include "pw_rpc/server.h"
@@ -50,11 +49,10 @@
 
 TEST(Method, Invoke) {
   Channel channel(123, nullptr);
-  rpc::Server server(std::span(static_cast<rpc::Channel*>(&channel), 1));
+  Server server(std::span(static_cast<rpc::Channel*>(&channel), 1));
   TestService service;
 
-  CallContext call(
-      static_cast<internal::Server&>(server), channel, service, kTestMethod);
+  CallContext call(server, channel, service, kTestMethod);
   Packet empty_packet;
 
   EXPECT_EQ(kTestMethod.invocations(), 0u);
diff --git a/pw_rpc/public/pw_rpc/internal/call_context.h b/pw_rpc/public/pw_rpc/internal/call_context.h
index adc28a2..3a709d5 100644
--- a/pw_rpc/public/pw_rpc/internal/call_context.h
+++ b/pw_rpc/public/pw_rpc/internal/call_context.h
@@ -23,11 +23,11 @@
 
 class ServerContext;
 class Service;
+class Server;
 
 namespace internal {
 
 class Method;
-class Server;
 
 // Collects information for an ongoing RPC being processed by the server.
 // The Server creates a CallContext object to represent a method invocation. The
diff --git a/pw_rpc/public/pw_rpc/internal/endpoint.h b/pw_rpc/public/pw_rpc/internal/endpoint.h
new file mode 100644
index 0000000..76b2fd6
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/endpoint.h
@@ -0,0 +1,76 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <span>
+
+#include "pw_containers/intrusive_list.h"
+#include "pw_result/result.h"
+#include "pw_rpc/internal/call.h"
+#include "pw_rpc/internal/channel.h"
+#include "pw_rpc/internal/packet.h"
+
+namespace pw::rpc::internal {
+
+// Manages a list of channels and a list of ongoing calls for either a server or
+// client.
+//
+// For clients, calls start when they send a REQUEST packet to a server. For
+// servers, calls start when the REQUEST packet is received. In either case,
+// calls add themselves to the Endpoint's list when they're started and
+// remove themselves when they complete. Calls do this through their associated
+// Server or Client object, which derive from Endpoint.
+class Endpoint {
+ public:
+  ~Endpoint();
+
+  // Finds an RPC Channel with this ID or nullptr if none matches.
+  rpc::Channel* GetChannel(uint32_t id) const { return GetInternalChannel(id); }
+
+ protected:
+  constexpr Endpoint(std::span<rpc::Channel> channels)
+      : channels_(static_cast<internal::Channel*>(channels.data()),
+                  channels.size()) {}
+
+  // Parses an RPC packet and sets ongoing_call to the matching call, if any.
+  // Returns the parsed packet or an error.
+  Result<Packet> ProcessPacket(std::span<const std::byte> data,
+                               Packet::Destination packet,
+                               Call*& ongoing_call);
+
+  // Finds a call object for an ongoing call associated with this packet, if
+  // any. Returns nullptr if no matching call exists.
+  Call* FindCall(const Packet& packet);
+
+  // Adds a call to the internal call registry. This immediately registers the
+  // call and does NOT check if the call is unique.
+  void RegisterCall(Call& call) { calls_.push_front(call); }
+
+  // Removes the provided call from the call registry.
+  void UnregisterCall(const Call& call) { calls_.remove(call); }
+
+  // Finds an internal:::Channel with this ID or nullptr if none matches.
+  Channel* GetInternalChannel(uint32_t id) const;
+
+  // Creates a channel with the provided ID and ChannelOutput, if a channel slot
+  // is available. Returns a pointer to the channel if one is created, nullptr
+  // otherwise.
+  Channel* AssignChannel(uint32_t id, ChannelOutput& interface);
+
+ private:
+  std::span<Channel> channels_;
+  IntrusiveList<Call> calls_;
+};
+
+}  // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/internal/server.h b/pw_rpc/public/pw_rpc/internal/server.h
deleted file mode 100644
index 4556509..0000000
--- a/pw_rpc/public/pw_rpc/internal/server.h
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2020 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include "pw_rpc/server.h"
-
-namespace pw::rpc::internal {
-
-class Server : public rpc::Server {
- public:
-  Server() = delete;
-
-  void RegisterResponder(Call& writer) { writers().push_front(writer); }
-
-  void RemoveResponder(const Call& writer) { writers().remove(writer); }
-};
-
-}  // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/internal/test_method_context.h b/pw_rpc/public/pw_rpc/internal/test_method_context.h
index a7b7261..61aa735 100644
--- a/pw_rpc/public/pw_rpc/internal/test_method_context.h
+++ b/pw_rpc/public/pw_rpc/internal/test_method_context.h
@@ -20,7 +20,7 @@
 #include "pw_rpc/internal/fake_channel_output.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
+#include "pw_rpc/server.h"
 
 namespace pw::rpc::internal::test {
 
@@ -98,7 +98,7 @@
         channel_(Channel::Create<123>(&output_)),
         server_(std::span(&channel_, 1)),
         service_(std::forward<ServiceArgs>(service_args)...),
-        context_(static_cast<internal::Server&>(server_),
+        context_(server_,
                  static_cast<internal::Channel&>(channel_),
                  service_,
                  method) {
diff --git a/pw_rpc/public/pw_rpc/internal/test_utils.h b/pw_rpc/public/pw_rpc/internal/test_utils.h
index 501ec0d..868e140 100644
--- a/pw_rpc/public/pw_rpc/internal/test_utils.h
+++ b/pw_rpc/public/pw_rpc/internal/test_utils.h
@@ -26,7 +26,7 @@
 #include "pw_rpc/internal/channel.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
+#include "pw_rpc/server.h"
 
 namespace pw::rpc::internal {
 
@@ -75,10 +75,10 @@
   Status send_status_;
 };
 
-// Version of the internal::Server with extra methods exposed for testing.
+// Version of the Server with extra methods exposed for testing.
 class TestServer : public Server {
  public:
-  using internal::Server::writers;
+  using Server::FindCall;
 };
 
 template <typename Service,
diff --git a/pw_rpc/public/pw_rpc/server.h b/pw_rpc/public/pw_rpc/server.h
index f3ffe88..f3c3344 100644
--- a/pw_rpc/public/pw_rpc/server.h
+++ b/pw_rpc/public/pw_rpc/server.h
@@ -21,19 +21,16 @@
 #include "pw_rpc/channel.h"
 #include "pw_rpc/internal/call.h"
 #include "pw_rpc/internal/channel.h"
+#include "pw_rpc/internal/endpoint.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/service.h"
 #include "pw_status/status.h"
 
 namespace pw::rpc {
 
-class Server {
+class Server : public internal::Endpoint {
  public:
-  constexpr Server(std::span<Channel> channels)
-      : channels_(static_cast<internal::Channel*>(channels.data()),
-                  channels.size()) {}
-
-  ~Server();
+  constexpr Server(std::span<Channel> channels) : Endpoint(channels) {}
 
   // Registers a service with the server. This should not be called directly
   // with a Service; instead, use a generated class which inherits from it.
@@ -50,32 +47,21 @@
   Status ProcessPacket(std::span<const std::byte> packet,
                        ChannelOutput& interface);
 
-  constexpr size_t channel_count() const { return channels_.size(); }
-
- protected:
-  IntrusiveList<internal::Call>& writers() { return responders_; }
-
  private:
+  friend class internal::Call;
+
   std::tuple<Service*, const internal::Method*> FindMethod(
       const internal::Packet& packet);
 
-  using ResponderIterator = IntrusiveList<internal::Call>::iterator;
-
   void HandleClientStreamPacket(const internal::Packet& packet,
                                 internal::Channel& channel,
-                                ResponderIterator responder) const;
+                                internal::Call* call) const;
+
   void HandleCancelPacket(const internal::Packet& packet,
                           internal::Channel& channel,
-                          ResponderIterator responder) const;
+                          internal::Call* call) const;
 
-  internal::Channel* FindChannel(uint32_t id) const;
-  internal::Channel* AssignChannel(uint32_t id, ChannelOutput& interface);
-
-  std::span<internal::Channel> channels_;
   IntrusiveList<Service> services_;
-
-  // Any asynchronously handled RPCs.
-  IntrusiveList<internal::Call> responders_;
 };
 
 }  // namespace pw::rpc
diff --git a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
index 33b2c84..e0e098d 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
@@ -22,7 +22,6 @@
 #include "pw_rpc/internal/hash.h"
 #include "pw_rpc/internal/method_lookup.h"
 #include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
 #include "pw_rpc/internal/test_method_context.h"
 #include "pw_rpc/raw/fake_channel_output.h"
 #include "pw_rpc/raw/internal/method.h"
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc
index 4be0881..7424288 100644
--- a/pw_rpc/server.cc
+++ b/pw_rpc/server.cc
@@ -17,8 +17,8 @@
 #include <algorithm>
 
 #include "pw_log/log.h"
+#include "pw_rpc/internal/endpoint.h"
 #include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
 #include "pw_rpc/server_context.h"
 
 namespace pw::rpc {
@@ -29,58 +29,20 @@
 using internal::Packet;
 using internal::PacketType;
 
-bool DecodePacket(ChannelOutput& interface,
-                  std::span<const byte> data,
-                  Packet& packet) {
-  Result<Packet> result = Packet::FromBuffer(data);
-  if (!result.ok()) {
-    PW_LOG_WARN("Failed to decode packet on interface %s", interface.name());
-    return false;
-  }
-
-  packet = result.value();
-
-  // If the packet is malformed, don't try to process it.
-  if (packet.channel_id() == Channel::kUnassignedChannelId ||
-      packet.service_id() == 0 || packet.method_id() == 0) {
-    PW_LOG_WARN("Received incomplete packet on interface %s", interface.name());
-
-    // Only send an ERROR response if a valid channel ID was provided.
-    if (packet.channel_id() != Channel::kUnassignedChannelId) {
-      internal::Channel temp_channel(packet.channel_id(), &interface);
-      temp_channel.Send(Packet::ServerError(packet, Status::DataLoss()))
-          .IgnoreError();  // TODO(pwbug/387): Handle Status properly
-    }
-    return false;
-  }
-
-  return true;
-}
-
 }  // namespace
 
-Server::~Server() {
-  // Since the responders remove themselves from the server in
-  // CloseAndSendResponse(), close responders until no responders remain.
-  while (!responders_.empty()) {
-    responders_.front()
-        .CloseAndSendResponse(OkStatus())
-        .IgnoreError();  // TODO(pwbug/387): Handle Status properly
-  }
-}
-
 Status Server::ProcessPacket(std::span<const byte> data,
                              ChannelOutput& interface) {
-  Packet packet;
-  if (!DecodePacket(interface, data, packet)) {
-    return Status::DataLoss();
+  internal::Call* call;
+  Result<Packet> result = Endpoint::ProcessPacket(data, Packet::kServer, call);
+
+  if (!result.ok()) {
+    return result.status();
   }
 
-  if (packet.destination() != Packet::kServer) {
-    return Status::InvalidArgument();
-  }
+  Packet& packet = *result;
 
-  internal::Channel* channel = FindChannel(packet.channel_id());
+  internal::Channel* channel = GetInternalChannel(packet.channel_id());
   if (channel == nullptr) {
     // If the requested channel doesn't exist, try to dynamically assign one.
     channel = AssignChannel(packet.channel_id(), interface);
@@ -108,39 +70,30 @@
     return OkStatus();  // OK since the packet was handled.
   }
 
-  // Find an existing reader/writer for this RPC, if any.
-  auto responder =
-      std::find_if(responders_.begin(), responders_.end(), [&](auto& w) {
-        return packet.channel_id() == w.channel_id() &&
-               packet.service_id() == w.service_id() &&
-               packet.method_id() == w.method_id();
-      });
-
   switch (packet.type()) {
     case PacketType::REQUEST: {
       // If the REQUEST is for an ongoing RPC, cancel it, then call it again.
-      if (responder != responders_.end()) {
-        responder->HandleError(Status::Cancelled());
+      if (call != nullptr) {
+        call->HandleError(Status::Cancelled());
       }
 
-      internal::CallContext call(
-          static_cast<internal::Server&>(*this), *channel, *service, *method);
-      method->Invoke(call, packet);
+      internal::CallContext context(*this, *channel, *service, *method);
+      method->Invoke(context, packet);
       break;
     }
     case PacketType::CLIENT_STREAM:
-      HandleClientStreamPacket(packet, *channel, responder);
+      HandleClientStreamPacket(packet, *channel, call);
       break;
     case PacketType::CLIENT_ERROR:
-      if (responder != responders_.end()) {
-        responder->HandleError(packet.status());
+      if (call != nullptr) {
+        call->HandleError(packet.status());
       }
       break;
     case PacketType::CANCEL:
-      HandleCancelPacket(packet, *channel, responder);
+      HandleCancelPacket(packet, *channel, call);
       break;
     case PacketType::CLIENT_STREAM_END:
-      HandleClientStreamPacket(packet, *channel, responder);
+      HandleClientStreamPacket(packet, *channel, call);
       break;
     default:
       channel->Send(Packet::ServerError(packet, Status::Unimplemented()))
@@ -167,8 +120,8 @@
 
 void Server::HandleClientStreamPacket(const internal::Packet& packet,
                                       internal::Channel& channel,
-                                      ResponderIterator responder) const {
-  if (responder == responders_.end()) {
+                                      internal::Call* call) const {
+  if (call == nullptr) {
     PW_LOG_DEBUG(
         "Received client stream packet for method that is not pending");
     channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
@@ -176,55 +129,35 @@
     return;
   }
 
-  if (!responder->has_client_stream()) {
+  if (!call->has_client_stream()) {
     channel.Send(Packet::ServerError(packet, Status::InvalidArgument()))
         .IgnoreError();  // TODO(pwbug/387): Handle Status properly
     return;
   }
 
-  if (!responder->client_stream_open()) {
+  if (!call->client_stream_open()) {
     channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
         .IgnoreError();  // TODO(pwbug/387): Handle Status properly
     return;
   }
 
   if (packet.type() == PacketType::CLIENT_STREAM) {
-    responder->HandleClientStream(packet.payload());
+    call->HandleClientStream(packet.payload());
   } else {  // Handle PacketType::CLIENT_STREAM_END.
-    responder->EndClientStream();
+    call->EndClientStream();
   }
 }
 
 void Server::HandleCancelPacket(const Packet& packet,
                                 internal::Channel& channel,
-                                ResponderIterator responder) const {
-  if (responder == responders_.end()) {
+                                internal::Call* call) const {
+  if (call == nullptr) {
     channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
         .IgnoreError();  // TODO(pwbug/387): Handle Status properly
     PW_LOG_DEBUG("Received CANCEL packet for method that is not pending");
   } else {
-    responder->HandleError(Status::Cancelled());
+    call->HandleError(Status::Cancelled());
   }
 }
 
-internal::Channel* Server::FindChannel(uint32_t id) const {
-  for (internal::Channel& c : channels_) {
-    if (c.id() == id) {
-      return &c;
-    }
-  }
-  return nullptr;
-}
-
-internal::Channel* Server::AssignChannel(uint32_t id,
-                                         ChannelOutput& interface) {
-  internal::Channel* channel = FindChannel(Channel::kUnassignedChannelId);
-  if (channel == nullptr) {
-    return nullptr;
-  }
-
-  *channel = internal::Channel(id, &interface);
-  return channel;
-}
-
 }  // namespace pw::rpc
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index 8f68de2..dc23e59 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -143,22 +143,20 @@
   EXPECT_EQ(output_.packet_count(), 0u);
 }
 
-TEST_F(BasicServer, ProcessPacket_NoService_SendsDataLoss) {
+TEST_F(BasicServer, ProcessPacket_NoService_SendsNothing) {
   EXPECT_EQ(Status::DataLoss(),
             server_.ProcessPacket(EncodeRequest(PacketType::REQUEST, 1, 0, 101),
                                   output_));
 
-  EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
-  EXPECT_EQ(output_.sent_packet().status(), Status::DataLoss());
+  EXPECT_EQ(output_.packet_count(), 0u);
 }
 
-TEST_F(BasicServer, ProcessPacket_NoMethod_SendsDataLoss) {
+TEST_F(BasicServer, ProcessPacket_NoMethod_SendsNothing) {
   EXPECT_EQ(Status::DataLoss(),
             server_.ProcessPacket(EncodeRequest(PacketType::REQUEST, 1, 42, 0),
                                   output_));
 
-  EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
-  EXPECT_EQ(output_.sent_packet().status(), Status::DataLoss());
+  EXPECT_EQ(output_.packet_count(), 0u);
 }
 
 TEST_F(BasicServer, ProcessPacket_InvalidMethod_NothingIsInvoked) {
@@ -258,7 +256,7 @@
 class BidiMethod : public BasicServer {
  protected:
   BidiMethod()
-      : call_(static_cast<internal::Server&>(server_),
+      : call_(server_,
               static_cast<internal::Channel&>(channels_[0]),
               service_,
               service_.method(100)),
@@ -408,7 +406,7 @@
 class ServerStreamingMethod : public BasicServer {
  protected:
   ServerStreamingMethod()
-      : call_(static_cast<internal::Server&>(server_),
+      : call_(server_,
               static_cast<internal::Channel&>(channels_[0]),
               service_,
               service_.method(100)),