pw_rpc: Python client support for keeping RPCs open

- Support opening an RPC without calling it in the core pw_rpc client.
- Provide a keep_open option so the client keep RPCs going even if they
  are completed.

Change-Id: Idec8247f9ee0b432c0bfb5dbcf1d08ae1e27daea
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/39823
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/py/docs.rst b/pw_rpc/py/docs.rst
index 002e260..9d075ee 100644
--- a/pw_rpc/py/docs.rst
+++ b/pw_rpc/py/docs.rst
@@ -12,6 +12,11 @@
 .. automodule:: pw_rpc.client
   :members: Client, ClientImpl
 
+pw_rpc.callback_client
+======================
+.. autoclass:: pw_rpc.callback_client.Impl
+  :members:
+
 pw_rpc.console_tools
 ====================
 .. automodule:: pw_rpc.console_tools
diff --git a/pw_rpc/py/pw_rpc/callback_client.py b/pw_rpc/py/pw_rpc/callback_client.py
index 0a7127c..ea7fad8 100644
--- a/pw_rpc/py/pw_rpc/callback_client.py
+++ b/pw_rpc/py/pw_rpc/callback_client.py
@@ -115,12 +115,15 @@
                response: ResponseCallback = _default_response,
                completion: CompletionCallback = _default_completion,
                error: ErrorCallback = _default_error,
-               override_pending: bool = True) -> '_AsyncCall':
+               *,
+               override_pending: bool = True,
+               keep_open: bool = False) -> '_AsyncCall':
         """Invokes an RPC with callbacks."""
         self._rpcs.send_request(self._rpc,
                                 request,
                                 _Callbacks(response, completion, error),
-                                override_pending=override_pending)
+                                override_pending=override_pending,
+                                keep_open=keep_open)
         return _AsyncCall(self._rpcs, self._rpc)
 
     def __repr__(self) -> str:
@@ -391,7 +394,9 @@
                response: ResponseCallback = _default_response,
                completion: CompletionCallback = _default_completion,
                error: ErrorCallback = _default_error,
-               override_pending: bool = True) -> _AsyncCall:
+               *,
+               override_pending: bool = True,
+               keep_open: bool = False) -> _AsyncCall:
         raise NotImplementedError
 
 
@@ -404,7 +409,9 @@
                response: ResponseCallback = _default_response,
                completion: CompletionCallback = _default_completion,
                error: ErrorCallback = _default_error,
-               override_pending: bool = True) -> _AsyncCall:
+               *,
+               override_pending: bool = True,
+               keep_open: bool = False) -> _AsyncCall:
         raise NotImplementedError
 
 
diff --git a/pw_rpc/py/pw_rpc/client.py b/pw_rpc/py/pw_rpc/client.py
index 11d8192..229778c 100644
--- a/pw_rpc/py/pw_rpc/client.py
+++ b/pw_rpc/py/pw_rpc/client.py
@@ -16,8 +16,8 @@
 import abc
 from dataclasses import dataclass
 import logging
-from typing import (Any, Collection, Dict, Iterable, Iterator, List,
-                    NamedTuple, Optional)
+from typing import (Any, Collection, Dict, Iterable, Iterator, NamedTuple,
+                    Optional)
 
 from google.protobuf.message import DecodeError
 from pw_status import Status
@@ -43,27 +43,26 @@
         return f'PendingRpc(channel={self.channel.id}, method={self.method})'
 
 
+class _PendingRpcMetadata:
+    def __init__(self, context: Any, keep_open: bool):
+        self.context = context
+        self.keep_open = keep_open
+
+
 class PendingRpcs:
     """Tracks pending RPCs and encodes outgoing RPC packets."""
     def __init__(self):
-        self._pending: Dict[PendingRpc, List] = {}
+        self._pending: Dict[PendingRpc, _PendingRpcMetadata] = {}
 
     def request(self,
                 rpc: PendingRpc,
                 request,
                 context,
-                override_pending: bool = False) -> bytes:
+                override_pending: bool = True,
+                keep_open: bool = False) -> bytes:
         """Starts the provided RPC and returns the encoded packet to send."""
         # Ensure that every context is a unique object by wrapping it in a list.
