pw_rpc: Support abandoning client calls; expand docs

- Add the Abandon() method to client calls. Calling Abandon() is
  equivalent to letting a client call go out of scope. The call is
  closed locally, but the call is not cancelled on the server. The Java
  client already provides similar funtionality.
- Write unit tests for Abandon().
- Document the client call API.

Change-Id: Ib3b859813427e4591edc07223e8dd7a5162a5ac0
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126916
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 636ad95..85fa97a 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -991,6 +991,74 @@
     - ``(Raw|Nanopb|Pwpb)ServerReaderWriter``
     - ``(Raw|Nanopb|Pwpb)ClientReaderWriter``
 
+Client call API
+---------------
+Client call objects provide a few common methods.
+
+.. cpp:class:: pw::rpc::ClientCallType
+
+  The ``ClientCallType`` will be one of the following types:
+
+  - ``(Raw|Nanopb|Pwpb)UnaryReceiver`` for unary
+  - ``(Raw|Nanopb|Pwpb)ClientReader`` for server streaming
+  - ``(Raw|Nanopb|Pwpb)ClientWriter`` for client streaming
+  - ``(Raw|Nanopb|Pwpb)ClientReaderWriter`` for bidirectional streaming
+
+  .. cpp:function:: bool active() const
+
+    Returns true if the call is active.
+
+  .. cpp:function:: uint32_t channel_id() const
+
+    Returns the channel ID of this call, which is 0 if the call is inactive.
+
+  .. cpp:function:: uint32_t id() const
+
+    Returns the call ID, a unique identifier for this call.
+
+  .. cpp:function:: void Write(RequestType)
+
+    Only available on client and bidirectional streaming calls. Sends a stream
+    request. Returns:
+
+    - ``OK`` - the request was successfully sent
+    - ``FAILED_PRECONDITION`` - the writer is closed
+    - ``INTERNAL`` - pw_rpc was unable to encode message; does not apply to raw
+      calls
+    - other errors - the :cpp:class:`ChannelOutput` failed to send the packet;
+      the error codes are determined by the :cpp:class:`ChannelOutput`
+      implementation
+
+  .. cpp:function:: pw::Status CloseClientStream()
+
+    Only available on client and bidirectional streaming calls. Notifies the
+    server that no further client stream messages will be sent.
+
+  .. cpp:function:: pw::Status Cancel()
+
+    Cancels this RPC. Closes the call and sends a ``CANCELLED`` error to the
+    server. Return statuses are the same as :cpp:func:`Write`.
+
+  .. cpp:function:: void Abandon()
+
+    Closes this RPC locally. Sends a ``CLIENT_STREAM_END``, but no cancellation
+    packet. Future packets for this RPC are dropped, and the client sends a
+    ``FAILED_PRECONDITION`` error in response because the call is not active.
+
+  .. cpp:function:: void set_on_completed(pw::Function<void(ResponseTypeIfUnaryOnly, pw::Status)>)
+
+    Sets the callback that is called when the RPC completes normally. The
+    signature depends on whether the call has a unary or stream response.
+
+  .. cpp:function:: void set_on_error(pw::Function<void(pw::Status)>)
+
+    Sets the callback that is called when the RPC is terminated due to an error.
+
+  .. cpp:function:: void set_on_next(pw::Function<void(ResponseType)>)
+
+    Only available on server and bidirectional streaming calls. Sets the callback
+    that is called for each stream response.
+
 Callbacks
 ---------
 The C++ call objects allow users to set callbacks that are invoked when RPC
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
index a5d3cfa..8eb7afd 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
@@ -249,8 +249,17 @@
         &request);
   }
 
+  // Notifies the server that no further client stream messages will be sent.
+  using internal::ClientCall::CloseClientStream;
+
+  // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
+  // the server.
   using internal::Call::Cancel;
-  using internal::Call::CloseClientStream;
+
+  // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
+  // packet. Future packets for this RPC are dropped, and the client sends a
+  // FAILED_PRECONDITION error in response because the call is not active.
+  using internal::ClientCall::Abandon;
 
   // Functions for setting RPC event callbacks.
   using internal::Call::set_on_error;
@@ -297,6 +306,7 @@
   using internal::StreamResponseClientCall::set_on_completed;
 
   using internal::Call::Cancel;
+  using internal::ClientCall::Abandon;
 
  private:
   friend class internal::NanopbStreamResponseClientCall<Response>;
