pw_rpc: Protocol updates for client streams
- Add a CLIENT_STREAM packet for messages in client streams.
- Rename CANCEL_SERVER_STREAM to CANCEL, since it applies to all RPC
types.
- Remove server responses to CANCEL packets from protocol description.
- Document cancellation of unary RPCs.
Change-Id: I3b15beb2de94f318fac58fc3d682aefe41056676
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/48800
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_rpc/base_client_call.cc b/pw_rpc/base_client_call.cc
index da4ac01..2560ab8 100644
--- a/pw_rpc/base_client_call.cc
+++ b/pw_rpc/base_client_call.cc
@@ -43,7 +43,7 @@
void BaseClientCall::Cancel() {
if (active()) {
- channel_->Send(NewPacket(PacketType::CANCEL_SERVER_STREAM));
+ channel_->Send(NewPacket(PacketType::CANCEL));
}
}
diff --git a/pw_rpc/client_test.cc b/pw_rpc/client_test.cc
index a5993fea..7594df9 100644
--- a/pw_rpc/client_test.cc
+++ b/pw_rpc/client_test.cc
@@ -80,8 +80,7 @@
TEST(Client, ProcessPacket_ReturnsInvalidArgumentOnServerPacket) {
ClientContextForTest context;
EXPECT_EQ(context.SendPacket(PacketType::REQUEST), Status::InvalidArgument());
- EXPECT_EQ(context.SendPacket(PacketType::CANCEL_SERVER_STREAM),
- Status::InvalidArgument());
+ EXPECT_EQ(context.SendPacket(PacketType::CANCEL), Status::InvalidArgument());
}
} // namespace
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 5cde965..6ff8e43 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -350,58 +350,66 @@
Client-to-server packets
^^^^^^^^^^^^^^^^^^^^^^^^
-+---------------------------+----------------------------------+
-| packet type | description |
-+===========================+==================================+
-| REQUEST | RPC request |
-| | |
-| | .. code-block:: text |
-| | |
-| | - channel_id |
-| | - service_id |
-| | - method_id |
-| | - payload |
-| | (unless first client stream) |
-| | |
-+---------------------------+----------------------------------+
-| CLIENT_STREAM_END | Client stream finished |
-| | |
-| | .. code-block:: text |
-| | |
-| | - channel_id |
-| | - service_id |
-| | - method_id |
-| | |
-| | |
-+---------------------------+----------------------------------+
-| CLIENT_ERROR | Received unexpected packet |
-| | |
-| | .. code-block:: text |
-| | |
-| | - channel_id |
-| | - service_id |
-| | - method_id |
-| | - status |
-+---------------------------+----------------------------------+
-| CANCEL_SERVER_STREAM | Cancel a server stream |
-| | |
-| | .. code-block:: text |
-| | |
-| | - channel_id |
-| | - service_id |
-| | - method_id |
-| | |
-+---------------------------+----------------------------------+
++-------------------+-------------------------------------+
+| packet type | description |
++===================+=====================================+
+| REQUEST | Invoke an RPC |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id |
+| | - method_id |
+| | - payload |
+| | (unary & server streaming only) |
+| | |
++-------------------+-------------------------------------+
+| CLIENT_STREAM | Message in a client stream |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id |
+| | - method_id |
+| | - payload |
+| | |
++-------------------+-------------------------------------+
+| CLIENT_ERROR | Received unexpected packet |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id |
+| | - method_id |
+| | - status |
+| | |
++-------------------+-------------------------------------+
+| CANCEL | Cancel an ongoing RPC |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id |
+| | - method_id |
+| | |
++-------------------+-------------------------------------+
+| CLIENT_STREAM_END | Client stream is complete |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id |
+| | - method_id |
+| | |
++-------------------+-------------------------------------+
**Errors**
The client sends ``CLIENT_ERROR`` packets to a server when it receives a packet
it did not request. If the RPC is a streaming RPC, the server should abort it.
-The status code indicates the type of error. If the client does not distinguish
-between the error types, it can send whichever status is most relevant. The
-status code is logged, but all status codes result in the same action by the
-server: aborting the RPC.
+The status code indicates the type of error. The status code is logged, but all
+status codes result in the same action by the server: aborting the RPC.
* ``NOT_FOUND`` -- Received a packet for a service method the client does not
recognize.
@@ -410,38 +418,41 @@
Server-to-client packets
^^^^^^^^^^^^^^^^^^^^^^^^
-+-------------------+--------------------------------+
-| packet type | description |
-+===================+================================+
-| RESPONSE | RPC response |
-| | |
-| | .. code-block:: text |
-| | |
-| | - channel_id |
-| | - service_id |
-| | - method_id |
-| | - payload |
-| | - status |
-| | (unless in server stream) |
-+-------------------+--------------------------------+
-| SERVER_STREAM_END | Server stream and RPC finished |
-| | |
-| | .. code-block:: text |
-| | |
-| | - channel_id |
-| | - service_id |
-| | - method_id |
-| | - status |
-+-------------------+--------------------------------+
-| SERVER_ERROR | Received unexpected packet |
-| | |
-| | .. code-block:: text |
-| | |
-| | - channel_id |
-| | - service_id (if relevant) |
-| | - method_id (if relevant) |
-| | - status |
-+-------------------+--------------------------------+
++-------------------+-------------------------------------+
+| packet type | description |
++===================+=====================================+
+| RESPONSE | RPC response |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id |
+| | - method_id |
+| | - payload |
+| | - status |
+| | (unary & client streaming only) |
+| | |
++-------------------+-------------------------------------+
+| SERVER_STREAM_END | Server stream and RPC finished |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id |
+| | - method_id |
+| | - status |
+| | |
++-------------------+-------------------------------------+
+| SERVER_ERROR | Received unexpected packet |
+| | |
+| | .. code-block:: text |
+| | |
+| | - channel_id |
+| | - service_id (if relevant) |
+| | - method_id (if relevant) |
+| | - status |
+| | |
++-------------------+-------------------------------------+
**Errors**
@@ -450,7 +461,8 @@
status field indicates the type of error.
* ``NOT_FOUND`` -- The requested service or method does not exist.
-* ``FAILED_PRECONDITION`` -- Attempted to cancel an RPC that is not pending.
+* ``FAILED_PRECONDITION`` -- A client stream or cancel packet was sent for an
+ RPC that is not pending.
* ``RESOURCE_EXHAUSTED`` -- The request came on a new channel, but a channel
could not be allocated for it.
* ``INTERNAL`` -- The server was unable to respond to an RPC due to an
@@ -484,6 +496,29 @@
];
}
+The client may attempt to cancel a unary RPC by sending a ``CANCEL`` packet. The
+server sends no response to a cancelled RPC. If the server processes the unary
+RPC synchronously (the handling thread sends the response), it may not be
+possible to cancel the RPC.
+
+.. seqdiag::
+ :scale: 110
+
+ seqdiag {
+ default_note_color = aliceblue;
+
+ client -> server [
+ label = "request",
+ leftnote = "PacketType.REQUEST\nchannel ID\nservice ID\nmethod ID\npayload"
+ ];
+
+ client -> server [
+ noactivate,
+ label = "cancel",
+ leftnote = "PacketType.CANCEL\nchannel ID\nservice ID\nmethod ID"
+ ];
+ }
+
Server streaming RPC
^^^^^^^^^^^^^^^^^^^^
In a server streaming RPC, the client sends a single request and the server
@@ -502,7 +537,7 @@
client <-- server [
noactivate,
- label = "responses (zero or more)",
+ label = "messages (zero or more)",
rightnote = "PacketType.RESPONSE\nchannel ID\nservice ID\nmethod ID\npayload"
];
@@ -512,8 +547,8 @@
];
}
-Server streaming RPCs may be cancelled by the client. The client sends a
-``CANCEL_SERVER_STREAM`` packet to terminate the RPC.
+The client may terminate a server streaming RPC by sending a ``CANCEL`` packet.
+The server sends no response.
.. seqdiag::
:scale: 110
@@ -528,32 +563,23 @@
client <-- server [
noactivate,
- label = "responses (zero or more)",
+ label = "messages (zero or more)",
rightnote = "PacketType.RESPONSE\nchannel ID\nservice ID\nmethod ID\npayload"
];
client -> server [
noactivate,
label = "cancel",
- leftnote = "PacketType.CANCEL_SERVER_STREAM\nchannel ID\nservice ID\nmethod ID"
- ];
-
- client <- server [
- label = "done",
- rightnote = "PacketType.SERVER_STREAM_END\nchannel ID\nservice ID\nmethod ID\nstatus"
+ leftnote = "PacketType.CANCEL\nchannel ID\nservice ID\nmethod ID"
];
}
Client streaming RPC
^^^^^^^^^^^^^^^^^^^^
-In a client streaming RPC, the client sends any number of RPC requests followed
-by a ``CLIENT_STREAM_END`` packet. The server then sends a single response.
-
-The first client-to-server RPC packet does not include a payload.
-
-.. attention::
-
- ``pw_rpc`` does not yet support client streaming RPCs.
+In a client streaming RPC, the client starts the RPC by sending a ``REQUEST``
+packet with no payload. It then sends any number of messages in
+``CLIENT_STREAM`` packets, followed by a ``CLIENT_STREAM_END``. The server sends
+a single response to finish the RPC.
.. seqdiag::
:scale: 110
@@ -568,8 +594,8 @@
client --> server [
noactivate,
- label = "requests (zero or more)",
- leftnote = "PacketType.REQUEST\nchannel ID\nservice ID\nmethod ID\npayload"
+ label = "messages (zero or more)",
+ leftnote = "PacketType.CLIENT_STREAM\nchannel ID\nservice ID\nmethod ID\npayload"
];
client -> server [
@@ -584,8 +610,9 @@
];
}
-The server may terminate a client streaming RPC at any time by sending its
-response packet.
+The server may finish the RPC at any time by sending its ``RESPONSE`` packet,
+even if it has not yet received the ``CLIENT_STREAM_END`` packet. The client may
+terminate the RPC at any time by sending a ``CANCEL`` packet.
.. seqdiag::
:scale: 110
@@ -600,29 +627,24 @@
client --> server [
noactivate,
- label = "requests (zero or more)",
- leftnote = "PacketType.REQUEST\nchannel ID\nservice ID\nmethod ID\npayload"
+ label = "messages (zero or more)",
+ leftnote = "PacketType.CLIENT_STREAM\nchannel ID\nservice ID\nmethod ID\npayload"
];
- client <- server [
- label = "response",
- rightnote = "PacketType.RESPONSE\nchannel ID\nservice ID\nmethod ID\npayload\nstatus"
+ client -> server [
+ noactivate,
+ label = "cancel",
+ rightnote = "PacketType.CANCEL\nchannel ID\nservice ID\nmethod ID"
];
}
Bidirectional streaming RPC
^^^^^^^^^^^^^^^^^^^^^^^^^^^
In a bidirectional streaming RPC, the client sends any number of requests and
-the server sends any number of responses. The client sends a
-``CLIENT_STREAM_END`` packet when it has finished sending requests. The server
-sends a ``SERVER_STREAM_END`` packet after it receives the client's
-``CLIENT_STREAM_END`` and finished sending its responses.
-
-The first client-to-server RPC packet does not include a payload.
-
-.. attention::
-
- ``pw_rpc`` does not yet support bidirectional streaming RPCs.
+the server sends any number of responses. The client invokes the RPC by sending
+a ``REQUEST`` with no payload. It sends a ``CLIENT_STREAM_END`` packet when it
+has finished sending requests. The server sends a ``SERVER_STREAM_END`` packet
+to finish the RPC.
.. seqdiag::
:scale: 110
@@ -637,15 +659,15 @@
client --> server [
noactivate,
- label = "requests (zero or more)",
- leftnote = "PacketType.REQUEST\nchannel ID\nservice ID\nmethod ID\npayload"
+ label = "messages (zero or more)",
+ leftnote = "PacketType.CLIENT_STREAM\nchannel ID\nservice ID\nmethod ID\npayload"
];
... (messages in any order) ...
client <-- server [
noactivate,
- label = "responses (zero or more)",
+ label = "messages (zero or more)",
rightnote = "PacketType.RESPONSE\nchannel ID\nservice ID\nmethod ID\npayload"
];
@@ -655,22 +677,15 @@
leftnote = "PacketType.CLIENT_STREAM_END\nchannel ID\nservice ID\nmethod ID"
];
- client <-- server [
- noactivate,
- label = "responses (zero or more)",
- rightnote = "PacketType.RPC\nchannel ID\nservice ID\nmethod ID\npayload"
- ];
-
client <- server [
label = "done",
rightnote = "PacketType.SERVER_STREAM_END\nchannel ID\nservice ID\nmethod ID\nstatus"
];
}
-The server may terminate the RPC at any time by sending a ``SERVER_STREAM_END``
-packet with the status, even if the client has not sent its ``STREAM_END``. The
-client may cancel the RPC at any time by sending a ``CANCEL_SERVER_STREAM``
-packet.
+The server may finish the RPC at any time by sending the ``SERVER_STREAM_END``
+packet, even if it has not received the ``CLIENT_STREAM_END`` packet. The client
+may terminate the RPC at any time by sending a ``CANCEL`` packet.
.. seqdiag::
:scale: 110
@@ -680,31 +695,25 @@
client -> server [
label = "start",
- leftnote = "PacketType.RPC\nchannel ID\nservice ID\nmethod ID"
+ leftnote = "PacketType.REQUEST\nchannel ID\nservice ID\nmethod ID"
];
client --> server [
noactivate,
- label = "requests (zero or more)",
- leftnote = "PacketType.RPC\nchannel ID\nservice ID\nmethod ID\npayload"
+ label = "messages (zero or more)",
+ leftnote = "PacketType.CLIENT_STREAM\nchannel ID\nservice ID\nmethod ID\npayload"
];
client <-- server [
noactivate,
- label = "responses (zero or more)",
- rightnote = "PacketType.RPC\nchannel ID\nservice ID\nmethod ID\npayload"
+ label = "messages (zero or more)",
+ rightnote = "PacketType.RESPONSE\nchannel ID\nservice ID\nmethod ID\npayload"
];
client -> server [
noactivate,
label = "cancel",
- leftnote = "PacketType.CANCEL_SERVER_STREAM\nchannel ID\nservice ID\nmethod ID"
- ];
-
- client <- server [
- label = "done",
- rightnote = "PacketType.STREAM_END\nchannel ID\nservice ID\nmethod ID\nstatus"
- ];
+ leftnote = "PacketType.CANCEL\nchannel ID\nservice ID\nmethod ID" ];
}
RPC server
diff --git a/pw_rpc/internal/packet.proto b/pw_rpc/internal/packet.proto
index 459ac94..4438247 100644
--- a/pw_rpc/internal/packet.proto
+++ b/pw_rpc/internal/packet.proto
@@ -26,14 +26,17 @@
// A request from a client for a service method.
REQUEST = 0;
- // A client stream has completed.
- CLIENT_STREAM_END = 2;
+ // A message in a client stream.
+ CLIENT_STREAM = 2;
// The client received a packet for an RPC it did not request.
CLIENT_ERROR = 4;
- // The client requests cancellation of an ongoing server stream.
- CANCEL_SERVER_STREAM = 6;
+ // The client requests cancellation of an ongoing RPC.
+ CANCEL = 6;
+
+ // A client stream has completed.
+ CLIENT_STREAM_END = 8;
// Server-to-client packets
diff --git a/pw_rpc/py/pw_rpc/packets.py b/pw_rpc/py/pw_rpc/packets.py
index 3f15468..d36b7a4 100644
--- a/pw_rpc/py/pw_rpc/packets.py
+++ b/pw_rpc/py/pw_rpc/packets.py
@@ -67,11 +67,10 @@
def encode_cancel(rpc: tuple) -> bytes:
channel, service, method = _ids(rpc)
- return packet_pb2.RpcPacket(
- type=packet_pb2.PacketType.CANCEL_SERVER_STREAM,
- channel_id=channel,
- service_id=service,
- method_id=method).SerializeToString()
+ return packet_pb2.RpcPacket(type=packet_pb2.PacketType.CANCEL,
+ channel_id=channel,
+ service_id=service,
+ method_id=method).SerializeToString()
def for_server(packet):
diff --git a/pw_rpc/py/tests/callback_client_test.py b/pw_rpc/py/tests/callback_client_test.py
index ed7ec9c..0373a07 100755
--- a/pw_rpc/py/tests/callback_client_test.py
+++ b/pw_rpc/py/tests/callback_client_test.py
@@ -341,8 +341,7 @@
call.cancel()
- self.assertEqual(self._last_request.type,
- packet_pb2.PacketType.CANCEL_SERVER_STREAM)
+ self.assertEqual(self._last_request.type, packet_pb2.PacketType.CANCEL)
# Ensure the RPC can be called after being cancelled.
self._enqueue_response(1, stub.method, response=resp)
diff --git a/pw_rpc/py/tests/packets_test.py b/pw_rpc/py/tests/packets_test.py
index 4bf4dc1..11ae676 100755
--- a/pw_rpc/py/tests/packets_test.py
+++ b/pw_rpc/py/tests/packets_test.py
@@ -58,7 +58,7 @@
self.assertEqual(
packet,
- RpcPacket(type=PacketType.CANCEL_SERVER_STREAM,
+ RpcPacket(type=PacketType.CANCEL,
channel_id=9,
service_id=8,
method_id=7))
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc
index 59da7d2..1c9e813 100644
--- a/pw_rpc/server.cc
+++ b/pw_rpc/server.cc
@@ -104,15 +104,18 @@
method->Invoke(call, packet);
break;
}
- case PacketType::CLIENT_STREAM_END:
+ case PacketType::CLIENT_STREAM:
// TODO(hepler): Support client streaming RPCs.
break;
case PacketType::CLIENT_ERROR:
HandleClientError(packet);
break;
- case PacketType::CANCEL_SERVER_STREAM:
+ case PacketType::CANCEL:
HandleCancelPacket(packet, *channel);
break;
+ case PacketType::CLIENT_STREAM_END:
+ // TODO(hepler): Handle client stream end packets.
+ break;
default:
channel->Send(Packet::ServerError(packet, Status::Unimplemented()));
PW_LOG_WARN("Unable to handle packet of type %u",
diff --git a/pw_rpc/server_test.cc b/pw_rpc/server_test.cc
index b15d276..541000b 100644
--- a/pw_rpc/server_test.cc
+++ b/pw_rpc/server_test.cc
@@ -213,9 +213,8 @@
TEST_F(BasicServer, ProcessPacket_Cancel_MethodNotActive_SendsError) {
// Set up a fake ServerWriter representing an ongoing RPC.
EXPECT_EQ(OkStatus(),
- server_.ProcessPacket(
- EncodeRequest(PacketType::CANCEL_SERVER_STREAM, 1, 42, 100),
- output_));
+ server_.ProcessPacket(EncodeRequest(PacketType::CANCEL, 1, 42, 100),
+ output_));
const Packet& packet = output_.sent_packet();
EXPECT_EQ(packet.type(), PacketType::SERVER_ERROR);
@@ -242,18 +241,16 @@
TEST_F(MethodPending, ProcessPacket_Cancel_ClosesServerWriter) {
EXPECT_EQ(OkStatus(),
- server_.ProcessPacket(
- EncodeRequest(PacketType::CANCEL_SERVER_STREAM, 1, 42, 100),
- output_));
+ server_.ProcessPacket(EncodeRequest(PacketType::CANCEL, 1, 42, 100),
+ output_));
EXPECT_FALSE(writer_.open());
}
TEST_F(MethodPending, ProcessPacket_Cancel_SendsStreamEndPacket) {
EXPECT_EQ(OkStatus(),
- server_.ProcessPacket(
- EncodeRequest(PacketType::CANCEL_SERVER_STREAM, 1, 42, 100),
- output_));
+ server_.ProcessPacket(EncodeRequest(PacketType::CANCEL, 1, 42, 100),
+ output_));
const Packet& packet = output_.sent_packet();
EXPECT_EQ(packet.type(), PacketType::SERVER_STREAM_END);
@@ -276,9 +273,8 @@
TEST_F(MethodPending, ProcessPacket_Cancel_IncorrectChannel) {
EXPECT_EQ(OkStatus(),
- server_.ProcessPacket(
- EncodeRequest(PacketType::CANCEL_SERVER_STREAM, 2, 42, 100),
- output_));
+ server_.ProcessPacket(EncodeRequest(PacketType::CANCEL, 2, 42, 100),
+ output_));
EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
EXPECT_EQ(output_.sent_packet().status(), Status::FailedPrecondition());
@@ -287,9 +283,8 @@
TEST_F(MethodPending, ProcessPacket_Cancel_IncorrectService) {
EXPECT_EQ(OkStatus(),
- server_.ProcessPacket(
- EncodeRequest(PacketType::CANCEL_SERVER_STREAM, 1, 43, 100),
- output_));
+ server_.ProcessPacket(EncodeRequest(PacketType::CANCEL, 1, 43, 100),
+ output_));
EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
EXPECT_EQ(output_.sent_packet().status(), Status::NotFound());
@@ -300,9 +295,8 @@
TEST_F(MethodPending, ProcessPacket_CancelIncorrectMethod) {
EXPECT_EQ(OkStatus(),
- server_.ProcessPacket(
- EncodeRequest(PacketType::CANCEL_SERVER_STREAM, 1, 42, 101),
- output_));
+ server_.ProcessPacket(EncodeRequest(PacketType::CANCEL, 1, 42, 101),
+ output_));
EXPECT_EQ(output_.sent_packet().type(), PacketType::SERVER_ERROR);
EXPECT_EQ(output_.sent_packet().status(), Status::NotFound());
EXPECT_TRUE(writer_.open());