pw_rpc: Python client support for keeping RPCs open
- Support opening an RPC without calling it in the core pw_rpc client.
- Provide a keep_open option so the client keep RPCs going even if they
are completed.
Change-Id: Idec8247f9ee0b432c0bfb5dbcf1d08ae1e27daea
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/39823
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/py/docs.rst b/pw_rpc/py/docs.rst
index 002e260..9d075ee 100644
--- a/pw_rpc/py/docs.rst
+++ b/pw_rpc/py/docs.rst
@@ -12,6 +12,11 @@
.. automodule:: pw_rpc.client
:members: Client, ClientImpl
+pw_rpc.callback_client
+======================
+.. autoclass:: pw_rpc.callback_client.Impl
+ :members:
+
pw_rpc.console_tools
====================
.. automodule:: pw_rpc.console_tools
diff --git a/pw_rpc/py/pw_rpc/callback_client.py b/pw_rpc/py/pw_rpc/callback_client.py
index 0a7127c..ea7fad8 100644
--- a/pw_rpc/py/pw_rpc/callback_client.py
+++ b/pw_rpc/py/pw_rpc/callback_client.py
@@ -115,12 +115,15 @@
response: ResponseCallback = _default_response,
completion: CompletionCallback = _default_completion,
error: ErrorCallback = _default_error,
- override_pending: bool = True) -> '_AsyncCall':
+ *,
+ override_pending: bool = True,
+ keep_open: bool = False) -> '_AsyncCall':
"""Invokes an RPC with callbacks."""
self._rpcs.send_request(self._rpc,
request,
_Callbacks(response, completion, error),
- override_pending=override_pending)
+ override_pending=override_pending,
+ keep_open=keep_open)
return _AsyncCall(self._rpcs, self._rpc)
def __repr__(self) -> str:
@@ -391,7 +394,9 @@
response: ResponseCallback = _default_response,
completion: CompletionCallback = _default_completion,
error: ErrorCallback = _default_error,
- override_pending: bool = True) -> _AsyncCall:
+ *,
+ override_pending: bool = True,
+ keep_open: bool = False) -> _AsyncCall:
raise NotImplementedError
@@ -404,7 +409,9 @@
response: ResponseCallback = _default_response,
completion: CompletionCallback = _default_completion,
error: ErrorCallback = _default_error,
- override_pending: bool = True) -> _AsyncCall:
+ *,
+ override_pending: bool = True,
+ keep_open: bool = False) -> _AsyncCall:
raise NotImplementedError
diff --git a/pw_rpc/py/pw_rpc/client.py b/pw_rpc/py/pw_rpc/client.py
index 11d8192..229778c 100644
--- a/pw_rpc/py/pw_rpc/client.py
+++ b/pw_rpc/py/pw_rpc/client.py
@@ -16,8 +16,8 @@
import abc
from dataclasses import dataclass
import logging
-from typing import (Any, Collection, Dict, Iterable, Iterator, List,
- NamedTuple, Optional)
+from typing import (Any, Collection, Dict, Iterable, Iterator, NamedTuple,
+ Optional)
from google.protobuf.message import DecodeError
from pw_status import Status
@@ -43,27 +43,26 @@
return f'PendingRpc(channel={self.channel.id}, method={self.method})'
+class _PendingRpcMetadata:
+ def __init__(self, context: Any, keep_open: bool):
+ self.context = context
+ self.keep_open = keep_open
+
+
class PendingRpcs:
"""Tracks pending RPCs and encodes outgoing RPC packets."""
def __init__(self):
- self._pending: Dict[PendingRpc, List] = {}
+ self._pending: Dict[PendingRpc, _PendingRpcMetadata] = {}
def request(self,
rpc: PendingRpc,
request,
context,
- override_pending: bool = False) -> bytes:
+ override_pending: bool = True,
+ keep_open: bool = False) -> bytes:
"""Starts the provided RPC and returns the encoded packet to send."""
# Ensure that every context is a unique object by wrapping it in a list.
- unique_ctx = [context]
-
- if override_pending:
- self._pending[rpc] = unique_ctx
- elif self._pending.setdefault(rpc, unique_ctx) is not unique_ctx:
- # If the context was not added, the RPC was already pending.
- raise Error(f'Sent request for {rpc}, but it is already pending! '
- 'Cancel the RPC before invoking it again')
-
+ self.open(rpc, context, override_pending, keep_open)
_LOG.debug('Starting %s', rpc)
return packets.encode_request(rpc, request)
@@ -71,12 +70,33 @@
rpc: PendingRpc,
request,
context,
- override_pending: bool = False) -> None:
+ override_pending: bool = False,
+ keep_open: bool = False) -> None:
"""Calls request and sends the resulting packet to the channel."""
# TODO(hepler): Remove `type: ignore` on this and similar lines when
# https://github.com/python/mypy/issues/5485 is fixed
rpc.channel.output( # type: ignore
- self.request(rpc, request, context, override_pending))
+ self.request(rpc, request, context, override_pending, keep_open))
+
+ def open(self,
+ rpc: PendingRpc,
+ context,
+ override_pending: bool = False,
+ keep_open: bool = False) -> None:
+ """Creates a context for an RPC, but does not invoke it.
+
+ open() can be used to receive streaming responses to an RPC that was not
+ invoked by this client. For example, a server may stream logs with a
+ server streaming RPC prior to any clients invoking it.
+ """
+ metadata = _PendingRpcMetadata(context, keep_open)
+
+ if override_pending:
+ self._pending[rpc] = metadata
+ elif self._pending.setdefault(rpc, metadata) is not metadata:
+ # If the context was not added, the RPC was already pending.
+ raise Error(f'Sent request for {rpc}, but it is already pending! '
+ 'Cancel the RPC before invoking it again')
def cancel(self, rpc: PendingRpc) -> Optional[bytes]:
"""Cancels the RPC. Returns the CANCEL packet to send.
@@ -110,10 +130,14 @@
def get_pending(self, rpc: PendingRpc, status: Optional[Status]):
"""Gets the pending RPC's context. If status is set, clears the RPC."""
if status is None:
- return self._pending[rpc][0] # Unwrap the context from the list
+ return self._pending[rpc].context
- _LOG.debug('Finishing %s with status %s', rpc, status)
- return self._pending.pop(rpc)[0]
+ if self._pending[rpc].keep_open:
+ _LOG.debug('%s finished with status %s; keeping open', rpc, status)
+ return self._pending[rpc].context
+
+ _LOG.debug('%s finished with status %s', rpc, status)
+ return self._pending.pop(rpc).context
class ClientImpl(abc.ABC):
diff --git a/pw_rpc/py/tests/callback_client_test.py b/pw_rpc/py/tests/callback_client_test.py
index 9f9d3e6..ed7ec9c 100755
--- a/pw_rpc/py/tests/callback_client_test.py
+++ b/pw_rpc/py/tests/callback_client_test.py
@@ -167,6 +167,38 @@
self.assertIs(Status.ABORTED, status)
self.assertEqual('0_o', response.payload)
+ def test_invoke_unary_rpc_keep_open(self) -> None:
+ method = self._service.SomeUnary.method
+
+ payload_1 = method.response_type(payload='-_-')
+ payload_2 = method.response_type(payload='0_o')
+
+ self._enqueue_response(1, method, Status.ABORTED, payload_1)
+
+ replies: list = []
+ enqueue_replies = lambda _, reply: replies.append(reply)
+
+ self._service.SomeUnary.invoke(method.request_type(magic_number=6),
+ enqueue_replies,
+ enqueue_replies,
+ keep_open=True)
+
+ self.assertEqual([payload_1, Status.ABORTED], replies)
+
+ # Send another packet and make sure it is processed even though the RPC
+ # terminated.
+ self._client.process_packet(
+ packet_pb2.RpcPacket(
+ type=packet_pb2.PacketType.RESPONSE,
+ channel_id=1,
+ service_id=method.service.id,
+ method_id=method.id,
+ status=Status.OK.value,
+ payload=payload_2.SerializeToString()).SerializeToString())
+
+ self.assertEqual([payload_1, Status.ABORTED, payload_2, Status.OK],
+ replies)
+
def test_invoke_unary_rpc_with_callback(self):
method = self._service.SomeUnary.method