pw_rpc: Non-blocking unary RPC

Tests: `bazel test //pw_rpc/ts:rpc_test`

Bug: b/194329554
Change-Id: Ie8777c18f063c5746277ff9be77691773b2fda29
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60924
Reviewed-by: Alexei Frolov <frolv@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index a14688e..066dc46 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -81,6 +81,7 @@
         "//pw_protobuf_compiler/ts:proto_lib",
         "//pw_rpc:packet_proto_tspb",
         "//pw_status/ts:pw_status",
+        "@npm//@types/google-protobuf",
         "@npm//@types/jasmine",
         "@npm//@types/node",
     ],
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index 9f4dabb..82f59c1 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -19,6 +19,19 @@
 
 export type Callback = (a: any) => any;
 
+class RpcError extends Error {
+  constructor(rpc: Rpc, status: Status) {
+    let message = '';
+    if (status === Status.NOT_FOUND) {
+      message = ': the RPC server does not support this RPC';
+    } else if (status === Status.DATA_LOSS) {
+      message = ': an error occurred while decoding the RPC payload';
+    }
+
+    super(`${rpc.method} failed with error ${Status[status]}${message}`);
+  }
+}
+
 /** Represent an in-progress or completed RPC call. */
 export class Call {
   private rpcs: PendingCalls;
@@ -34,7 +47,6 @@
   error?: Status;
   callbackException?: Error;
 
-
   constructor(
       rpcs: PendingCalls,
       rpc: Rpc,
@@ -82,6 +94,11 @@
     this.invokeCallback(callback)
   }
 
+  handleError(error: Status): void {
+    this.error = error;
+    this.invokeCallback(() => this.onError(error));
+  }
+
   cancel(): boolean {
     if (this.completed()) {
       return false;
@@ -91,10 +108,14 @@
     return this.rpcs.sendCancel(this.rpc);
   }
 
-  handleError(error: Status): void {
-    this.error = error
-    const callback = () => this.onError(2);
-    this.invokeCallback(callback);
+
+  private checkErrors(): void {
+    if (this.callbackException !== undefined) {
+      throw this.callbackException;
+    }
+    if (this.error !== undefined) {
+      throw new RpcError(this.rpc, this.error);
+    }
   }
 }
 
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index ab9c510..99fdbe2 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -15,13 +15,15 @@
 /* eslint-env browser, jasmine */
 import 'jasmine';
 
+import {Message} from 'google-protobuf';
 import {PacketType, RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
-import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+import {Library, MessageCreator} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
 import {Status} from 'pigweed/pw_status/ts/status';
 import {Request} from 'test_protos_tspb/test_protos_tspb_pb/pw_rpc/ts/test2_pb'
 
 import {Client} from './client';
-import {Channel} from './descriptors';
+import {Channel, Method} from './descriptors';
+import {MethodStub} from './method';
 import * as packets from './packets';
 
 const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
@@ -129,4 +131,152 @@
     expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
     expect(lastPacketSent.getStatus()).toEqual(Status.FAILED_PRECONDITION);
   });
-})
+});
+
+describe('RPC', () => {
+  let lib: Library;
+  let client: Client;
+  let lastPacketSent: RpcPacket;
+  let requests: RpcPacket[] = [];
+  let nextPackets: [Uint8Array, Status][] = [];
+  let responseLock = false;
+
+  beforeEach(async () => {
+    lib = await Library.fromFileDescriptorSet(
+        TEST_PROTO_PATH, 'test_protos_tspb');
+    const channels = [new Channel(1, handlePacket), new Channel(2, () => {})];
+    client = Client.fromProtoSet(channels, lib);
+  });
+
+  function enqueueResponse(
+      channelId: number,
+      method: Method,
+      status: Status,
+      payload: Uint8Array = new Uint8Array()) {
+    const packet = new RpcPacket();
+    packet.setType(PacketType.RESPONSE);
+    packet.setChannelId(channelId);
+    packet.setServiceId(method.service.id);
+    packet.setMethodId(method.id);
+    packet.setStatus(status)
+    packet.setPayload(payload);
+
+    nextPackets.push([packet.serializeBinary(), Status.OK]);
+  }
+
+  function enqueueServerStream(
+      channelId: number,
+      method: Method,
+      response: Uint8Array,
+      status: Status = Status.OK) {
+    const packet = new RpcPacket();
+    packet.setType(PacketType.SERVER_STREAM);
+    packet.setChannelId(channelId);
+    packet.setServiceId(method.service.id);
+    packet.setMethodId(method.id);
+    packet.setPayload(response);
+    nextPackets.push([packet.serializeBinary(), status]);
+  }
+
+
+  function lastRequest(): RpcPacket {
+    if (requests.length == 0) {
+      throw Error('Tried to fetch request from empty list');
+    }
+    return requests[requests.length - 1];
+  }
+
+  function sentPayload(messageType: typeof Message): any {
+    return messageType.deserializeBinary(lastRequest().getPayload_asU8());
+  }
+
+  function handlePacket(data: Uint8Array): void {
+    requests.push(packets.decode(data));
+
+    if (responseLock == true) {
+      return;
+    }
+
+    // Avoid infinite recursion when processing a packet causes another packet
+    // to send.
+    responseLock = true;
+    for (const [packet, status] of nextPackets) {
+      expect(client.processPacket(packet)).toEqual(status);
+    }
+    nextPackets = [];
+    responseLock = false;
+  }
+
+  describe('Unary', () => {
+    let unaryStub: MethodStub;
+    let request: any;
+    let requestType: MessageCreator;
+    let responseType: MessageCreator;
+
+    beforeEach(async () => {
+      unaryStub = client.channel()?.methodStub(
+          'pw.rpc.test1.TheTestService.SomeUnary')!;
+      requestType = unaryStub.method.requestType;
+      responseType = unaryStub.method.responseType;
+      request = new requestType();
+    });
+
+
+    it('nonblocking call', () => {
+      for (let i = 0; i < 3; i++) {
+        const response: any = new responseType();
+        response.setPayload('hello world');
+        const payload = response.serializeBinary();
+        enqueueResponse(1, unaryStub.method, Status.ABORTED, payload);
+
+        request.setMagicNumber(5);
+
+        const onNext = jasmine.createSpy();
+        const onCompleted = jasmine.createSpy();
+        const onError = jasmine.createSpy();
+        const call = unaryStub.invoke(request, onNext, onCompleted, onError);
+
+        expect(sentPayload(unaryStub.method.requestType).getMagicNumber())
+            .toEqual(5);
+        expect(onNext).toHaveBeenCalledOnceWith(response);
+        expect(onError).not.toHaveBeenCalled();
+        expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
+      }
+    });
+
+    it('nonblocking call cancel', () => {
+      request.setMagicNumber(5);
+
+      let onNext = jasmine.createSpy();
+      const call = unaryStub.invoke(request, onNext, () => {}, () => {});
+
+      expect(requests.length).toBeGreaterThan(0);
+      requests = [];
+
+      expect(call.cancel()).toBeTrue()
+      expect(call.cancel()).toBeFalse()
+      expect(onNext).not.toHaveBeenCalled()
+    });
+
+    it('nonblocking duplicate calls first is cancelled', () => {
+      const firstCall = unaryStub.invoke(request, () => {}, () => {}, () => {});
+      expect(firstCall.completed()).toBeFalse();
+
+      const secondCall =
+          unaryStub.invoke(request, () => {}, () => {}, () => {});
+      expect(firstCall.error).toEqual(Status.CANCELLED);
+      expect(secondCall.completed()).toBeFalse();
+    });
+
+    it('nonblocking exception in callback', () => {
+      const errorCallback = () => {
+        throw Error('Something went wrong!');
+      };
+
+      enqueueResponse(1, unaryStub.method, Status.OK);
+      const call = unaryStub.invoke(request, errorCallback, () => {}, () => {});
+      expect(call.callbackException!.name).toEqual('Error');
+      expect(call.callbackException!.message).toEqual('Something went wrong!');
+    });
+  })
+});
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index a28d4a8..fb8a0ba 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -58,3 +58,37 @@
 
   const channel = client.channel()!;
   const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;