@@ -342,6 +352,7 @@
 
   using internal::Call::Cancel;
   using internal::Call::CloseClientStream;
+  using internal::ClientCall::Abandon;
 
  private:
   friend class internal::NanopbUnaryResponseClientCall<Response>;
@@ -381,6 +392,7 @@
   using internal::Call::set_on_error;
 
   using internal::Call::Cancel;
+  using internal::ClientCall::Abandon;
 
  private:
   friend class internal::NanopbUnaryResponseClientCall<Response>;
diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h
index 48af2d2..3efe98e 100644
--- a/pw_rpc/public/pw_rpc/internal/client_call.h
+++ b/pw_rpc/public/pw_rpc/internal/client_call.h
@@ -26,11 +26,7 @@
 // A Call object, as used by an RPC client.
 class ClientCall : public Call {
  public:
-  ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) {
-    rpc_lock().lock();
-    CloseClientCall();
-    rpc_lock().unlock();
-  }
+  ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) { Abandon(); }
 
   uint32_t id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
     LockGuard lock(rpc_lock());
@@ -57,6 +53,13 @@
              CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
       : Call(client, channel_id, service_id, method_id, properties) {}
 
+  // Public function that closes a call client-side without cancelling it on the
+  // server.
+  void Abandon() PW_LOCKS_EXCLUDED(rpc_lock()) {
+    LockGuard lock(rpc_lock());
+    CloseClientCall();
+  }
+
   // Sends CLIENT_STREAM_END if applicable and marks the call as closed.
   void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
index a03bd46..0d39ecd 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
@@ -305,8 +305,17 @@
         request);
   }
 
+  // Notifies the server that no further client stream messages will be sent.
+  using internal::ClientCall::CloseClientStream;
+
+  // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
+  // the server.
   using internal::Call::Cancel;
-  using internal::Call::CloseClientStream;
+
+  // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
+  // packet. Future packets for this RPC are dropped, and the client sends a
+  // FAILED_PRECONDITION error in response because the call is not active.
+  using internal::ClientCall::Abandon;
 
   // Functions for setting RPC event callbacks.
   using internal::PwpbStreamResponseClientCall<Response>::set_on_next;
@@ -351,6 +360,7 @@
   using internal::StreamResponseClientCall::channel_id;
 
   using internal::Call::Cancel;
+  using internal::ClientCall::Abandon;
 
   // Functions for setting RPC event callbacks.
   using internal::PwpbStreamResponseClientCall<Response>::set_on_next;
@@ -409,6 +419,7 @@
 
   using internal::Call::Cancel;
   using internal::Call::CloseClientStream;
+  using internal::ClientCall::Abandon;
 
   // Functions for setting RPC event callbacks.
   using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed;
@@ -457,6 +468,7 @@
   using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed;
 
   using internal::Call::Cancel;
+  using internal::ClientCall::Abandon;
 
  private:
   friend class internal::PwpbUnaryResponseClientCall<Response>;
diff --git a/pw_rpc/raw/client_reader_writer_test.cc b/pw_rpc/raw/client_reader_writer_test.cc
index 68ff35a..7b4184f 100644
--- a/pw_rpc/raw/client_reader_writer_test.cc
+++ b/pw_rpc/raw/client_reader_writer_test.cc
@@ -82,7 +82,7 @@
   call.set_on_error([](Status) {});
 }
 
