pw_rpc: Server streaming RPC
Enables invoking a server streaming RPC. This change primarily adds the
tests to verify that server streaming RPC calls are working as intended.
Currently only a non-blocking invoke is supported. Future work is
required to support the open method and blocking calls.
Bug: b/194329554
Change-Id: Id8f44ad4a60c70fea37b032586a29a14d266ac62
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61342
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Commit-Queue: Jared Weinstein <jaredweinstein@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index 99fdbe2..40046bd 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -75,7 +75,7 @@
it('processPacket with invalid proto data', () => {
const textEncoder = new TextEncoder();
const data = textEncoder.encode('NOT a packet!');
- expect(client.processPacket(data)).toEqual(Status.DATA_LOSS)
+ expect(client.processPacket(data)).toEqual(Status.DATA_LOSS);
});
it('processPacket not for client', () => {
@@ -158,7 +158,7 @@
packet.setChannelId(channelId);
packet.setServiceId(method.service.id);
packet.setMethodId(method.id);
- packet.setStatus(status)
+ packet.setStatus(status);
packet.setPayload(payload);
nextPackets.push([packet.serializeBinary(), Status.OK]);
@@ -253,9 +253,9 @@
expect(requests.length).toBeGreaterThan(0);
requests = [];
- expect(call.cancel()).toBeTrue()
- expect(call.cancel()).toBeFalse()
- expect(onNext).not.toHaveBeenCalled()
+ expect(call.cancel()).toBeTrue();
+ expect(call.cancel()).toBeFalse();
+ expect(onNext).not.toHaveBeenCalled();
});
it('nonblocking duplicate calls first is cancelled', () => {
@@ -279,4 +279,79 @@
expect(call.callbackException!.message).toEqual('Something went wrong!');
});
})
+
+ describe('ServerStreaming', () => {
+ let serverStreaming: MethodStub;
+ let request: any;
+ let requestType: any;
+ let responseType: any;
+
+ beforeEach(async () => {
+ serverStreaming = client.channel()?.methodStub(
+ 'pw.rpc.test1.TheTestService.SomeServerStreaming')!;
+ requestType = serverStreaming.method.requestType;
+ responseType = serverStreaming.method.responseType;
+ request = new requestType();
+ });
+
+ it('non-blocking call', () => {
+ const response1 = new responseType();
+ response1.setPayload('!!!');
+ const response2 = new responseType();
+ response2.setPayload('?');
+
+ const request = new requestType()
+ request.setMagicNumber(4);
+
+ for (let i = 0; i < 3; i++) {
+ enqueueServerStream(
+ 1, serverStreaming.method, response1.serializeBinary());
+ enqueueServerStream(
+ 1, serverStreaming.method, response2.serializeBinary());
+ enqueueResponse(1, serverStreaming.method, Status.ABORTED);
+
+ const onNext = jasmine.createSpy();
+ const onCompleted = jasmine.createSpy();
+ const onError = jasmine.createSpy();
+ serverStreaming.invoke(request, onNext, onCompleted, onError);
+
+ expect(onNext).toHaveBeenCalledWith(response1);
+ expect(onNext).toHaveBeenCalledWith(response2);
+ expect(onError).not.toHaveBeenCalled();
+ expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
+
+ expect(sentPayload(serverStreaming.method.requestType).getMagicNumber())
+ .toEqual(4);
+ }
+ });
+
+ it('non-blocking cancel', () => {
+ const response = new responseType();
+ response.setPayload('!!!');
+ enqueueServerStream(
+ 1, serverStreaming.method, response.serializeBinary());
+ const request = new requestType();
+ request.setMagicNumber(3);
+
+ const onNext = jasmine.createSpy();
+ const onCompleted = jasmine.createSpy();
+ const onError = jasmine.createSpy();
+ let call = serverStreaming.invoke(request, onNext, () => {}, () => {});
+ expect(onNext).toHaveBeenCalledOnceWith(response);
+
+ onNext.calls.reset();
+
+ call.cancel();
+ expect(lastRequest().getType()).toEqual(PacketType.CANCEL);
+
+ // Ensure the RPC can be called after being cancelled.
+ enqueueServerStream(
+ 1, serverStreaming.method, response.serializeBinary());
+ enqueueResponse(1, serverStreaming.method, Status.OK);
+ call = serverStreaming.invoke(request, onNext, onCompleted, onError);
+ expect(onNext).toHaveBeenCalledWith(response);
+ expect(onError).not.toHaveBeenCalled();
+ expect(onCompleted).toHaveBeenCalledOnceWith(Status.OK);
+ });
+ });
});
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index fb8a0ba..2d63f68 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -78,7 +78,15 @@
Server Streaming RPC
--------------------
-Unsupported
+Once the server stream is invoked, responses are read via the ``onNext``
+callback.
+
+.. code-block:: typescript
+
+ serverStreamRpc = client.channel()?.methodStub(
+ 'pw.rpc.test1.TheTestService.SomeUnary')!;
+ const onNext = (response) => {console.log(response)};
+ const call = serverStreamRpc.invoke(undefined, onNext);
Client Streaming RPC
--------------------
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
index 3ea505d..c30f695 100644
--- a/pw_rpc/ts/method.ts
+++ b/pw_rpc/ts/method.ts
@@ -76,7 +76,10 @@
onNext: Callback = () => {},
onCompleted: Callback = () => {},
onError: Callback = () => {}): ServerStreamingCall {
- throw Error('ClientStreaming invoke() not implemented');
+ const call = new ServerStreamingCall(
+ this.rpcs, this.rpc, onNext, onCompleted, onError);
+ call.invoke(request);
+ return call;
}
}