pw_rpc: Common Endpoint base for Server/Client
- Create pw::rpc::internal::Endpoint class that serves as the common
base for the Server and Client. It manages the internal
lists of active calls and available RPC channels.
- Refactor the pw::rpc::Server to derive from Endpoint. The Client will
be refactored to use Endpoint in a subsequent change.
- No longer send responses to corrupt packets. If packets are being
corrupted, a response isn't likely to be helpful.
- Remove the unnecessary internal::Server class.
Change-Id: Ib02922907ee61a4e590412aa464127590bde6441
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60185
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index a326e70..c8f7096 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -43,13 +43,14 @@
name = "server",
srcs = [
"call.cc",
+ "endpoint.cc",
"public/pw_rpc/internal/call.h",
"public/pw_rpc/internal/call_context.h",
+ "public/pw_rpc/internal/endpoint.h",
"public/pw_rpc/internal/hash.h",
"public/pw_rpc/internal/method.h",
"public/pw_rpc/internal/method_lookup.h",
"public/pw_rpc/internal/method_union.h",
- "public/pw_rpc/internal/server.h",
"server.cc",
"service.cc",
],
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index dd6489b..bc5f7a1 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -58,13 +58,14 @@
]
sources = [
"call.cc",
+ "endpoint.cc",
"public/pw_rpc/internal/call.h",
"public/pw_rpc/internal/call_context.h",
+ "public/pw_rpc/internal/endpoint.h",
"public/pw_rpc/internal/hash.h",
"public/pw_rpc/internal/method.h",
"public/pw_rpc/internal/method_lookup.h",
"public/pw_rpc/internal/method_union.h",
- "public/pw_rpc/internal/server.h",
"server.cc",
"service.cc",
]
diff --git a/pw_rpc/CMakeLists.txt b/pw_rpc/CMakeLists.txt
index 5b9edf5..9e10afd 100644
--- a/pw_rpc/CMakeLists.txt
+++ b/pw_rpc/CMakeLists.txt
@@ -25,6 +25,7 @@
pw_add_module_library(pw_rpc.server
SOURCES
call.cc
+ endpoint.cc
server.cc
service.cc
PUBLIC_DEPS
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index 8b13b87..939165a 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -17,7 +17,7 @@
#include "pw_assert/check.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
+#include "pw_rpc/server.h"
namespace pw::rpc::internal {
namespace {
@@ -50,7 +50,7 @@
type_(type),
client_stream_state_(HasClientStream(type) ? kClientStreamOpen
: kClientStreamClosed) {
- call_.server().RegisterResponder(*this);
+ call_.server().RegisterCall(*this);
}
Call& Call::operator=(Call&& other) {
@@ -64,7 +64,7 @@
if (other.open()) {
other.Close();
- other.call_.server().RegisterResponder(*this);
+ other.call_.server().RegisterCall(*this);
}
// Move the rest of the member variables.
@@ -130,7 +130,7 @@
void Call::Close() {
PW_DCHECK(open());
- call_.server().RemoveResponder(*this);
+ call_.server().UnregisterCall(*this);
rpc_state_ = kClosed;
client_stream_state_ = kClientStreamClosed;
}
diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc
index 4a058e7..cee015a 100644
--- a/pw_rpc/call_test.cc
+++ b/pw_rpc/call_test.cc
@@ -38,6 +38,8 @@
namespace internal {
namespace {
+constexpr Packet kPacket(PacketType::REQUEST, 99, 16, 8);
+
using pw::rpc::internal::test::FakeServerWriter;
using std::byte;
@@ -71,20 +73,16 @@
ServerContextForTest<TestService> context(TestService::method.method());
FakeServerWriter writer(context.get());
- auto& writers = context.server().writers();
- EXPECT_FALSE(writers.empty());
- auto it = std::find_if(writers.begin(), writers.end(), [&](auto& w) {
- return &w == &writer.as_responder();
- });
- ASSERT_NE(it, writers.end());
+ Call* call = context.server().FindCall(kPacket);
+ ASSERT_NE(call, nullptr);
+ EXPECT_EQ(static_cast<void*>(call), static_cast<void*>(&writer));
}
TEST(ServerWriter, Destruct_RemovesFromServer) {
ServerContextForTest<TestService> context(TestService::method.method());
{ FakeServerWriter writer(context.get()); }
- auto& writers = context.server().writers();
- EXPECT_TRUE(writers.empty());
+ EXPECT_EQ(context.server().FindCall(kPacket), nullptr);
}
TEST(ServerWriter, Finish_RemovesFromServer) {
@@ -93,8 +91,7 @@
EXPECT_EQ(OkStatus(), writer.Finish());
- auto& writers = context.server().writers();
- EXPECT_TRUE(writers.empty());
+ EXPECT_EQ(context.server().FindCall(kPacket), nullptr);
}
TEST(ServerWriter, Finish_SendsResponse) {
diff --git a/pw_rpc/endpoint.cc b/pw_rpc/endpoint.cc
new file mode 100644
index 0000000..a3886f5
--- /dev/null
+++ b/pw_rpc/endpoint.cc
@@ -0,0 +1,88 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/internal/endpoint.h"
+
+#include "pw_log/log.h"
+
+namespace pw::rpc::internal {
+
+Endpoint::~Endpoint() {
+ // Since the calls remove themselves from the Endpoint in
+ // CloseAndSendResponse(), close responders until no responders remain.
+ while (!calls_.empty()) {
+ calls_.front().CloseAndSendResponse(OkStatus()).IgnoreError();
+ }
+}
+
+Result<Packet> Endpoint::ProcessPacket(std::span<const std::byte> data,
+ Packet::Destination destination,
+ Call*& ongoing_call) {
+ Result<Packet> result = Packet::FromBuffer(data);
+
+ if (!result.ok()) {
+ PW_LOG_WARN("Failed to decode pw_rpc packet");
+ return Status::DataLoss();
+ }
+
+ Packet& packet = *result;
+
+ if (packet.channel_id() == Channel::kUnassignedChannelId ||
+ packet.service_id() == 0 || packet.method_id() == 0) {
+ PW_LOG_WARN("Received malformed pw_rpc packet");
+ return Status::DataLoss();
+ }
+
+ if (packet.destination() != destination) {
+ return Status::InvalidArgument();
+ }
+
+ // Find an existing reader/writer for this RPC, if any.
+ ongoing_call = FindCall(packet);
+
+ return result;
+}
+
+Call* Endpoint::FindCall(const Packet& packet) {
+ for (Call& call : calls_) {
+ if (packet.channel_id() == call.channel_id() &&
+ packet.service_id() == call.service_id() &&
+ packet.method_id() == call.method_id()) {
+ return &call;
+ }
+ }
+ return nullptr;
+}
+
+Channel* Endpoint::GetInternalChannel(uint32_t id) const {
+ for (Channel& c : channels_) {
+ if (c.id() == id) {
+ return &c;
+ }
+ }
+ return nullptr;
+}
+
+Channel* Endpoint::AssignChannel(uint32_t id, ChannelOutput& interface) {
+ internal::Channel* channel =
+ GetInternalChannel(Channel::kUnassignedChannelId);
+ if (channel == nullptr) {
+ return nullptr;
+ }
+
+ *channel = Channel(id, &interface);
+ return channel;
+}
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/method_test.cc b/pw_rpc/method_test.cc
index e87c74d..a639260 100644
--- a/pw_rpc/method_test.cc
+++ b/pw_rpc/method_test.cc
@@ -18,7 +18,6 @@
#include "gtest/gtest.h"
#include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
#include "pw_rpc/internal/test_method.h"
#include "pw_rpc/method_type.h"
#include "pw_rpc/server.h"
@@ -50,11 +49,10 @@
TEST(Method, Invoke) {
Channel channel(123, nullptr);
- rpc::Server server(std::span(static_cast<rpc::Channel*>(&channel), 1));
+ Server server(std::span(static_cast<rpc::Channel*>(&channel), 1));
TestService service;
- CallContext call(
- static_cast<internal::Server&>(server), channel, service, kTestMethod);
+ CallContext call(server, channel, service, kTestMethod);
Packet empty_packet;
EXPECT_EQ(kTestMethod.invocations(), 0u);
diff --git a/pw_rpc/public/pw_rpc/internal/call_context.h b/pw_rpc/public/pw_rpc/internal/call_context.h
index adc28a2..3a709d5 100644
--- a/pw_rpc/public/pw_rpc/internal/call_context.h
+++ b/pw_rpc/public/pw_rpc/internal/call_context.h
@@ -23,11 +23,11 @@
class ServerContext;
class Service;
+class Server;
namespace internal {
class Method;
-class Server;
// Collects information for an ongoing RPC being processed by the server.
// The Server creates a CallContext object to represent a method invocation. The
diff --git a/pw_rpc/public/pw_rpc/internal/endpoint.h b/pw_rpc/public/pw_rpc/internal/endpoint.h
new file mode 100644
index 0000000..76b2fd6
--- /dev/null
+++ b/pw_rpc/public/pw_rpc/internal/endpoint.h
@@ -0,0 +1,76 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <span>
+
+#include "pw_containers/intrusive_list.h"
+#include "pw_result/result.h"
+#include "pw_rpc/internal/call.h"
+#include "pw_rpc/internal/channel.h"
+#include "pw_rpc/internal/packet.h"
+
+namespace pw::rpc::internal {
+
+// Manages a list of channels and a list of ongoing calls for either a server or
+// client.
+//
+// For clients, calls start when they send a REQUEST packet to a server. For
+// servers, calls start when the REQUEST packet is received. In either case,
+// calls add themselves to the Endpoint's list when they're started and
+// remove themselves when they complete. Calls do this through their associated
+// Server or Client object, which derive from Endpoint.
+class Endpoint {
+ public:
+ ~Endpoint();
+
+ // Finds an RPC Channel with this ID or nullptr if none matches.
+ rpc::Channel* GetChannel(uint32_t id) const { return GetInternalChannel(id); }
+
+ protected:
+ constexpr Endpoint(std::span<rpc::Channel> channels)
+ : channels_(static_cast<internal::Channel*>(channels.data()),
+ channels.size()) {}
+
+ // Parses an RPC packet and sets ongoing_call to the matching call, if any.
+ // Returns the parsed packet or an error.
+ Result<Packet> ProcessPacket(std::span<const std::byte> data,
+ Packet::Destination packet,
+ Call*& ongoing_call);
+
+ // Finds a call object for an ongoing call associated with this packet, if
+ // any. Returns nullptr if no matching call exists.
+ Call* FindCall(const Packet& packet);
+
+ // Adds a call to the internal call registry. This immediately registers the
+ // call and does NOT check if the call is unique.
+ void RegisterCall(Call& call) { calls_.push_front(call); }
+
+ // Removes the provided call from the call registry.
+ void UnregisterCall(const Call& call) { calls_.remove(call); }
+
+ // Finds an internal:::Channel with this ID or nullptr if none matches.
+ Channel* GetInternalChannel(uint32_t id) const;
+
+ // Creates a channel with the provided ID and ChannelOutput, if a channel slot
+ // is available. Returns a pointer to the channel if one is created, nullptr
+ // otherwise.
+ Channel* AssignChannel(uint32_t id, ChannelOutput& interface);
+
+ private:
+ std::span<Channel> channels_;
+ IntrusiveList<Call> calls_;
+};
+
+} // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/internal/server.h b/pw_rpc/public/pw_rpc/internal/server.h
deleted file mode 100644
index 4556509..0000000
--- a/pw_rpc/public/pw_rpc/internal/server.h
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2020 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include "pw_rpc/server.h"
-
-namespace pw::rpc::internal {
-
-class Server : public rpc::Server {
- public:
- Server() = delete;
-
- void RegisterResponder(Call& writer) { writers().push_front(writer); }
-
- void RemoveResponder(const Call& writer) { writers().remove(writer); }
-};
-
-} // namespace pw::rpc::internal
diff --git a/pw_rpc/public/pw_rpc/internal/test_method_context.h b/pw_rpc/public/pw_rpc/internal/test_method_context.h
index a7b7261..61aa735 100644
--- a/pw_rpc/public/pw_rpc/internal/test_method_context.h
+++ b/pw_rpc/public/pw_rpc/internal/test_method_context.h
@@ -20,7 +20,7 @@
#include "pw_rpc/internal/fake_channel_output.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
+#include "pw_rpc/server.h"
namespace pw::rpc::internal::test {
@@ -98,7 +98,7 @@
channel_(Channel::Create<123>(&output_)),
server_(std::span(&channel_, 1)),
service_(std::forward<ServiceArgs>(service_args)...),
- context_(static_cast<internal::Server&>(server_),
+ context_(server_,
static_cast<internal::Channel&>(channel_),
service_,
method) {
diff --git a/pw_rpc/public/pw_rpc/internal/test_utils.h b/pw_rpc/public/pw_rpc/internal/test_utils.h
index 501ec0d..868e140 100644
--- a/pw_rpc/public/pw_rpc/internal/test_utils.h
+++ b/pw_rpc/public/pw_rpc/internal/test_utils.h
@@ -26,7 +26,7 @@
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
+#include "pw_rpc/server.h"
namespace pw::rpc::internal {
@@ -75,10 +75,10 @@
Status send_status_;
};
-// Version of the internal::Server with extra methods exposed for testing.
+// Version of the Server with extra methods exposed for testing.
class TestServer : public Server {
public:
- using internal::Server::writers;
+ using Server::FindCall;
};
template <typename Service,
diff --git a/pw_rpc/public/pw_rpc/server.h b/pw_rpc/public/pw_rpc/server.h
index f3ffe88..f3c3344 100644
--- a/pw_rpc/public/pw_rpc/server.h
+++ b/pw_rpc/public/pw_rpc/server.h
@@ -21,19 +21,16 @@
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/call.h"
#include "pw_rpc/internal/channel.h"
+#include "pw_rpc/internal/endpoint.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/service.h"
#include "pw_status/status.h"
namespace pw::rpc {
-class Server {
+class Server : public internal::Endpoint {
public:
- constexpr Server(std::span<Channel> channels)
- : channels_(static_cast<internal::Channel*>(channels.data()),
- channels.size()) {}
-
- ~Server();
+ constexpr Server(std::span<Channel> channels) : Endpoint(channels) {}
// Registers a service with the server. This should not be called directly
// with a Service; instead, use a generated class which inherits from it.
@@ -50,32 +47,21 @@
Status ProcessPacket(std::span<const std::byte> packet,
ChannelOutput& interface);
- constexpr size_t channel_count() const { return channels_.size(); }
-
- protected:
- IntrusiveList<internal::Call>& writers() { return responders_; }
-
private:
+ friend class internal::Call;
+
std::tuple<Service*, const internal::Method*> FindMethod(
const internal::Packet& packet);
- using ResponderIterator = IntrusiveList<internal::Call>::iterator;
-
void HandleClientStreamPacket(const internal::Packet& packet,
internal::Channel& channel,
- ResponderIterator responder) const;
+ internal::Call* call) const;
+
void HandleCancelPacket(const internal::Packet& packet,
internal::Channel& channel,
- ResponderIterator responder) const;
+ internal::Call* call) const;
- internal::Channel* FindChannel(uint32_t id) const;
- internal::Channel* AssignChannel(uint32_t id, ChannelOutput& interface);
-
- std::span<internal::Channel> channels_;
IntrusiveList<Service> services_;
-
- // Any asynchronously handled RPCs.
- IntrusiveList<internal::Call> responders_;
};
} // namespace pw::rpc
diff --git a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
index 33b2c84..e0e098d 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/test_method_context.h
@@ -22,7 +22,6 @@
#include "pw_rpc/internal/hash.h"
#include "pw_rpc/internal/method_lookup.h"
#include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
#include "pw_rpc/internal/test_method_context.h"
#include "pw_rpc/raw/fake_channel_output.h"
#include "pw_rpc/raw/internal/method.h"
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc
index 4be0881..7424288 100644
--- a/pw_rpc/server.cc
+++ b/pw_rpc/server.cc
@@ -17,8 +17,8 @@
#include <algorithm>
#include "pw_log/log.h"
+#include "pw_rpc/internal/endpoint.h"
#include "pw_rpc/internal/packet.h"
-#include "pw_rpc/internal/server.h"
#include "pw_rpc/server_context.h"
namespace pw::rpc {
@@ -29,58 +29,20 @@
using internal::Packet;
using internal::PacketType;
-bool DecodePacket(ChannelOutput& interface,
- std::span<const byte> data,
- Packet& packet) {
- Result<Packet> result = Packet::FromBuffer(data);
- if (!result.ok()) {
- PW_LOG_WARN("Failed to decode packet on interface %s", interface.name());
- return false;
- }
-
- packet = result.value();
-
- // If the packet is malformed, don't try to process it.
- if (packet.channel_id() == Channel::kUnassignedChannelId ||
- packet.service_id() == 0 || packet.method_id() == 0) {
- PW_LOG_WARN("Received incomplete packet on interface %s", interface.name());
-
- // Only send an ERROR response if a valid channel ID was provided.
- if (packet.channel_id() != Channel::kUnassignedChannelId) {
- internal::Channel temp_channel(packet.channel_id(), &interface);
- temp_channel.Send(Packet::ServerError(packet, Status::DataLoss()))
- .IgnoreError(); // TODO(pwbug/387): Handle Status properly
- }
- return false;
- }
-
- return true;
-}
-
} // namespace
-Server::~Server() {
- // Since the responders remove themselves from the server in
- // CloseAndSendResponse(), close responders until no responders remain.
- while (!responders_.empty()) {
- responders_.front()
- .CloseAndSendResponse(OkStatus())
- .IgnoreError(); // TODO(pwbug/387): Handle Status properly
- }
-}
-
Status Server::ProcessPacket(std::span<const byte> data,
ChannelOutput& interface) {
- Packet packet;
- if (!DecodePacket(interface, data, packet)) {
- return Status::DataLoss();
+ internal::Call* call;
+ Result<Packet> result = Endpoint::ProcessPacket(data, Packet::kServer, call);
+
+ if (!result.ok()) {
+ return result.status();
}
- if (packet.destination() != Packet::kServer) {
- return Status::InvalidArgument();
- }
+ Packet& packet = *result;
- internal::Channel* channel = FindChannel(packet.channel_id());
+ internal::Channel* channel = GetInternalChannel(packet.channel_id());
if (channel == nullptr) {
// If the requested channel doesn't exist, try to dynamically assign one.
channel = AssignChannel(packet.channel_id(), interface);
@@ -108,39 +70,30 @@
return OkStatus(); // OK since the packet was handled.
}
- // Find an existing reader/writer for this RPC, if any.
- auto responder =
- std::find_if(responders_.begin(), responders_.end(), [&](auto& w) {
- return packet.channel_id() == w.channel_id() &&
- packet.service_id() == w.service_id() &&
- packet.method_id() == w.method_id();
- });
-
switch (packet.type()) {
case PacketType::REQUEST: {
// If the REQUEST is for an ongoing RPC, cancel it, then call it again.
- if (responder != responders_.end()) {
- responder->HandleError(Status::Cancelled());
+ if (call != nullptr) {
+ call->HandleError(Status::Cancelled());
}
- internal::CallContext call(
- static_cast<internal::Server&>(*this), *channel, *service, *method);
- method->Invoke(call, packet);
+ internal::CallContext context(*this, *channel, *service, *method);
+ method->Invoke(context, packet);
break;
}
case PacketType::CLIENT_STREAM:
- HandleClientStreamPacket(packet, *channel, responder);
+ HandleClientStreamPacket(packet, *channel, call);
break;
case PacketType::CLIENT_ERROR:
- if (responder != responders_.end()) {
- responder->HandleError(packet.status());
+ if (call != nullptr) {
+ call->HandleError(packet.status());
}
break;
case PacketType::CANCEL:
- HandleCancelPacket(packet, *channel, responder);
+ HandleCancelPacket(packet, *channel, call);
break;
case PacketType::CLIENT_STREAM_END:
- HandleClientStreamPacket(packet, *channel, responder);
+ HandleClientStreamPacket(packet, *channel, call);
break;
default:
channel->Send(Packet::ServerError(packet, Status::Unimplemented()))
@@ -167,8 +120,8 @@
void Server::HandleClientStreamPacket(const internal::Packet& packet,
internal::Channel& channel,
- ResponderIterator responder) const {
- if (responder == responders_.end()) {
+ internal::Call* call) const {
+ if (call == nullptr) {
PW_LOG_DEBUG(
"Received client stream packet for method that is not pending");
channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
@@ -176,55 +129,35 @@
return;
}
- if (!responder->has_client_stream()) {
+ if (!call->has_client_stream()) {
channel.Send(Packet::ServerError(packet, Status::InvalidArgument()))
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
return;
}
- if (!responder->client_stream_open()) {
+ if (!call->client_stream_open()) {
channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
return;
}
if (packet.type() == PacketType::CLIENT_STREAM) {
- responder->HandleClientStream(packet.payload());
+ call->HandleClientStream(packet.payload());
} else { // Handle PacketType::CLIENT_STREAM_END.
- responder->EndClientStream();
+ call->EndClientStream();
}
}
void Server::HandleCancelPacket(const Packet& packet,
internal::Channel& channel,
- ResponderIterator responder) const {
- if (responder == responders_.end()) {
+ internal::Call* call) const {
+ if (call == nullptr) {
channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
PW_LOG_DEBUG("Received CANCEL packet for method that is not pending");
} else {
- responder->HandleError(Status::Cancelled());
+ call->HandleError(Status::Cancelled());
}
}
-internal::Channel* Server::FindChannel(uint32_t id) const {
- for (internal::Channel& c : channels_) {
- if (c.id() == id) {
- return &c;
- }
- }
- return nullptr;
-}
-
-internal::Channel* Server::AssignChannel(uint32_t id,
- ChannelOutput& interface) {
- internal::Channel* channel = FindChannel(Channel::kUnassignedChannelId);
- if (channel == nullptr) {
- return nullptr;
- }
-
- *channel = internal::Channel(id, &interface);
- return channel;
-}
-
} // namespace pw::rpc
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index 8f68de2..dc23e59 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -143,22 +143,20 @@
EXPECT_EQ(output_.packet_count(), 0u);
}
-TEST_F(BasicServer, ProcessPacket_NoService_SendsDataLoss) {
+TEST_F(BasicServer, ProcessPacket_NoService_SendsNothing) {
EXPECT_EQ(Status::DataLoss(),
server_.ProcessPacket(EncodeRequest(PacketType::REQUEST, 1, 0, 101),
output_));
- EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
- EXPECT_EQ(output_.sent_packet().status(), Status::DataLoss());
+ EXPECT_EQ(output_.packet_count(), 0u);
}
-TEST_F(BasicServer, ProcessPacket_NoMethod_SendsDataLoss) {
+TEST_F(BasicServer, ProcessPacket_NoMethod_SendsNothing) {
EXPECT_EQ(Status::DataLoss(),
server_.ProcessPacket(EncodeRequest(PacketType::REQUEST, 1, 42, 0),
output_));
- EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
- EXPECT_EQ(output_.sent_packet().status(), Status::DataLoss());
+ EXPECT_EQ(output_.packet_count(), 0u);
}
TEST_F(BasicServer, ProcessPacket_InvalidMethod_NothingIsInvoked) {
@@ -258,7 +256,7 @@
class BidiMethod : public BasicServer {
protected:
BidiMethod()
- : call_(static_cast<internal::Server&>(server_),
+ : call_(server_,
static_cast<internal::Channel&>(channels_[0]),
service_,
service_.method(100)),
@@ -408,7 +406,7 @@
class ServerStreamingMethod : public BasicServer {
protected:
ServerStreamingMethod()
- : call_(static_cast<internal::Server&>(server_),
+ : call_(server_,
static_cast<internal::Channel&>(channels_[0]),
service_,
service_.method(100)),