pw_rpc: getResponses() logic
getResponses is required for blocking calls and reading responses from a
server streaming or bidirectional RPC.
Future work is required to support a timeout on the blocking call.
Bug: b/194329554
No-Docs-Update-Reason: No changes to external API
Change-Id: I430787c069ff7c3b095ecb6c938462043e838484
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61788
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
diff --git a/package.json b/package.json
index 75bd9c9..211580b 100644
--- a/package.json
+++ b/package.json
@@ -49,7 +49,8 @@
"rxjs": "^7.2.0",
"tmp": "0.2.1",
"ts-protoc-gen": "^0.15.0",
- "typescript": "^4.3.5"
+ "typescript": "^4.3.5",
+ "wait-queue": "^1.1.4"
},
"scripts": {
"check": "gts check",
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index 066dc46..49b70f0 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -34,6 +34,7 @@
"//pw_rpc:packet_proto_tspb",
"//pw_status/ts:pw_status",
"@npm//@types/google-protobuf",
+ "@npm//wait-queue",
],
)
@@ -53,6 +54,7 @@
ts_library(
name = "rpc_utils_test_lib",
srcs = [
+ "call_test.ts",
"descriptors_test.ts",
"packets_test.ts",
],
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index 82f59c1..d62bed5 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -14,6 +14,7 @@
import {Message} from 'google-protobuf';
import {Status} from 'pigweed/pw_status/ts/status';
+import WaitQueue = require('wait-queue')
import {PendingCalls, Rpc} from './rpc_classes';
@@ -34,6 +35,9 @@
/** Represent an in-progress or completed RPC call. */
export class Call {
+ // Responses ordered by arrival time. Undefined signifies stream completion.
+ private responseQueue = new WaitQueue<Message|undefined>();
+
private rpcs: PendingCalls;
private rpc: Rpc;
@@ -85,20 +89,60 @@
}
handleResponse(response: Message): void {
+ this.responseQueue.push(response);
const callback = () => this.onNext(response);
this.invokeCallback(callback)
}
handleCompletion(status: Status) {
+ this.status = status;
+ this.responseQueue.push(undefined);
const callback = () => this.onCompleted(status);
this.invokeCallback(callback)
}
handleError(error: Status): void {
this.error = error;
+ this.responseQueue.push(undefined);
this.invokeCallback(() => this.onError(error));
}
+ /**
+ * Yields responses up the specified count as they are added.
+ *
+ * Throws an error as soon as it is received even if there are still responses
+ * in the queue.
+ *
+ * Usage
+ * ```
+ * for await (const response of call.getResponses(5)) {
+ * console.log(response);
+ * }
+ * ```
+ *
+ * @param {number} count The number of responses to read before returning.
+ * If no value is specified, getResponses will block until the stream
+ * either ends or hits an error.
+ */
+ async * getResponses(count?: number): AsyncGenerator<Message> {
+ this.checkErrors();
+
+ if (this.completed() && this.responseQueue.length == 0) {
+ return;
+ }
+
+ let remaining = count ?? Number.POSITIVE_INFINITY;
+ while (remaining > 0) {
+ const response = await this.responseQueue.shift();
+ if (response === undefined) {
+ return;
+ }
+ this.checkErrors();
+ yield response!;
+ remaining -= 1;
+ }
+ }
+
cancel(): boolean {
if (this.completed()) {
return false;
diff --git a/pw_rpc/ts/call_test.ts b/pw_rpc/ts/call_test.ts
new file mode 100644
index 0000000..f38502b
--- /dev/null
+++ b/pw_rpc/ts/call_test.ts
@@ -0,0 +1,134 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/* eslint-env browser, jasmine */
+import 'jasmine';
+
+import {Request} from 'test_protos_tspb/test_protos_tspb_pb/pw_rpc/ts/test2_pb'
+
+import {Call} from './call';
+import {Channel, Method, Service} from './descriptors';
+import {PendingCalls, Rpc} from './rpc_classes';
+
+class FakeRpc {
+ readonly channel: any = undefined;
+ readonly service: any = undefined;
+ readonly method: any = undefined;
+
+ idSet: [number, number, number] = [1, 2, 3];
+ idString = '1.2.3';
+}
+
+describe('Call', () => {
+ let call: Call;
+
+ beforeEach(() => {
+ const noop = () => {};
+ const pendingCalls = new PendingCalls();
+ const channel = jasmine.createSpy()
+ const rpc = new FakeRpc()
+ call = new Call(pendingCalls, rpc, noop, noop, noop);
+ });
+
+ function newMessage(magicNumber: number = 1): Request {
+ const message = new Request();
+ message.setMagicNumber(magicNumber);
+ return message;
+ }
+
+ it('getResponse returns all responses.', async () => {
+ const message1 = newMessage(1);
+ const message2 = newMessage(2);
+ const message3 = newMessage(3);
+
+ // Queue three responses
+ call.handleResponse(message1);
+ call.handleResponse(message2);
+ call.handleResponse(message3);
+
+ let responses = call.getResponses(2);
+ expect((await responses.next()).value).toEqual(message1);
+ expect((await responses.next()).value).toEqual(message2);
+ expect((await responses.next()).done).toBeTrue();
+
+ responses = call.getResponses(1);
+ expect((await responses.next()).value).toEqual(message3);
+ expect((await responses.next()).done).toBeTrue();
+ });
+
+ it('getResponse early returns on stream end.', async () => {
+ const message = newMessage();
+ let responses = call.getResponses(2);
+
+ // Queue one response and an early completion.
+ call.handleResponse(message);
+ call.handleCompletion(0);
+
+ expect((await responses.next()).value).toEqual(message);
+ expect((await responses.next()).done).toBeTrue();
+ });
+
+ it('getResponse promise is rejected on stream error.', async () => {
+ const message = newMessage();
+ let responses = call.getResponses(3);
+
+ call.handleResponse(message);
+ expect((await responses.next()).value).toEqual(message);
+
+ call.handleResponse(message);
+ call.handleError(1);
+
+ // Promise is rejected as soon as an error is received, even if there is a
+ // response in the queue.
+ await expectAsync(responses.next()).toBeRejected()
+ });
+
+ it('getResponse waits if queue is empty', async () => {
+ const message1 = newMessage(1);
+ const message2 = newMessage(2);
+ let responses = call.getResponses(2);
+
+ // Queue two responses after a small delay
+ setTimeout(() => {
+ call.handleResponse(message1);
+ call.handleResponse(message2);
+ call.handleCompletion(0);
+ expect(call.completed()).toBeTrue();
+ }, 200);
+
+ expect(call.completed()).toBeFalse();
+ expect((await responses.next()).value).toEqual(message1);
+ expect((await responses.next()).value).toEqual(message2);
+ expect((await responses.next()).done).toBeTrue();
+ });
+
+ it('getResponse without count fetches all results', async () => {
+ const message1 = newMessage(1);
+ const message2 = newMessage(2);
+ let responses = call.getResponses();
+
+ call.handleResponse(message1);
+ expect((await responses.next()).value).toEqual(message1);
+
+ setTimeout(() => {
+ call.handleResponse(message2);
+ call.handleCompletion(0);
+ expect(call.completed()).toBeTrue();
+ }, 200);
+
+ expect(call.completed()).toBeFalse();
+ expect((await responses.next()).value).toEqual(message2);
+ expect((await responses.next()).done).toBeTrue();
+ });
+});
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index 0be9d77..29e3777 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -81,17 +81,30 @@
Server Streaming RPC
--------------------
-Once the server stream is invoked, responses are read via the ``onNext``
-callback.
+Once the server stream is invoked, responses can either be read by providing
+the ``onNext`` callback or using the promise API.
.. code-block:: typescript
serverStreamRpc = client.channel()?.methodStub(
'pw.rpc.test1.TheTestService.SomeUnary')!
as ServerStreamingMethodStub;
+
+ // Callback
const onNext = (response) => {console.log(response)};
const call = serverStreamRpc.invoke(undefined, onNext);
+ // Promise
+ const call = serverStreamRpc.invoke();
+ for await (const response of call.getResponses(2)) {
+ console.log(response);
+ }
+ const responses = call.getResponse() // All responses until stream end.
+ while (!responses.done) {
+ console.log(await responses.value());
+ }
+
+
Client Streaming RPC
--------------------
Unsupported
diff --git a/pw_rpc/ts/rpc_classes.ts b/pw_rpc/ts/rpc_classes.ts
index 2548ad9..d611be4 100644
--- a/pw_rpc/ts/rpc_classes.ts
+++ b/pw_rpc/ts/rpc_classes.ts
@@ -21,9 +21,9 @@
/** Data class for a pending RPC call. */
export class Rpc {
- channel: Channel;
- service: Service;
- method: Method;
+ readonly channel: Channel;
+ readonly service: Service;
+ readonly method: Method;
constructor(channel: Channel, service: Service, method: Method) {
this.channel = channel;
@@ -78,7 +78,7 @@
* server streaming RPC prior to any clients invoking it.
*/
open(rpc: Rpc, call: Call): Call|undefined {
- console.debug('Starting %s', rpc);
+ console.debug(`Starting ${rpc}`);
const previous = this.pending.get(rpc.idString);
this.pending.set(rpc.idString, call)
return previous;
diff --git a/yarn.lock b/yarn.lock
index 9d355dd..6448a34 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -4865,6 +4865,11 @@
resolved "https://registry.yarnpkg.com/void-elements/-/void-elements-2.0.1.tgz#c066afb582bb1cb4128d60ea92392e94d5e9dbec"
integrity sha1-wGavtYK7HLQSjWDqkjkulNXp2+w=
+wait-queue@^1.1.4:
+ version "1.1.4"
+ resolved "https://registry.yarnpkg.com/wait-queue/-/wait-queue-1.1.4.tgz#344f9bdd6e011ddc0bb1e3252eeb41234f7a8a85"
+ integrity sha512-/VdMghiBDG/Ch43ZRp3d8OSd8A0dx8hfkBO7AfWCDzMn2blHquMf+3gqHHhYcggSBpKf7VTzA939bb0DevYKBA==
+
which-boxed-primitive@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/which-boxed-primitive/-/which-boxed-primitive-1.0.2.tgz#13757bc89b209b049fe5d86430e21cf40a89a8e6"