pw_rpc: getResponses() logic

getResponses is required for blocking calls and reading responses from a
server streaming or bidirectional RPC.

Future work is required to support a timeout on the blocking call.

Bug: b/194329554
No-Docs-Update-Reason: No changes to external API
Change-Id: I430787c069ff7c3b095ecb6c938462043e838484
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61788
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
diff --git a/package.json b/package.json
index 75bd9c9..211580b 100644
--- a/package.json
+++ b/package.json
@@ -49,7 +49,8 @@
     "rxjs": "^7.2.0",
     "tmp": "0.2.1",
     "ts-protoc-gen": "^0.15.0",
-    "typescript": "^4.3.5"
+    "typescript": "^4.3.5",
+    "wait-queue": "^1.1.4"
   },
   "scripts": {
     "check": "gts check",
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index 066dc46..49b70f0 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -34,6 +34,7 @@
         "//pw_rpc:packet_proto_tspb",
         "//pw_status/ts:pw_status",
         "@npm//@types/google-protobuf",
+        "@npm//wait-queue",
     ],
 )
 
@@ -53,6 +54,7 @@
 ts_library(
     name = "rpc_utils_test_lib",
     srcs = [
+        "call_test.ts",
         "descriptors_test.ts",
         "packets_test.ts",
     ],
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index 82f59c1..d62bed5 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -14,6 +14,7 @@
 
 import {Message} from 'google-protobuf';
 import {Status} from 'pigweed/pw_status/ts/status';
+import WaitQueue = require('wait-queue')
 
 import {PendingCalls, Rpc} from './rpc_classes';
 
@@ -34,6 +35,9 @@
 
 /** Represent an in-progress or completed RPC call. */
 export class Call {
+  // Responses ordered by arrival time. Undefined signifies stream completion.
+  private responseQueue = new WaitQueue<Message|undefined>();
+
   private rpcs: PendingCalls;
   private rpc: Rpc;
 
@@ -85,20 +89,60 @@
   }
 
   handleResponse(response: Message): void {
+    this.responseQueue.push(response);
     const callback = () => this.onNext(response);
     this.invokeCallback(callback)
   }
 
   handleCompletion(status: Status) {
+    this.status = status;
+    this.responseQueue.push(undefined);
     const callback = () => this.onCompleted(status);
     this.invokeCallback(callback)
   }
 
   handleError(error: Status): void {
     this.error = error;
+    this.responseQueue.push(undefined);
     this.invokeCallback(() => this.onError(error));
   }
 
+  /**
+   * Yields responses up the specified count as they are added.
+   *
+   * Throws an error as soon as it is received even if there are still responses
+   * in the queue.
+   *
+   * Usage
+   * ```
+   * for await (const response of call.getResponses(5)) {
+   *  console.log(response);
+   * }
+   * ```
+   *
+   * @param {number} count The number of responses to read before returning.
+   *    If no value is specified, getResponses will block until the stream
+   *    either ends or hits an error.
+   */
+  async * getResponses(count?: number): AsyncGenerator<Message> {
+    this.checkErrors();
+
+    if (this.completed() && this.responseQueue.length == 0) {
+      return;
+    }
+
+    let remaining = count ?? Number.POSITIVE_INFINITY;
+    while (remaining > 0) {
+      const response = await this.responseQueue.shift();
+      if (response === undefined) {
+        return;
+      }
+      this.checkErrors();
+      yield response!;
+      remaining -= 1;
+    }
+  }
+
   cancel(): boolean {
     if (this.completed()) {
       return false;
diff --git a/pw_rpc/ts/call_test.ts b/pw_rpc/ts/call_test.ts
new file mode 100644
index 0000000..f38502b
--- /dev/null
+++ b/pw_rpc/ts/call_test.ts
@@ -0,0 +1,134 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/* eslint-env browser, jasmine */
+import 'jasmine';
+
+import {Request} from 'test_protos_tspb/test_protos_tspb_pb/pw_rpc/ts/test2_pb'
+
+import {Call} from './call';
+import {Channel, Method, Service} from './descriptors';
+import {PendingCalls, Rpc} from './rpc_classes';
+
+class FakeRpc {
+  readonly channel: any = undefined;
+  readonly service: any = undefined;
+  readonly method: any = undefined;
+
+  idSet: [number, number, number] = [1, 2, 3];
+  idString = '1.2.3';
+}
+
+describe('Call', () => {
+  let call: Call;
+
+  beforeEach(() => {
+    const noop = () => {};
+    const pendingCalls = new PendingCalls();
+    const channel = jasmine.createSpy()
+    const rpc = new FakeRpc()
+    call = new Call(pendingCalls, rpc, noop, noop, noop);
+  });
+
+  function newMessage(magicNumber: number = 1): Request {
+    const message = new Request();
+    message.setMagicNumber(magicNumber);
+    return message;
+  }
+
+  it('getResponse returns all responses.', async () => {
+    const message1 = newMessage(1);
+    const message2 = newMessage(2);
+    const message3 = newMessage(3);
+
+    // Queue three responses
+    call.handleResponse(message1);
+    call.handleResponse(message2);
+    call.handleResponse(message3);
+
+    let responses = call.getResponses(2);
+    expect((await responses.next()).value).toEqual(message1);
+    expect((await responses.next()).value).toEqual(message2);
+    expect((await responses.next()).done).toBeTrue();
+
+    responses = call.getResponses(1);
+    expect((await responses.next()).value).toEqual(message3);
+    expect((await responses.next()).done).toBeTrue();
+  });
+
+  it('getResponse early returns on stream end.', async () => {
+    const message = newMessage();
+    let responses = call.getResponses(2);
+
+    // Queue one response and an early completion.
+    call.handleResponse(message);
+    call.handleCompletion(0);
+
+    expect((await responses.next()).value).toEqual(message);
+    expect((await responses.next()).done).toBeTrue();
+  });
+
+  it('getResponse promise is rejected on stream error.', async () => {
+    const message = newMessage();
+    let responses = call.getResponses(3);
+
+    call.handleResponse(message);
+    expect((await responses.next()).value).toEqual(message);
+
+    call.handleResponse(message);
+    call.handleError(1);
+
+    // Promise is rejected as soon as an error is received, even if there is a
+    // response in the queue.
+    await expectAsync(responses.next()).toBeRejected()
+  });
+
+  it('getResponse waits if queue is empty', async () => {
+    const message1 = newMessage(1);
+    const message2 = newMessage(2);
+    let responses = call.getResponses(2);
+
+    // Queue two responses after a small delay
+    setTimeout(() => {
+      call.handleResponse(message1);
+      call.handleResponse(message2);
+      call.handleCompletion(0);
+      expect(call.completed()).toBeTrue();
+    }, 200);
+
+    expect(call.completed()).toBeFalse();
+    expect((await responses.next()).value).toEqual(message1);
+    expect((await responses.next()).value).toEqual(message2);
+    expect((await responses.next()).done).toBeTrue();
+  });
+
+  it('getResponse without count fetches all results', async () => {
+    const message1 = newMessage(1);
+    const message2 = newMessage(2);
+    let responses = call.getResponses();
+
+    call.handleResponse(message1);
+    expect((await responses.next()).value).toEqual(message1);
+
+    setTimeout(() => {
+      call.handleResponse(message2);
+      call.handleCompletion(0);
+      expect(call.completed()).toBeTrue();
+    }, 200);
+
+    expect(call.completed()).toBeFalse();
+    expect((await responses.next()).value).toEqual(message2);
+    expect((await responses.next()).done).toBeTrue();
+  });
+});
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index 0be9d77..29e3777 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -81,17 +81,30 @@
 
 Server Streaming RPC
 --------------------
-Once the server stream is invoked, responses are read via the ``onNext``
-callback.
+Once the server stream is invoked, responses can either be read by providing
+the ``onNext`` callback or using the promise API.
 
 .. code-block:: typescript
 
   serverStreamRpc = client.channel()?.methodStub(
       'pw.rpc.test1.TheTestService.SomeUnary')!
       as ServerStreamingMethodStub;
+
+  // Callback
   const onNext = (response) => {console.log(response)};
   const call = serverStreamRpc.invoke(undefined, onNext);
 
+  // Promise
+  const call = serverStreamRpc.invoke();
+  for await (const response of call.getResponses(2)) {
+   console.log(response);
+  }
+  const responses = call.getResponse() // All responses until stream end.
+  while (!responses.done) {
+    console.log(await responses.value());
+  }
+
+
 Client Streaming RPC
 --------------------
 Unsupported
diff --git a/pw_rpc/ts/rpc_classes.ts b/pw_rpc/ts/rpc_classes.ts
index 2548ad9..d611be4 100644
--- a/pw_rpc/ts/rpc_classes.ts
+++ b/pw_rpc/ts/rpc_classes.ts
@@ -21,9 +21,9 @@
 
 /** Data class for a pending RPC call. */
 export class Rpc {
-  channel: Channel;
-  service: Service;
-  method: Method;
+  readonly channel: Channel;
+  readonly service: Service;
+  readonly method: Method;
 
   constructor(channel: Channel, service: Service, method: Method) {
     this.channel = channel;
@@ -78,7 +78,7 @@
    * server streaming RPC prior to any clients invoking it.
    */
   open(rpc: Rpc, call: Call): Call|undefined {
-    console.debug('Starting %s', rpc);
+    console.debug(`Starting ${rpc}`);
     const previous = this.pending.get(rpc.idString);
     this.pending.set(rpc.idString, call)
     return previous;
diff --git a/yarn.lock b/yarn.lock
index 9d355dd..6448a34 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -4865,6 +4865,11 @@
   resolved "https://registry.yarnpkg.com/void-elements/-/void-elements-2.0.1.tgz#c066afb582bb1cb4128d60ea92392e94d5e9dbec"
   integrity sha1-wGavtYK7HLQSjWDqkjkulNXp2+w=
 
+wait-queue@^1.1.4:
+  version "1.1.4"
+  resolved "https://registry.yarnpkg.com/wait-queue/-/wait-queue-1.1.4.tgz#344f9bdd6e011ddc0bb1e3252eeb41234f7a8a85"
+  integrity sha512-/VdMghiBDG/Ch43ZRp3d8OSd8A0dx8hfkBO7AfWCDzMn2blHquMf+3gqHHhYcggSBpKf7VTzA939bb0DevYKBA==
+
 which-boxed-primitive@^1.0.2:
   version "1.0.2"
   resolved "https://registry.yarnpkg.com/which-boxed-primitive/-/which-boxed-primitive-1.0.2.tgz#13757bc89b209b049fe5d86430e21cf40a89a8e6"