pw_rpc: Support listening to responses in Python client

Add an open() call to the Python RPC client alongside invoke(). Open
ignores errors when sending the initial RPC request. This can be used to
listen for responses from an RPC that was not explicitly requested. This
is helpful when an RPC server cannot be immediately reached (e.g. the
device isn't plugged in yet).

Change-Id: Id96c840a033b814750fced8c784ea078d8f30aa8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/59640
Reviewed-by: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
diff --git a/pw_rpc/py/pw_rpc/callback_client/call.py b/pw_rpc/py/pw_rpc/callback_client/call.py
index 92777d1..f758f01 100644
--- a/pw_rpc/py/pw_rpc/callback_client/call.py
+++ b/pw_rpc/py/pw_rpc/callback_client/call.py
@@ -86,11 +86,12 @@
         self.on_completed = on_completed or Call._default_completion
         self.on_error = on_error or Call._default_error
 
-    def _invoke(self, request: Optional[Message]) -> None:
+    def _invoke(self, request: Optional[Message], ignore_errors: bool) -> None:
         """Calls the RPC. This must be called immediately after __init__."""
         previous = self._rpcs.send_request(self._rpc,
                                            request,
                                            self,
+                                           ignore_errors=ignore_errors,
                                            override_pending=True)
 
         # TODO(hepler): Remove the cancel_duplicate_calls option.
diff --git a/pw_rpc/py/pw_rpc/callback_client/impl.py b/pw_rpc/py/pw_rpc/callback_client/impl.py
index 8218ae2..59a185e 100644
--- a/pw_rpc/py/pw_rpc/callback_client/impl.py
+++ b/pw_rpc/py/pw_rpc/callback_client/impl.py
@@ -97,18 +97,21 @@
             f'\n\n{textwrap.indent(docstring, "  ")}\n\n'
             f'  Returns {annotation}.')
 
-    def _start_call(self, call_type: Type[CallType],
-                    request: Optional[Message], timeout_s: OptionalTimeout,
+    def _start_call(self,
+                    call_type: Type[CallType],
+                    request: Optional[Message],
+                    timeout_s: OptionalTimeout,
                     on_next: Optional[OnNextCallback],
                     on_completed: Optional[OnCompletedCallback],
-                    on_error: Optional[OnErrorCallback]) -> CallType:
+                    on_error: Optional[OnErrorCallback],
+                    ignore_errors: bool = False) -> CallType:
         """Creates the Call object and invokes the RPC using it."""
         if timeout_s is UseDefault.VALUE:
             timeout_s = self.default_timeout_s
 
         call = call_type(self._rpcs, self._rpc, timeout_s, on_next,
                          on_completed, on_error)
-        call._invoke(request)  # pylint: disable=protected-access
+        call._invoke(request, ignore_errors)  # pylint: disable=protected-access
         return call
 
     def _client_streaming_call_type(self,
@@ -178,6 +181,18 @@
                                 self.method.get_request(request, request_args),
                                 timeout_s, on_next, on_completed, on_error)
 
+    def open(self,
+             request: Message = None,
+             on_next: OnNextCallback = None,
+             on_completed: OnCompletedCallback = None,
+             on_error: OnErrorCallback = None,
+             *,
+             request_args: Dict[str, Any] = None) -> UnaryCall:
+        """Invokes the unary RPC and returns a call object."""
+        return self._start_call(UnaryCall,
+                                self.method.get_request(request, request_args),
+                                None, on_next, on_completed, on_error, True)
+
 
 class _ServerStreamingMethodClient(_MethodClient):
     def invoke(
@@ -195,6 +210,22 @@
                                 self.method.get_request(request, request_args),
                                 timeout_s, on_next, on_completed, on_error)
 