-        unique_ctx = [context]
-
-        if override_pending:
-            self._pending[rpc] = unique_ctx
-        elif self._pending.setdefault(rpc, unique_ctx) is not unique_ctx:
-            # If the context was not added, the RPC was already pending.
-            raise Error(f'Sent request for {rpc}, but it is already pending! '
-                        'Cancel the RPC before invoking it again')
-
+        self.open(rpc, context, override_pending, keep_open)
         _LOG.debug('Starting %s', rpc)
         return packets.encode_request(rpc, request)
 
@@ -71,12 +70,33 @@
                      rpc: PendingRpc,
                      request,
                      context,
-                     override_pending: bool = False) -> None:
+                     override_pending: bool = False,
+                     keep_open: bool = False) -> None:
         """Calls request and sends the resulting packet to the channel."""
         # TODO(hepler): Remove `type: ignore` on this and similar lines when
         #     https://github.com/python/mypy/issues/5485 is fixed
         rpc.channel.output(  # type: ignore
-            self.request(rpc, request, context, override_pending))
+            self.request(rpc, request, context, override_pending, keep_open))
+
+    def open(self,
+             rpc: PendingRpc,
+             context,
+             override_pending: bool = False,
+             keep_open: bool = False) -> None:
+        """Creates a context for an RPC, but does not invoke it.
+
+        open() can be used to receive streaming responses to an RPC that was not
+        invoked by this client. For example, a server may stream logs with a
+        server streaming RPC prior to any clients invoking it.
+        """
+        metadata = _PendingRpcMetadata(context, keep_open)
+
+        if override_pending:
+            self._pending[rpc] = metadata
+        elif self._pending.setdefault(rpc, metadata) is not metadata:
+            # If the context was not added, the RPC was already pending.
+            raise Error(f'Sent request for {rpc}, but it is already pending! '
+                        'Cancel the RPC before invoking it again')
 
     def cancel(self, rpc: PendingRpc) -> Optional[bytes]:
         """Cancels the RPC. Returns the CANCEL packet to send.
@@ -110,10 +130,14 @@
     def get_pending(self, rpc: PendingRpc, status: Optional[Status]):
         """Gets the pending RPC's context. If status is set, clears the RPC."""
         if status is None:
-            return self._pending[rpc][0]  # Unwrap the context from the list
+            return self._pending[rpc].context
 
-        _LOG.debug('Finishing %s with status %s', rpc, status)
-        return self._pending.pop(rpc)[0]
+        if self._pending[rpc].keep_open:
+            _LOG.debug('%s finished with status %s; keeping open', rpc, status)
+            return self._pending[rpc].context
+
+        _LOG.debug('%s finished with status %s', rpc, status)
+        return self._pending.pop(rpc).context
 
 
 class ClientImpl(abc.ABC):
diff --git a/pw_rpc/py/tests/callback_client_test.py b/pw_rpc/py/tests/callback_client_test.py
index 9f9d3e6..ed7ec9c 100755
--- a/pw_rpc/py/tests/callback_client_test.py
+++ b/pw_rpc/py/tests/callback_client_test.py
@@ -167,6 +167,38 @@
             self.assertIs(Status.ABORTED, status)
             self.assertEqual('0_o', response.payload)
 
+    def test_invoke_unary_rpc_keep_open(self) -> None:
+        method = self._service.SomeUnary.method
+
+        payload_1 = method.response_type(payload='-_-')
+        payload_2 = method.response_type(payload='0_o')
+
+        self._enqueue_response(1, method, Status.ABORTED, payload_1)
+
+        replies: list = []
+        enqueue_replies = lambda _, reply: replies.append(reply)
+
+        self._service.SomeUnary.invoke(method.request_type(magic_number=6),
+                                       enqueue_replies,
+                                       enqueue_replies,
+                                       keep_open=True)
+
+        self.assertEqual([payload_1, Status.ABORTED], replies)
+
+        # Send another packet and make sure it is processed even though the RPC
+        # terminated.
+        self._client.process_packet(
+            packet_pb2.RpcPacket(
+                type=packet_pb2.PacketType.RESPONSE,
+                channel_id=1,
+                service_id=method.service.id,
+                method_id=method.id,
+                status=Status.OK.value,
+                payload=payload_2.SerializeToString()).SerializeToString())
+
+        self.assertEqual([payload_1, Status.ABORTED, payload_2, Status.OK],
+                         replies)
+
     def test_invoke_unary_rpc_with_callback(self):
         method = self._service.SomeUnary.method