+
+Calling an RPC
+==============
+
+Unary RPC
+---------
+Only non blocking calls are currently supported.
+
+.. code-block:: typescript
+
+  unaryStub = client.channel()?.methodStub(
+      'pw.rpc.test1.TheTestService.SomeUnary')!;
+  request = new unaryStub.method.requestType();
+  request.setFooProperty('hello world');
+  const call = unaryStub.invoke(request, (response) => {
+    console.log(response);
+  });
+
+Server Streaming RPC
+--------------------
+Unsupported
+
+Client Streaming RPC
+--------------------
+Unsupported
+
+Bidirectional Stream RPC
+------------------------
+Unsupported
+
+.. attention::
+
+  RPC timeout is currently unsupported on all RPC types.
+
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
index 12eb16e..3ea505d 100644
--- a/pw_rpc/ts/method.ts
+++ b/pw_rpc/ts/method.ts
@@ -14,7 +14,7 @@
 
 import {Message} from 'google-protobuf';
 
-import {Call, Callback} from './call';
+import {Call, Callback, ServerStreamingCall, UnaryCall} from './call';
 import {Channel, Method, MethodType, Service} from './descriptors';
 import {PendingCalls, Rpc} from './rpc_classes';
 
@@ -32,14 +32,12 @@
   }
 }
 
-export abstract class MethodStub {
+export class MethodStub {
   readonly method: Method;
-  private rpcs: PendingCalls;
-  private rpc: Rpc;
+  readonly rpcs: PendingCalls;
+  readonly rpc: Rpc;
   private channel: Channel;
 
-  private callType: typeof Call = Call;
-
   constructor(rpcs: PendingCalls, channel: Channel, method: Method) {
     this.method = method;
     this.rpcs = rpcs;
@@ -47,11 +45,13 @@
     this.rpc = new Rpc(channel, method.service, method)
   }
 
-  abstract invoke(
-      request: Message,
-      onNext: Callback,
-      onCompleted: Callback,
-      onError: Callback): Call;
+  invoke(
+      request?: Message,
+      onNext: Callback = () => {},
+      onCompleted: Callback = () => {},
+      onError: Callback = () => {}): UnaryCall {
+    throw Error('invoke() not implemented');
+  }
 }
 
 class UnaryMethodStub extends MethodStub {
@@ -59,21 +59,24 @@
   // invokeBlocking(request) {...}
 
   invoke(
-      request: Message,
+      request?: Message,
       onNext: Callback = () => {},
       onCompleted: Callback = () => {},
-      onError: Callback = () => {}): Call {
-    throw Error('ServerStreaming invoke() not implemented');
+      onError: Callback = () => {}): UnaryCall {
+    const call =
+        new UnaryCall(this.rpcs, this.rpc, onNext, onCompleted, onError);
+    call.invoke(request!);
+    return call;
   }
 }
 
 class ServerStreamingMethodStub extends MethodStub {
   invoke(
-      request: Message,
+      request?: Message,
       onNext: Callback = () => {},
       onCompleted: Callback = () => {},
-      onError: Callback = () => {}): Call {
-    throw Error('ServerStreaming invoke() not implemented');
+      onError: Callback = () => {}): ServerStreamingCall {
+    throw Error('ClientStreaming invoke() not implemented');
   }
 }