+    def open(self,
+             request: Message = None,
+             on_next: OnNextCallback = None,
+             on_completed: OnCompletedCallback = None,
+             on_error: OnErrorCallback = None,
+             *,
+             request_args: Dict[str, Any] = None) -> ServerStreamingCall:
+        """Returns a call object for the RPC, even if the RPC cannot be invoked.
+
+        Can be used to listen for responses from an RPC server that may yet be
+        available.
+        """
+        return self._start_call(ServerStreamingCall,
+                                self.method.get_request(request, request_args),
+                                None, on_next, on_completed, on_error, True)
+
 
 class _ClientStreamingMethodClient(_MethodClient):
     def invoke(
@@ -208,7 +239,20 @@
         """Invokes the client streaming RPC and returns a call object"""
         return self._start_call(
             self._client_streaming_call_type(ClientStreamingCall), None,
-            timeout_s, on_next, on_completed, on_error)
+            timeout_s, on_next, on_completed, on_error, True)
+
+    def open(self,
+             on_next: OnNextCallback = None,
+             on_completed: OnCompletedCallback = None,
+             on_error: OnErrorCallback = None) -> ClientStreamingCall:
+        """Returns a call object for the RPC, even if the RPC cannot be invoked.
+
+        Can be used to listen for responses from an RPC server that may yet be
+        available.
+        """
+        return self._start_call(
+            self._client_streaming_call_type(ClientStreamingCall), None, None,
+            on_next, on_completed, on_error, True)
 
     def __call__(
             self,
@@ -219,7 +263,6 @@
 
 
 class _BidirectionalStreamingMethodClient(_MethodClient):
-    """Invokes the bidirectional streaming RPC and returns a call object."""
     def invoke(
         self,
         on_next: OnNextCallback = None,
@@ -228,10 +271,24 @@
         *,
         timeout_s: OptionalTimeout = UseDefault.VALUE
     ) -> BidirectionalStreamingCall:
+        """Invokes the bidirectional streaming RPC and returns a call object."""
         return self._start_call(
             self._client_streaming_call_type(BidirectionalStreamingCall), None,
             timeout_s, on_next, on_completed, on_error)
 
+    def open(self,
+             on_next: OnNextCallback = None,
+             on_completed: OnCompletedCallback = None,
+             on_error: OnErrorCallback = None) -> BidirectionalStreamingCall:
+        """Returns a call object for the RPC, even if the RPC cannot be invoked.
+
+        Can be used to listen for responses from an RPC server that may yet be
+        available.
+        """
+        return self._start_call(
+            self._client_streaming_call_type(BidirectionalStreamingCall), None,
+            None, on_next, on_completed, on_error, True)
+
     def __call__(
             self,
             requests: Iterable[Message] = (),
diff --git a/pw_rpc/py/pw_rpc/client.py b/pw_rpc/py/pw_rpc/client.py
index 2ade68f..7f115a9 100644
--- a/pw_rpc/py/pw_rpc/client.py
+++ b/pw_rpc/py/pw_rpc/client.py
@@ -26,7 +26,7 @@
 from pw_rpc.descriptors import Channel, Service, Method
 from pw_rpc.internal.packet_pb2 import PacketType, RpcPacket
 
-_LOG = logging.getLogger(__name__)
+_LOG = logging.getLogger(__package__)
 
 
 class Error(Exception):
@@ -67,6 +67,8 @@
                      rpc: PendingRpc,
                      request: Optional[Message],
                      context: object,
+                     *,
+                     ignore_errors: bool = False,
                      override_pending: bool = False) -> Any:
         """Starts the provided RPC and sends the request packet to the channel.
 
@@ -74,11 +76,17 @@
           the previous context object or None
         """
         previous = self.open(rpc, context, override_pending)
+        packet = packets.encode_request(rpc, request)
 
-        # TODO(hepler): Remove `type: ignore` on this and similar lines when
-        #     https://github.com/python/mypy/issues/5485 is fixed
-        rpc.channel.output(  # type: ignore
-            packets.encode_request(rpc, request))
+        # TODO(hepler): Remove `type: ignore[misc]` below when
+        #     https://github.com/python/mypy/issues/10711 is fixed.
+        if ignore_errors:
+            try:
+                rpc.channel.output(packet)  # type: ignore[misc]
+            except Exception as err:  # pylint: disable=broad-except
+                _LOG.debug('Ignoring exception when starting RPC: %s', err)
+        else:
+            rpc.channel.output(packet)  # type: ignore[misc]
 
         return previous
 
diff --git a/pw_rpc/py/tests/callback_client_test.py b/pw_rpc/py/tests/callback_client_test.py
index 0c2aaf0..c0a2489 100755
--- a/pw_rpc/py/tests/callback_client_test.py
+++ b/pw_rpc/py/tests/callback_client_test.py
@@ -16,7 +16,7 @@
 
 import unittest
 from unittest import mock
-from typing import Any, List, Tuple
+from typing import Any, List, Optional, Tuple
 
 from pw_protobuf_compiler import python_protos
 from pw_status import Status
@@ -72,6 +72,8 @@
         self._next_packets: List[Tuple[bytes, Status]] = []
         self.send_responses_after_packets: float = 1
 
+        self.output_exception: Optional[Exception] = None
+
     def last_request(self) -> packet_pb2.RpcPacket:
         assert self.requests
         return self.requests[-1]
@@ -127,12 +129,18 @@
              process_status))
 
     def _handle_packet(self, data: bytes) -> None:
+        if self.output_exception:
+            raise self.output_exception  # pylint: disable=raising-bad-type
+
         self.requests.append(packets.decode(data))
 
         if self.send_responses_after_packets > 1:
             self.send_responses_after_packets -= 1
             return
 
+        self._process_enqueued_packets()
+
+    def _process_enqueued_packets(self) -> None:
         # Set send_responses_after_packets to infinity to prevent potential
         # infinite recursion when a packet causes another packet to send.
         send_after_count = self.send_responses_after_packets