-TEST(RawUnaryReceiver, Closed) {
+TEST(RawUnaryReceiver, Cancel) {
   RawClientTestContext ctx;
   RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
                                                     ctx.channel().id(),
@@ -98,9 +98,11 @@
 
   call.set_on_completed([](ConstByteSpan, Status) {});
   call.set_on_error([](Status) {});
+
+  EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
 }
 
-TEST(RawClientWriter, Closed) {
+TEST(RawClientWriter, Cancel) {
   RawClientTestContext ctx;
   RawClientWriter call = TestService::TestClientStreamRpc(
       ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
@@ -115,9 +117,11 @@
 
   call.set_on_completed([](ConstByteSpan, Status) {});
   call.set_on_error([](Status) {});
+
+  EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
 }
 
-TEST(RawClientReader, Closed) {
+TEST(RawClientReader, Cancel) {
   RawClientTestContext ctx;
   RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
                                                           ctx.channel().id(),
@@ -135,9 +139,11 @@
   call.set_on_completed([](Status) {});
   call.set_on_next([](ConstByteSpan) {});
   call.set_on_error([](Status) {});
+
+  EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
 }
 
-TEST(RawClientReaderWriter, Closed) {
+TEST(RawClientReaderWriter, Cancel) {
   RawClientTestContext ctx;
   RawClientReaderWriter call =
       TestService::TestBidirectionalStreamRpc(ctx.client(),
@@ -157,6 +163,79 @@
   call.set_on_completed([](Status) {});
   call.set_on_next([](ConstByteSpan) {});
   call.set_on_error([](Status) {});
+
+  EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
+}
+
+TEST(RawUnaryReceiver, Abandon) {
+  RawClientTestContext ctx;
+  RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
+                                                    ctx.channel().id(),
+                                                    {},
+                                                    FailIfOnCompletedCalled,
+                                                    FailIfCalled);
+  call.Abandon();
+
+  ASSERT_FALSE(call.active());
+  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+  EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+
+  EXPECT_EQ(ctx.output().total_packets(), 1u);  // request only
+}
+
+TEST(RawClientWriter, Abandon) {
+  RawClientTestContext ctx;
+  RawClientWriter call = TestService::TestClientStreamRpc(
+      ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
+  call.Abandon();
+
+  ASSERT_FALSE(call.active());
+  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+  EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
+  EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+  EXPECT_EQ(Status::FailedPrecondition(), call.CloseClientStream());
+
+  EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & client stream end
+}
+
+TEST(RawClientReader, Abandon) {
+  RawClientTestContext ctx;
+  RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
+                                                          ctx.channel().id(),
+                                                          {},
+                                                          FailIfOnNextCalled,
+                                                          FailIfCalled,
+                                                          FailIfCalled);
+  call.Abandon();
+
+  ASSERT_FALSE(call.active());
+  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+  EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+
+  EXPECT_EQ(ctx.output().total_packets(), 1u);  // request only
+}
+
+TEST(RawClientReaderWriter, Abandon) {
+  RawClientTestContext ctx;
+  RawClientReaderWriter call =
+      TestService::TestBidirectionalStreamRpc(ctx.client(),
+                                              ctx.channel().id(),
+                                              FailIfOnNextCalled,
+                                              FailIfCalled,
+                                              FailIfCalled);
+  call.Abandon();
+
+  ASSERT_FALSE(call.active());
+  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
+
+  EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
+  EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
+  EXPECT_EQ(Status::FailedPrecondition(), call.CloseClientStream());
+
+  EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & client stream end
 }
 
 TEST(RawClientReaderWriter, Move_InactiveToActive_EndsClientStream) {
diff --git a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
index 429bf6d..2d78027 100644
--- a/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
+++ b/pw_rpc/raw/public/pw_rpc/raw/client_reader_writer.h
@@ -49,11 +49,17 @@
   using internal::Call::Write;
 
   // Notifies the server that no further client stream messages will be sent.
-  using internal::Call::CloseClientStream;
+  using internal::ClientCall::CloseClientStream;
 
-  // Cancels this RPC.
+  // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
+  // the server.
   using internal::Call::Cancel;
 
+  // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
+  // packet. Future packets for this RPC are dropped, and the client sends a
+  // FAILED_PRECONDITION error in response because the call is not active.
+  using internal::ClientCall::Abandon;
+
   // Allow use as a generic RPC Writer.
   using internal::Call::operator Writer&;
   using internal::Call::operator const Writer&;
@@ -90,6 +96,7 @@
   using internal::StreamResponseClientCall::set_on_next;
 
   using internal::Call::Cancel;
+  using internal::ClientCall::Abandon;
 
  private:
   friend class internal::StreamResponseClientCall;
@@ -123,6 +130,7 @@
   using internal::Call::Cancel;
   using internal::Call::CloseClientStream;
   using internal::Call::Write;
+  using internal::ClientCall::Abandon;
 
   // Allow use as a generic RPC Writer.
   using internal::Call::operator Writer&;
@@ -157,6 +165,7 @@
   using internal::UnaryResponseClientCall::set_on_completed;
   using internal::UnaryResponseClientCall::set_on_error;
 
+  using internal::ClientCall::Abandon;
   using internal::UnaryResponseClientCall::Cancel;
 
  private: