pw_rpc: Server streaming RPC

Enables invoking a server streaming RPC. This change primarily adds the
tests to verify that server streaming RPC calls are working as intended.
Currently only a non-blocking invoke is supported. Future work is
required to support the open method and blocking calls.

Bug: b/194329554
Change-Id: Id8f44ad4a60c70fea37b032586a29a14d266ac62
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61342
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Commit-Queue: Jared Weinstein <jaredweinstein@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index 99fdbe2..40046bd 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -75,7 +75,7 @@
   it('processPacket with invalid proto data', () => {
     const textEncoder = new TextEncoder();
     const data = textEncoder.encode('NOT a packet!');
-    expect(client.processPacket(data)).toEqual(Status.DATA_LOSS)
+    expect(client.processPacket(data)).toEqual(Status.DATA_LOSS);
   });
 
   it('processPacket not for client', () => {
@@ -158,7 +158,7 @@
     packet.setChannelId(channelId);
     packet.setServiceId(method.service.id);
     packet.setMethodId(method.id);
-    packet.setStatus(status)
+    packet.setStatus(status);
     packet.setPayload(payload);
 
     nextPackets.push([packet.serializeBinary(), Status.OK]);
@@ -253,9 +253,9 @@
       expect(requests.length).toBeGreaterThan(0);
       requests = [];
 
-      expect(call.cancel()).toBeTrue()
-      expect(call.cancel()).toBeFalse()
-      expect(onNext).not.toHaveBeenCalled()
+      expect(call.cancel()).toBeTrue();
+      expect(call.cancel()).toBeFalse();
+      expect(onNext).not.toHaveBeenCalled();
     });
 
     it('nonblocking duplicate calls first is cancelled', () => {
@@ -279,4 +279,79 @@
       expect(call.callbackException!.message).toEqual('Something went wrong!');
     });
   })
+
+  describe('ServerStreaming', () => {
+    let serverStreaming: MethodStub;
+    let request: any;
+    let requestType: any;
+    let responseType: any;
+
+    beforeEach(async () => {
+      serverStreaming = client.channel()?.methodStub(
+          'pw.rpc.test1.TheTestService.SomeServerStreaming')!;
+      requestType = serverStreaming.method.requestType;
+      responseType = serverStreaming.method.responseType;
+      request = new requestType();
+    });
+
+    it('non-blocking call', () => {
+      const response1 = new responseType();
+      response1.setPayload('!!!');
+      const response2 = new responseType();
+      response2.setPayload('?');
+
+      const request = new requestType()
+      request.setMagicNumber(4);
+
+      for (let i = 0; i < 3; i++) {
+        enqueueServerStream(
+            1, serverStreaming.method, response1.serializeBinary());
+        enqueueServerStream(
+            1, serverStreaming.method, response2.serializeBinary());
+        enqueueResponse(1, serverStreaming.method, Status.ABORTED);
+
+        const onNext = jasmine.createSpy();
+        const onCompleted = jasmine.createSpy();
+        const onError = jasmine.createSpy();
+        serverStreaming.invoke(request, onNext, onCompleted, onError);
+
+        expect(onNext).toHaveBeenCalledWith(response1);
+        expect(onNext).toHaveBeenCalledWith(response2);
+        expect(onError).not.toHaveBeenCalled();
+        expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
+
+        expect(sentPayload(serverStreaming.method.requestType).getMagicNumber())
+            .toEqual(4);
+      }
+    });
+
+    it('non-blocking cancel', () => {
+      const response = new responseType();
+      response.setPayload('!!!');
+      enqueueServerStream(
+          1, serverStreaming.method, response.serializeBinary());
+      const request = new requestType();
+      request.setMagicNumber(3);
+
+      const onNext = jasmine.createSpy();
+      const onCompleted = jasmine.createSpy();
+      const onError = jasmine.createSpy();
+      let call = serverStreaming.invoke(request, onNext, () => {}, () => {});
+      expect(onNext).toHaveBeenCalledOnceWith(response);
+
+      onNext.calls.reset();
+
+      call.cancel();
+      expect(lastRequest().getType()).toEqual(PacketType.CANCEL);
+
+      // Ensure the RPC can be called after being cancelled.
+      enqueueServerStream(
+          1, serverStreaming.method, response.serializeBinary());
+      enqueueResponse(1, serverStreaming.method, Status.OK);
+      call = serverStreaming.invoke(request, onNext, onCompleted, onError);
+      expect(onNext).toHaveBeenCalledWith(response);
+      expect(onError).not.toHaveBeenCalled();
+      expect(onCompleted).toHaveBeenCalledOnceWith(Status.OK);
+    });
+  });
 });
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index fb8a0ba..2d63f68 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -78,7 +78,15 @@
 
 Server Streaming RPC
 --------------------
-Unsupported
+Once the server stream is invoked, responses are read via the ``onNext``
+callback.
+
+.. code-block:: typescript
+
+  serverStreamRpc = client.channel()?.methodStub(
+      'pw.rpc.test1.TheTestService.SomeUnary')!;
+  const onNext = (response) => {console.log(response)};
+  const call = serverStreamRpc.invoke(undefined, onNext);
 
 Client Streaming RPC
 --------------------
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
index 3ea505d..c30f695 100644
--- a/pw_rpc/ts/method.ts
+++ b/pw_rpc/ts/method.ts
@@ -76,7 +76,10 @@
       onNext: Callback = () => {},
       onCompleted: Callback = () => {},
       onError: Callback = () => {}): ServerStreamingCall {
-    throw Error('ClientStreaming invoke() not implemented');
+    const call = new ServerStreamingCall(
+        this.rpcs, this.rpc, onNext, onCompleted, onError);
+    call.invoke(request);
+    return call;
   }
 }