pw_rpc: Support abandoning client calls; expand docs
- Add the Abandon() method to client calls. Calling Abandon() is
equivalent to letting a client call go out of scope. The call is
closed locally, but the call is not cancelled on the server. The Java
client already provides similar funtionality.
- Write unit tests for Abandon().
- Document the client call API.
Change-Id: Ib3b859813427e4591edc07223e8dd7a5162a5ac0
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126916
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 636ad95..85fa97a 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -991,6 +991,74 @@
- ``(Raw|Nanopb|Pwpb)ServerReaderWriter``
- ``(Raw|Nanopb|Pwpb)ClientReaderWriter``
+Client call API
+---------------
+Client call objects provide a few common methods.
+
+.. cpp:class:: pw::rpc::ClientCallType
+
+ The ``ClientCallType`` will be one of the following types:
+
+ - ``(Raw|Nanopb|Pwpb)UnaryReceiver`` for unary
+ - ``(Raw|Nanopb|Pwpb)ClientReader`` for server streaming
+ - ``(Raw|Nanopb|Pwpb)ClientWriter`` for client streaming
+ - ``(Raw|Nanopb|Pwpb)ClientReaderWriter`` for bidirectional streaming
+
+ .. cpp:function:: bool active() const
+
+ Returns true if the call is active.
+
+ .. cpp:function:: uint32_t channel_id() const
+
+ Returns the channel ID of this call, which is 0 if the call is inactive.
+
+ .. cpp:function:: uint32_t id() const
+
+ Returns the call ID, a unique identifier for this call.
+
+ .. cpp:function:: void Write(RequestType)
+
+ Only available on client and bidirectional streaming calls. Sends a stream
+ request. Returns:
+
+ - ``OK`` - the request was successfully sent
+ - ``FAILED_PRECONDITION`` - the writer is closed
+ - ``INTERNAL`` - pw_rpc was unable to encode message; does not apply to raw
+ calls
+ - other errors - the :cpp:class:`ChannelOutput` failed to send the packet;
+ the error codes are determined by the :cpp:class:`ChannelOutput`
+ implementation
+
+ .. cpp:function:: pw::Status CloseClientStream()
+
+ Only available on client and bidirectional streaming calls. Notifies the
+ server that no further client stream messages will be sent.
+
+ .. cpp:function:: pw::Status Cancel()
+
+ Cancels this RPC. Closes the call and sends a ``CANCELLED`` error to the
+ server. Return statuses are the same as :cpp:func:`Write`.
+
+ .. cpp:function:: void Abandon()
+
+ Closes this RPC locally. Sends a ``CLIENT_STREAM_END``, but no cancellation
+ packet. Future packets for this RPC are dropped, and the client sends a
+ ``FAILED_PRECONDITION`` error in response because the call is not active.
+
+ .. cpp:function:: void set_on_completed(pw::Function<void(ResponseTypeIfUnaryOnly, pw::Status)>)
+
+ Sets the callback that is called when the RPC completes normally. The
+ signature depends on whether the call has a unary or stream response.
+
+ .. cpp:function:: void set_on_error(pw::Function<void(pw::Status)>)
+
+ Sets the callback that is called when the RPC is terminated due to an error.
+
+ .. cpp:function:: void set_on_next(pw::Function<void(ResponseType)>)
+
+ Only available on server and bidirectional streaming calls. Sets the callback
+ that is called for each stream response.
+
Callbacks
---------
The C++ call objects allow users to set callbacks that are invoked when RPC
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
index a5d3cfa..8eb7afd 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
@@ -249,8 +249,17 @@
&request);
}
+ // Notifies the server that no further client stream messages will be sent.
+ using internal::ClientCall::CloseClientStream;
+
+ // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
+ // the server.
using internal::Call::Cancel;
- using internal::Call::CloseClientStream;
+
+ // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
+ // packet. Future packets for this RPC are dropped, and the client sends a
+ // FAILED_PRECONDITION error in response because the call is not active.
+ using internal::ClientCall::Abandon;
// Functions for setting RPC event callbacks.
using internal::Call::set_on_error;
@@ -297,6 +306,7 @@
using internal::StreamResponseClientCall::set_on_completed;
using internal::Call::Cancel;
+ using internal::ClientCall::Abandon;
private:
friend class internal::NanopbStreamResponseClientCall<Response>;
@@ -342,6 +352,7 @@
using internal::Call::Cancel;
using internal::Call::CloseClientStream;
+ using internal::ClientCall::Abandon;
private:
friend class internal::NanopbUnaryResponseClientCall<Response>;
@@ -381,6 +392,7 @@
using internal::Call::set_on_error;
using internal::Call::Cancel;
+ using internal::ClientCall::Abandon;
private:
friend class internal::NanopbUnaryResponseClientCall<Response>;
diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h
index 48af2d2..3efe98e 100644
--- a/pw_rpc/public/pw_rpc/internal/client_call.h
+++ b/pw_rpc/public/pw_rpc/internal/client_call.h
@@ -26,11 +26,7 @@
// A Call object, as used by an RPC client.
class ClientCall : public Call {
public:
- ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) {
- rpc_lock().lock();
- CloseClientCall();
- rpc_lock().unlock();
- }
+ ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) { Abandon(); }
uint32_t id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
LockGuard lock(rpc_lock());
@@ -57,6 +53,13 @@
CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
: Call(client, channel_id, service_id, method_id, properties) {}
+ // Public function that closes a call client-side without cancelling it on the
+ // server.
+ void Abandon() PW_LOCKS_EXCLUDED(rpc_lock()) {
+ LockGuard lock(rpc_lock());
+ CloseClientCall();
+ }
+
// Sends CLIENT_STREAM_END if applicable and marks the call as closed.
void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
index a03bd46..0d39ecd 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
@@ -305,8 +305,17 @@
request);
}
+ // Notifies the server that no further client stream messages will be sent.
+ using internal::ClientCall::CloseClientStream;
+
+ // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
+ // the server.
using internal::Call::Cancel;
- using internal::Call::CloseClientStream;
+
+ // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
+ // packet. Future packets for this RPC are dropped, and the client sends a
+ // FAILED_PRECONDITION error in response because the call is not active.
+ using internal::ClientCall::Abandon;
// Functions for setting RPC event callbacks.
using internal::PwpbStreamResponseClientCall<Response>::set_on_next;
@@ -351,6 +360,7 @@
using internal::StreamResponseClientCall::channel_id;
using internal::Call::Cancel;
+ using internal::ClientCall::Abandon;
// Functions for setting RPC event callbacks.
using internal::PwpbStreamResponseClientCall<Response>::set_on_next;
@@ -409,6 +419,7 @@
using internal::Call::Cancel;
using internal::Call::CloseClientStream;
+ using internal::ClientCall::Abandon;
// Functions for setting RPC event callbacks.
using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed;
@@ -457,6 +468,7 @@
using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed;
using internal::Call::Cancel;
+ using internal::ClientCall::Abandon;
private:
friend class internal::PwpbUnaryResponseClientCall<Response>;
diff --git a/pw_rpc/raw/client_reader_writer_test.cc b/pw_rpc/raw/client_reader_writer_test.cc
index 68ff35a..7b4184f 100644
--- a/pw_rpc/raw/client_reader_writer_test.cc
+++ b/pw_rpc/raw/client_reader_writer_test.cc
@@ -82,7 +82,7 @@
call.set_on_error([](Status) {});
}
-TEST(RawUnaryReceiver, Closed) {
+TEST(RawUnaryReceiver, Cancel) {
RawClientTestContext ctx;
RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
ctx.channel().id(),
@@ -98,9 +98,11 @@
call.set_on_completed([](ConstByteSpan, Status) {});
call.set_on_error([](Status) {});
+
+ EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
}
-TEST(RawClientWriter, Closed) {
+TEST(RawClientWriter, Cancel) {
RawClientTestContext ctx;
RawClientWriter call = TestService::TestClientStreamRpc(
ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
@@ -115,9 +117,11 @@
call.set_on_completed([](ConstByteSpan, Status) {});
call.set_on_error([](Status) {});
+
+ EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
}
-TEST(RawClientReader, Closed) {
+TEST(RawClientReader, Cancel) {
RawClientTestContext ctx;
RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
ctx.channel().id(),
@@ -135,9 +139,11 @@
call.set_on_completed([](Status) {});
call.set_on_next([](ConstByteSpan) {});
call.set_on_error([](Status) {});
+
+ EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
}
-TEST(RawClientReaderWriter, Closed) {
+TEST(RawClientReaderWriter, Cancel) {
RawClientTestContext ctx;
RawClientReaderWriter call =
TestService::TestBidirectionalStreamRpc(ctx.client(),
@@ -157,6 +163,79 @@
call.set_on_completed([](Status) {});
call.set_on_next([](ConstByteSpan) {});
call.set_on_error([](Status) {});
+
+ EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
+}
+
+TEST(RawUnaryReceiver, Abandon) {
+ RawClientTestContext ctx;
+ RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
+ ctx.channel().id(),
+ {},
+ FailIfOnCompletedCalled,
+ FailIfCalled);
+ call.Abandon();
+
+ ASSERT_FALSE(call.active());
+ EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+ EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+
+ EXPECT_EQ(ctx.output().total_packets(), 1u); // request only
+}
+
+TEST(RawClientWriter, Abandon) {
+ RawClientTestContext ctx;
+ RawClientWriter call = TestService::TestClientStreamRpc(
+ ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
+ call.Abandon();
+
+ ASSERT_FALSE(call.active());
+ EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+ EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
+ EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+ EXPECT_EQ(Status::FailedPrecondition(), call.CloseClientStream());
+
+ EXPECT_EQ(ctx.output().total_packets(), 2u); // request & client stream end
+}
+
+TEST(RawClientReader, Abandon) {
+ RawClientTestContext ctx;
+ RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
+ ctx.channel().id(),
+ {},
+ FailIfOnNextCalled,
+ FailIfCalled,
+ FailIfCalled);
+ call.Abandon();
+
+ ASSERT_FALSE(call.active());
+ EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+ EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+
+ EXPECT_EQ(ctx.output().total_packets(), 1u); // request only
+}
+
+TEST(RawClientReaderWriter, Abandon) {
+ RawClientTestContext ctx;
+ RawClientReaderWriter call =
+ TestService::TestBidirectionalStreamRpc(ctx.client(),
+ ctx.channel().id(),
+ FailIfOnNextCalled,
+ FailIfCalled,
+ FailIfCalled);
+ call.Abandon();
+
+ ASSERT_FALSE(call.active());
+ EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+ EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
+ EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+ EXPECT_EQ(Status::FailedPrecondition(), call.CloseClientStream());
+
+ EXPECT_EQ(ctx.output().total_packets(), 2u); // request & client stream end
}
TEST(RawClientReaderWriter, Move_InactiveToActive_EndsClientStream) {
diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
index 429bf6d..2d78027 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
@@ -49,11 +49,17 @@
using internal::Call::Write;
// Notifies the server that no further client stream messages will be sent.
- using internal::Call::CloseClientStream;
+ using internal::ClientCall::CloseClientStream;
- // Cancels this RPC.
+ // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
+ // the server.
using internal::Call::Cancel;
+ // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
+ // packet. Future packets for this RPC are dropped, and the client sends a
+ // FAILED_PRECONDITION error in response because the call is not active.
+ using internal::ClientCall::Abandon;
+
// Allow use as a generic RPC Writer.
using internal::Call::operator Writer&;
using internal::Call::operator const Writer&;
@@ -90,6 +96,7 @@
using internal::StreamResponseClientCall::set_on_next;
using internal::Call::Cancel;
+ using internal::ClientCall::Abandon;
private:
friend class internal::StreamResponseClientCall;
@@ -123,6 +130,7 @@
using internal::Call::Cancel;
using internal::Call::CloseClientStream;
using internal::Call::Write;
+ using internal::ClientCall::Abandon;
// Allow use as a generic RPC Writer.
using internal::Call::operator Writer&;
@@ -157,6 +165,7 @@
using internal::UnaryResponseClientCall::set_on_completed;
using internal::UnaryResponseClientCall::set_on_error;
+ using internal::ClientCall::Abandon;
using internal::UnaryResponseClientCall::Cancel;
private: