pw_rpc: Process packet logic

Adds the primary logic for handling a packet including supporting
call logic.

Tested with `bazel test //pw_rpc/ts:rpc_test`

Bug: b/194329554
Change-Id: Id931629e778e7059803bb546c83bb99afe07b3cb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60923
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index 3beca86..a14688e 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -77,8 +77,10 @@
     deps = [
         ":rpc",
         ":rpc_utils",  # TODO(jaredweinstein): remove dependency.
+        ":test_protos_tspb",
         "//pw_protobuf_compiler/ts:proto_lib",
         "//pw_rpc:packet_proto_tspb",
+        "//pw_status/ts:pw_status",
         "@npm//@types/jasmine",
         "@npm//@types/node",
     ],
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index 6ab8b90..9f4dabb 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -12,6 +12,7 @@
 // License for the specific language governing permissions and limitations under
 // the License.
 
+import {Message} from 'google-protobuf';
 import {Status} from 'pigweed/pw_status/ts/status';
 
 import {PendingCalls, Rpc} from './rpc_classes';
@@ -27,6 +28,13 @@
   private onCompleted: Callback;
   private onError: Callback;
 
+  // TODO(jaredweinstein): support async timeout.
+  // private timeout: number;
+  private status?: Status;
+  error?: Status;
+  callbackException?: Error;
+
+
   constructor(
       rpcs: PendingCalls,
       rpc: Rpc,
@@ -40,6 +48,54 @@
     this.onCompleted = onCompleted;
     this.onError = onError;
   }
+
+  /* Calls the RPC. This must be called immediately after construction. */
+  invoke(request?: Message): void {
+    const previous = this.rpcs.sendRequest(this.rpc, this, request);
+
+    if (previous !== undefined && !previous.completed()) {
+      previous.handleError(Status.CANCELLED)
+    }
+  }
+
+  private invokeCallback(f: any) {
+    try {
+      f();
+    } catch (err) {
+      console.error(
+          `An exception was raised while invoking a callback: ${err}`);
+      this.callbackException = err
+    }
+  }
+
+  completed(): boolean {
+    return (this.status !== undefined || this.error !== undefined);
+  }
+
+  handleResponse(response: Message): void {
+    const callback = () => this.onNext(response);
+    this.invokeCallback(callback)
+  }
+
+  handleCompletion(status: Status) {
+    const callback = () => this.onCompleted(status);
+    this.invokeCallback(callback)
+  }
+
+  cancel(): boolean {
+    if (this.completed()) {
+      return false;
+    }
+
+    this.error = Status.CANCELLED
+    return this.rpcs.sendCancel(this.rpc);
+  }
+
+  handleError(error: Status): void {
+    this.error = error
+    const callback = () => this.onError(2);
+    this.invokeCallback(callback);
+  }
 }
 
 
diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts
index 6c34fe0..3dd47ed 100644
--- a/pw_rpc/ts/client.ts
+++ b/pw_rpc/ts/client.ts
@@ -14,11 +14,16 @@
 
 /** Provides a pw_rpc client for TypeScript. */
 
+import {Message} from 'google-protobuf';
+import {PacketType, RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
 import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+import {Status} from 'pigweed/pw_status/ts/status';
 
 import {Channel, Service} from './descriptors';
 import {MethodStub, methodStubFactory} from './method';
-import {PendingCalls} from './rpc_classes';
+import * as packets from './packets';
+import {PendingCalls, Rpc} from './rpc_classes';
+
 
 /**
  * Object for managing RPC service and contained methods.
@@ -96,9 +101,9 @@
  * ```
  */
 export class Client {
-  private services = new Map<number, Service>();
   private channelsById = new Map<number, ChannelClient>();
   readonly rpcs: PendingCalls;
+  readonly services = new Map<number, Service>();
 
   constructor(channels: Channel[], services: Service[]) {
     this.rpcs = new PendingCalls();
@@ -145,4 +150,133 @@
     }
     return this.channelsById.get(id);
   }
+
+  /**
+   * Creates a new RPC object holding channel, method, and service info.
+   * Returns undefined if the service or method does not exist.
+   */
+  private rpc(packet: RpcPacket, channelClient: ChannelClient): Rpc|undefined {
+    const service = this.services.get(packet.getServiceId());
+    if (service == undefined) {
+      return undefined;
+    }
+    const method = service.methods.get(packet.getMethodId());
+    if (method == undefined) {
+      return undefined;
+    }
+    return new Rpc(channelClient.channel, service, method);
+  }
+
+  private decodeStatus(rpc: Rpc, packet: RpcPacket): Status|undefined {
+    if (packet.getType() === PacketType.SERVER_STREAM) {
+      return;
+    }
+    return packet.getStatus();
+  }
+
+  private decodePayload(rpc: Rpc, packet: RpcPacket): Message|undefined {
+    if (packet.getType() === PacketType.SERVER_ERROR) {
+      return undefined;
+    }
+
+    if (packet.getType() === PacketType.RESPONSE &&
+        rpc.method.serverStreaming) {
+      return undefined;
+    }
+
+    const payload = packet.getPayload_asU8();
+    return packets.decodePayload(payload, rpc.method.responseType);
+  }
+
+  private sendClientError(
+      client: ChannelClient, packet: RpcPacket, error: Status) {
+    client.channel.send(packets.encodeClientError(packet, error));
+  }
+
+  /**
+   * Processes an incoming packet.
+   *
+   * @param {Uint8Array} rawPacketData binary data for a pw_rpc packet.
+   * @return {Status} The status of processing the packet.
+   *    - OK: the packet was processed by the client
+   *    - DATA_LOSS: the packet could not be decoded
+   *    - INVALID_ARGUMENT: the packet is for a server, not a client
+   *    - NOT_FOUND: the packet's channel ID is not known to this client
+   */
+  processPacket(rawPacketData: Uint8Array): Status {
+    let packet;
+    try {
+      packet = packets.decode(rawPacketData)
+    } catch (err) {
+      console.warn(`Failed to decode packet: ${err}`);
+      console.debug(`Raw packet: ${rawPacketData}`);
+      return Status.DATA_LOSS;
+    }
+
+    if (packets.forServer(packet)) {
+      return Status.INVALID_ARGUMENT;
+    }
+
+    const channelClient = this.channelsById.get(packet.getChannelId())
+    if (channelClient == undefined) {
+      console.warn(`Unrecognized channel ID: ${packet.getChannelId()}`)
+      return Status.NOT_FOUND;
+    }
+
+    const rpc = this.rpc(packet, channelClient)
+    if (rpc == undefined) {
+      this.sendClientError(channelClient, packet, Status.NOT_FOUND);
+      console.warn('rpc service/method not found');
+      return Status.OK;
+    }
+
+    if (packet.getType() !== PacketType.RESPONSE &&
+        packet.getType() !== PacketType.SERVER_STREAM &&
+        packet.getType() !== PacketType.SERVER_ERROR) {
+      console.error(`${rpc}: Unexpected packet type ${packet.getType()}`)
+      console.debug(`Packet: ${packet}`);
+      return Status.OK;
+    }
+
+    let status = this.decodeStatus(rpc, packet);
+    let payload;
+    try {
+      payload = this.decodePayload(rpc, packet);
+    } catch (error) {
+      this.sendClientError(channelClient, packet, Status.DATA_LOSS);
+      console.warn(`Failed to decode response: ${error}`);
+      console.debug(`Raw payload: ${packet.getPayload()}`);
+
+      // Make this an error packet so the error handler is called.
+      packet.setType(PacketType.SERVER_ERROR);
+      status = Status.DATA_LOSS
+    }
+
+    let call = this.rpcs.getPending(rpc, status);
+    if (call === undefined) {
+      this.sendClientError(channelClient, packet, Status.FAILED_PRECONDITION);
+      console.debug(`Discarding response for ${rpc}, which is not pending`);
+      return Status.OK;
+    }
+
+    if (packet.getType() === PacketType.SERVER_ERROR) {
+      if (status === Status.OK) {
+        throw 'Unexpected OK status on SERVER_ERROR';
+      }
+      if (status === undefined) {
+        throw 'Missing status on SERVER_ERROR';
+      }
+      console.warn(`${rpc}: invocation failed with Status: ${Status[status]}`);
+      call.handleError(status);
+      return Status.OK;
+    }
+
+    if (payload !== undefined) {
+      call.handleResponse(payload);
+    }
+    if (status !== undefined) {
+      call.handleCompletion(status);
+    }
+    return Status.OK;
+  }
 }
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index 80cb97a..ab9c510 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -15,25 +15,33 @@
 /* eslint-env browser, jasmine */
 import 'jasmine';
 
-import {RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
+import {PacketType, RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
 import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+import {Status} from 'pigweed/pw_status/ts/status';
+import {Request} from 'test_protos_tspb/test_protos_tspb_pb/pw_rpc/ts/test2_pb'
 
 import {Client} from './client';
 import {Channel} from './descriptors';
+import * as packets from './packets';
 
 const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
 
 describe('Client', () => {
   let lib: Library;
   let client: Client;
+  let lastPacketSent: RpcPacket;
 
   beforeEach(async () => {
     lib = await Library.fromFileDescriptorSet(
         TEST_PROTO_PATH, 'test_protos_tspb');
-    const channels = [new Channel(1), new Channel(5)];
+    const channels = [new Channel(1, savePacket), new Channel(5)];
     client = Client.fromProtoSet(channels, lib);
   });
 
+  function savePacket(packetBytes: Uint8Array): void {
+    lastPacketSent = RpcPacket.deserializeBinary(packetBytes);
+  }
+
   it('channel returns undefined for empty list', () => {
     const channels = Array<Channel>();
     const emptyChannelClient = Client.fromProtoSet(channels, lib);
@@ -61,4 +69,64 @@
     expect(channel.methodStub('pw.rpc.test1.TheTestService.Garbage'))
         .toBeUndefined();
   })
+
+  it('processPacket with invalid proto data', () => {
+    const textEncoder = new TextEncoder();
+    const data = textEncoder.encode('NOT a packet!');
+    expect(client.processPacket(data)).toEqual(Status.DATA_LOSS)
+  });
+
+  it('processPacket not for client', () => {
+    const packet = new RpcPacket();
+    packet.setType(PacketType.REQUEST);
+    const processStatus = client.processPacket(packet.serializeBinary());
+    expect(processStatus).toEqual(Status.INVALID_ARGUMENT);
+  });
+
+  it('processPacket for unrecognized channel', () => {
+    const packet = packets.encodeResponse([123, 456, 789], new Request());
+    expect(client.processPacket(packet)).toEqual(Status.NOT_FOUND);
+  });
+
+  it('processPacket for unrecognized service', () => {
+    const packet = packets.encodeResponse([1, 456, 789], new Request());
+    const status = client.processPacket(packet);
+    expect(client.processPacket(packet)).toEqual(Status.OK);
+
+    expect(lastPacketSent.getChannelId()).toEqual(1);
+    expect(lastPacketSent.getServiceId()).toEqual(456);
+    expect(lastPacketSent.getMethodId()).toEqual(789);
+    expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
+    expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
+  });
+
+  it('processPacket for unrecognized method', () => {
+    const service = client.services.values().next().value;
+
+    const packet = packets.encodeResponse([1, service.id, 789], new Request());
+    const status = client.processPacket(packet);
+    expect(client.processPacket(packet)).toEqual(Status.OK);
+
+    expect(lastPacketSent.getChannelId()).toEqual(1);
+    expect(lastPacketSent.getServiceId()).toEqual(service.id);
+    expect(lastPacketSent.getMethodId()).toEqual(789);
+    expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
+    expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
+  });
+
+  it('processPacket for non-pending method', () => {
+    const service = client.services.values().next().value;
+    const method = service.methods.values().next().value;
+
+    const packet =
+        packets.encodeResponse([1, service.id, method.id], new Request());
+    const status = client.processPacket(packet);
+    expect(client.processPacket(packet)).toEqual(Status.OK);
+
+    expect(lastPacketSent.getChannelId()).toEqual(1);
+    expect(lastPacketSent.getServiceId()).toEqual(service.id);
+    expect(lastPacketSent.getMethodId()).toEqual(method.id);
+    expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
+    expect(lastPacketSent.getStatus()).toEqual(Status.FAILED_PRECONDITION);
+  });
 })
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index 95db78f..a28d4a8 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -8,3 +8,53 @@
 handling RPCs.
 
 This package is currently a work in progress.
+
+Creating an RPC Client
+======================
+The RPC client is instantiated from a list of channels and a set of protos.
+
+.. code-block:: typescript
+
+  const testProtoPath = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
+  const lib = await Library.fromFileDescriptorSet(
+    testProtoPath, 'test_protos_tspb');
+  const channels = [new Channel(1, savePacket), new Channel(5)];
+  const client = Client.fromProtoSet(channels, lib);
+
+  function savePacket(packetBytes: Uint8Array): void {
+    const packet = RpcPacket.deserializeBinary(packetBytes);
+    ...
+  }
+
+The proto library must match the proto build rules. The first argument
+corresponds with the location of the ``proto_library`` build rule that generates
+a descriptor set for all src protos. The second argument corresponds with the
+name of the ``js_proto_library`` build rule that generates javascript based on
+the descriptor set. For instance, the previous example corresponds with the
+following build file: ``pw_rpc/ts/BUILD.bazel``.
+
+.. code-block::
+
+  proto_library(
+      name = "test_protos",
+      srcs = [
+          "test.proto",
+          "test2.proto",
+      ],
+  )
+
+  js_proto_library(
+      name = "test_protos_tspb",
+      protos = [":test_protos"],
+  )
+
+Finding an RPC Method
+=====================
+Once the client is instantiated with the correct proto library, the target RPC
+method is found by searching based on the full name:
+``{packageName}.{serviceName}.{methodName}``
+
+.. code-block:: typescript
+
+  const channel = client.channel()!;
+  const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;