pw_rpc: Add request_completion to ServerStreamingCall python API
This matches the RequestCompletion API in c++ to send a completion
packet for server streaming calls.
Change-Id: I29877a9388964b10913765d7f75c11ad60c195fb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/168439
Commit-Queue: Austin Foxley <afoxley@google.com>
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Reviewed-by: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_rpc/py/pw_rpc/callback_client/call.py b/pw_rpc/py/pw_rpc/callback_client/call.py
index 0d3d15c..581e4ba 100644
--- a/pw_rpc/py/pw_rpc/callback_client/call.py
+++ b/pw_rpc/py/pw_rpc/callback_client/call.py
@@ -322,6 +322,11 @@
) -> Iterator:
return self._get_responses(count=count, timeout_s=timeout_s)
+ def request_completion(self) -> None:
+ """Sends client completion packet to server."""
+ if not self.completed():
+ self._rpcs.send_client_stream_end(self._rpc)
+
def __iter__(self) -> Iterator:
return self.get_responses()
diff --git a/pw_rpc/py/tests/callback_client_test.py b/pw_rpc/py/tests/callback_client_test.py
index 3b437c2..c014e3e 100755
--- a/pw_rpc/py/tests/callback_client_test.py
+++ b/pw_rpc/py/tests/callback_client_test.py
@@ -688,6 +688,40 @@
]
)
+ def test_request_completion(self) -> None:
+ resp = self.rpc.method.response_type(payload='!!!')
+ self._enqueue_server_stream(CLIENT_CHANNEL_ID, self.rpc.method, resp)
+
+ callback = mock.Mock()
+ call = self.rpc.invoke(self._request(magic_number=3), callback)
+ callback.assert_called_once_with(
+ call, self.rpc.method.response_type(payload='!!!')
+ )
+
+ callback.reset_mock()
+
+ call.request_completion()
+
+ self.assertEqual(
+ self.last_request().type,
+ packet_pb2.PacketType.CLIENT_REQUEST_COMPLETION,
+ )
+
+ # Ensure the RPC can be called after being completed.
+ self._enqueue_server_stream(CLIENT_CHANNEL_ID, self.method, resp)
+ self._enqueue_response(CLIENT_CHANNEL_ID, self.method, Status.OK)
+
+ call = self.rpc.invoke(
+ self._request(magic_number=3), callback, callback
+ )
+
+ callback.assert_has_calls(
+ [
+ mock.call(call, self.method.response_type(payload='!!!')),
+ mock.call(call, Status.OK),
+ ]
+ )
+
def test_nonblocking_with_request_args(self) -> None:
self.rpc.invoke(request_args=dict(magic_number=1138))
self.assertEqual(