@@ -274,8 +282,8 @@
                                    self.method.response_type(payload='0_o'))
 
             callback = mock.Mock()
-            call = self._service.SomeUnary.invoke(
-                self._request(magic_number=5), callback, callback)
+            call = self.rpc.invoke(self._request(magic_number=5), callback,
+                                   callback)
 
             callback.assert_has_calls([
                 mock.call(call, self.method.response_type(payload='0_o')),
@@ -286,6 +294,25 @@
                 5,
                 self._sent_payload(self.method.request_type).magic_number)
 
+    def test_open(self) -> None:
+        self.output_exception = IOError('something went wrong sending!')
+
+        for _ in range(3):
+            self._enqueue_response(1, self.method, Status.ABORTED,
+                                   self.method.response_type(payload='0_o'))
+
+            callback = mock.Mock()
+            call = self.rpc.open(self._request(magic_number=5), callback,
+                                 callback)
+            self.assertEqual(self.requests, [])
+
+            self._process_enqueued_packets()
+
+            callback.assert_has_calls([
+                mock.call(call, self.method.response_type(payload='0_o')),
+                mock.call(call, Status.ABORTED)
+            ])
+
     def test_blocking_server_error(self) -> None:
         for _ in range(3):
             self._enqueue_error(1, self.method, Status.NOT_FOUND)
@@ -412,8 +439,8 @@
             self._enqueue_response(1, self.method, Status.ABORTED)
 
             callback = mock.Mock()
-            call = self._service.SomeServerStreaming.invoke(
-                self._request(magic_number=3), callback, callback)
+            call = self.rpc.invoke(self._request(magic_number=3), callback,
+                                   callback)
 
             callback.assert_has_calls([
                 mock.call(call, self.method.response_type(payload='!!!')),
@@ -425,6 +452,29 @@
                 3,
                 self._sent_payload(self.method.request_type).magic_number)
 
+    def test_open(self) -> None:
+        self.output_exception = IOError('something went wrong sending!')
+        rep1 = self.method.response_type(payload='!!!')
+        rep2 = self.method.response_type(payload='?')
+
+        for _ in range(3):
+            self._enqueue_server_stream(1, self.method, rep1)
+            self._enqueue_server_stream(1, self.method, rep2)
+            self._enqueue_response(1, self.method, Status.ABORTED)
+
+            callback = mock.Mock()
+            call = self.rpc.open(self._request(magic_number=3), callback,
+                                 callback)
+            self.assertEqual(self.requests, [])
+
+            self._process_enqueued_packets()
+
+            callback.assert_has_calls([
+                mock.call(call, self.method.response_type(payload='!!!')),
+                mock.call(call, self.method.response_type(payload='?')),
+                mock.call(call, Status.ABORTED),
+            ])
+
     def test_nonblocking_cancel(self) -> None:
         resp = self.rpc.method.response_type(payload='!!!')
         self._enqueue_server_stream(1, self.rpc.method, resp)
@@ -565,6 +615,24 @@
             self.assertIsNone(stream.error)
             self.assertEqual(payload_1, stream.response)
 
+    def test_open(self) -> None:
+        self.output_exception = IOError('something went wrong sending!')
+        payload = self.method.response_type(payload='-_-')
+
+        for _ in range(3):
+            self._enqueue_response(1, self.method, Status.OK, payload)
+
+            callback = mock.Mock()
+            call = self.rpc.open(callback, callback, callback)
+            self.assertEqual(self.requests, [])
+
+            self._process_enqueued_packets()
+
+            callback.assert_has_calls([
+                mock.call(call, payload),
+                mock.call(call, Status.OK),
+            ])
+
     def test_nonblocking_finish(self) -> None:
         """Tests a client streaming RPC ended by the client."""
         payload_1 = self.method.response_type(payload='-_-')
@@ -746,6 +814,28 @@
             self.assertIs(Status.OK, stream.status)
             self.assertIsNone(stream.error)
 
+    def test_open(self) -> None:
+        self.output_exception = IOError('something went wrong sending!')
+        rep1 = self.method.response_type(payload='!!!')
+        rep2 = self.method.response_type(payload='?')
+
+        for _ in range(3):
+            self._enqueue_server_stream(1, self.method, rep1)
+            self._enqueue_server_stream(1, self.method, rep2)
+            self._enqueue_response(1, self.method, Status.OK)
+
+            callback = mock.Mock()
+            call = self.rpc.open(callback, callback, callback)
+            self.assertEqual(self.requests, [])
+
+            self._process_enqueued_packets()
+
+            callback.assert_has_calls([
+                mock.call(call, self.method.response_type(payload='!!!')),
+                mock.call(call, self.method.response_type(payload='?')),
+                mock.call(call, Status.OK),
+            ])
+
     @mock.patch('pw_rpc.callback_client.call.Call._default_response')
     def test_nonblocking(self, callback) -> None:
         """Tests a bidirectional streaming RPC ended by the server."""