pw_rpc: Channel, Service, and Method Client

Add logic to support easily finding and invoking RPC methods in the
client library. In the future these objects could have class fields for
each service/method to enable greater discoverability.

The MethodStub is used to invoke RPC calls. It is equivalent to the
MethodClient in the python RPC library.

Bug: b/194329554
No-Docs-Update-Reason: Documentation added in child commit: pwrev/c/60921
Change-Id: I6ea3bffd9ba91224852254c164f41546ee2b6a36
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60921
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index ada0bd3..3beca86 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -21,12 +21,12 @@
 ts_library(
     name = "rpc_utils",
     srcs = [
+        "call.ts",
         "descriptors.ts",
         "hash.ts",
+        "method.ts",
         "packets.ts",
-    ],
-    data = [
-        ":test_protos",
+        "rpc_classes.ts",
     ],
     module_name = "pigweed/pw_rpc/ts",
     deps = [
@@ -39,9 +39,15 @@
 
 ts_library(
     name = "rpc",
-    srcs = [],
+    srcs = ["client.ts"],
     module_name = "pigweed/pw_rpc/ts",
-    deps = [":rpc_utils"],
+    deps = [
+        ":rpc_utils",
+        "//pw_protobuf_compiler/ts:proto_lib",
+        "//pw_rpc:packet_proto_tspb",
+        "//pw_status/ts:pw_status",
+        "@npm//@types/google-protobuf",
+    ],
 )
 
 ts_library(
@@ -60,9 +66,28 @@
     ],
 )
 
+ts_library(
+    name = "rpc_test_lib",
+    srcs = [
+        "client_test.ts",
+    ],
+    data = [
+        ":test_protos",
+    ],
+    deps = [
+        ":rpc",
+        ":rpc_utils",  # TODO(jaredweinstein): remove dependency.
+        "//pw_protobuf_compiler/ts:proto_lib",
+        "//pw_rpc:packet_proto_tspb",
+        "@npm//@types/jasmine",
+        "@npm//@types/node",
+    ],
+)
+
 jasmine_node_test(
     name = "rpc_test",
     srcs = [
+        ":rpc_test_lib",
         ":rpc_utils_test_lib",
     ],
 )
@@ -71,6 +96,7 @@
     name = "test_protos",
     srcs = [
         "test.proto",
+        "test2.proto",
     ],
 )
 
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
new file mode 100644
index 0000000..6ab8b90
--- /dev/null
+++ b/pw_rpc/ts/call.ts
@@ -0,0 +1,64 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+import {Status} from 'pigweed/pw_status/ts/status';
+
+import {PendingCalls, Rpc} from './rpc_classes';
+
+export type Callback = (a: any) => any;
+
+/** Represent an in-progress or completed RPC call. */
+export class Call {
+  private rpcs: PendingCalls;
+  private rpc: Rpc;
+
+  private onNext: Callback;
+  private onCompleted: Callback;
+  private onError: Callback;
+
+  constructor(
+      rpcs: PendingCalls,
+      rpc: Rpc,
+      onNext: Callback,
+      onCompleted: Callback,
+      onError: Callback) {
+    this.rpcs = rpcs;
+    this.rpc = rpc;
+
+    this.onNext = onNext;
+    this.onCompleted = onCompleted;
+    this.onError = onError;
+  }
+}
+
+
+/** Tracks the state of a unary RPC call. */
+export class UnaryCall extends Call {
+  // TODO(jaredweinstein): Complete unary invocation logic.
+}
+
+/** Tracks the state of a client streaming RPC call. */
+export class ClientStreamingCall extends Call {
+  // TODO(jaredweinstein): Complete client streaming invocation logic.
+}
+
+/** Tracks the state of a server streaming RPC call. */
+export class ServerStreamingCall extends Call {
+  // TODO(jaredweinstein): Complete server streaming invocation logic.
+}
+
+/** Tracks the state of a bidirectional streaming RPC call. */
+export class BidirectionalStreamingCall extends Call {
+  // TODO(jaredweinstein): Complete bidirectional streaming invocation logic.
+}
diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts
new file mode 100644
index 0000000..6c34fe0
--- /dev/null
+++ b/pw_rpc/ts/client.ts
@@ -0,0 +1,148 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/** Provides a pw_rpc client for TypeScript. */
+
+import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+
+import {Channel, Service} from './descriptors';
+import {MethodStub, methodStubFactory} from './method';
+import {PendingCalls} from './rpc_classes';
+
+/**
+ * Object for managing RPC service and contained methods.
+ */
+class ServiceClient {
+  private service: Service;
+  private methods: MethodStub[] = [];
+  private methodsByName = new Map<string, MethodStub>();
+
+  constructor(client: Client, channel: Channel, service: Service) {
+    this.service = service;
+    const methods = service.methods;
+    methods.forEach((method) => {
+      const stub = methodStubFactory(client.rpcs, channel, method);
+      this.methods.push(stub);
+      this.methodsByName.set(method.name, stub);
+    });
+  }
+
+  method(methodName: string): MethodStub|undefined {
+    return this.methodsByName.get(methodName);
+  }
+}
+
+/**
+ * Object for managing RPC channel and contained services.
+ */
+class ChannelClient {
+  readonly channel: Channel;
+  private services = new Map<string, ServiceClient>();
+
+  constructor(client: Client, channel: Channel, services: Service[]) {
+    this.channel = channel;
+    services.forEach((service) => {
+      const serviceClient = new ServiceClient(client, this.channel, service);
+      this.services.set(service.name, serviceClient);
+    });
+  }
+
+  private service(serviceName: string): ServiceClient|undefined {
+    return this.services.get(serviceName);
+  }
+
+  /**
+   * Find a method stub via its full name.
+   *
+   * For example:
+   * `method = client.channel().methodStub('the.package.AService.AMethod');`
+   *
+   */
+  methodStub(name: string): MethodStub|undefined {
+    const index = name.lastIndexOf('.');
+    if (index <= 0) {
+      console.error(`Malformed method name: ${name}`);
+      return undefined;
+    }
+    const serviceName = name.slice(0, index);
+    const methodName = name.slice(index + 1);
+    const method = this.service(serviceName)?.method(methodName);
+    if (method === undefined) {
+      console.error(`Method not found: ${name}`);
+      return undefined;
+    }
+    return method;
+  }
+}
+
+/**
+ * RPCs are invoked through a MethodStub. These can be found by name via
+ * methodStub(string name).
+ *
+ * ```
+ * method = client.channel(1).methodStub('the.package.FooService.SomeMethod')
+ * call = method.invoke(request);
+ * ```
+ */
+export class Client {
+  private services = new Map<number, Service>();
+  private channelsById = new Map<number, ChannelClient>();
+  readonly rpcs: PendingCalls;
+
+  constructor(channels: Channel[], services: Service[]) {
+    this.rpcs = new PendingCalls();
+    services.forEach((service) => {
+      this.services.set(service.id, service);
+    });
+
+    channels.forEach((channel) => {
+      this.channelsById.set(
+          channel.id, new ChannelClient(this, channel, services));
+    });
+  }
+
+  /**
+   * Creates a client from a set of Channels and a library of Protos.
+   *
+   * @param {Channel[]} channels List of possible channels to use.
+   * @param {Library} protoSet Library containing protos defining RPC services
+   * and methods.
+   */
+  static fromProtoSet(channels: Channel[], protoSet: Library): Client {
+    let services: Service[] = [];
+    const descriptors = protoSet.fileDescriptorSet.getFileList();
+    descriptors.forEach((fileDescriptor) => {
+      const packageName = fileDescriptor.getPackage()!;
+      fileDescriptor.getServiceList().forEach((serviceDescriptor) => {
+        services = services.concat(
+            new Service(serviceDescriptor, protoSet, packageName));
+      });
+    });
+
+    return new Client(channels, services)
+  }
+
+  /**
+   * Finds the channel with the provided id. Returns undefined if there are no
+   * channels or no channel with a matching id.
+   *
+   * @param {number?} id If no id is specified, returns the first channel.
+   */
+  channel(id?: number): ChannelClient|undefined {
+    if (id === undefined) {
+      return this.channelsById.values().next().value;
+    }
+    return this.channelsById.get(id);
+  }
+}
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
new file mode 100644
index 0000000..80cb97a
--- /dev/null
+++ b/pw_rpc/ts/client_test.ts
@@ -0,0 +1,64 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/* eslint-env browser, jasmine */
+import 'jasmine';
+
+import {RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
+import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+
+import {Client} from './client';
+import {Channel} from './descriptors';
+
+const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
+
+describe('Client', () => {
+  let lib: Library;
+  let client: Client;
+
+  beforeEach(async () => {
+    lib = await Library.fromFileDescriptorSet(
+        TEST_PROTO_PATH, 'test_protos_tspb');
+    const channels = [new Channel(1), new Channel(5)];
+    client = Client.fromProtoSet(channels, lib);
+  });
+
+  it('channel returns undefined for empty list', () => {
+    const channels = Array<Channel>();
+    const emptyChannelClient = Client.fromProtoSet(channels, lib);
+    expect(emptyChannelClient.channel()).toBeUndefined();
+  });
+
+  it('fetches channel or returns undefined', () => {
+    expect(client.channel(1)!.channel.id).toEqual(1);
+    expect(client.channel(5)!.channel.id).toEqual(5);
+    expect(client.channel()!.channel.id).toEqual(1);
+    expect(client.channel(2)).toBeUndefined();
+  });
+
+  it('ChannelClient fetches method by name', () => {
+    const channel = client.channel()!;
+    const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;
+    expect(stub.method.name).toEqual('SomeUnary');
+  })
+
+  it('ChannelClient for unknown name returns undefined', () => {
+    const channel = client.channel()!;
+    expect(channel.methodStub('')).toBeUndefined();
+    expect(channel.methodStub('pw.rpc.test1.Garbage.SomeUnary'))
+        .toBeUndefined();
+    expect(channel.methodStub('pw.rpc.test1.TheTestService.Garbage'))
+        .toBeUndefined();
+  })
+})
diff --git a/pw_rpc/ts/descriptors.ts b/pw_rpc/ts/descriptors.ts
index c445787..42389a8 100644
--- a/pw_rpc/ts/descriptors.ts
+++ b/pw_rpc/ts/descriptors.ts
@@ -12,29 +12,58 @@
 // License for the specific language governing permissions and limitations under
 // the License.
 
-import {Message} from 'google-protobuf';
 import {MethodDescriptorProto, ServiceDescriptorProto} from 'google-protobuf/google/protobuf/descriptor_pb';
 import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
 
 import {hash} from './hash';
 
+interface ChannelOutput {
+  (data: Uint8Array): void;
+}
+
+export class Channel {
+  readonly id: number;
+  private output: ChannelOutput;
+
+  constructor(id: number, output: ChannelOutput = () => {}) {
+    this.id = id;
+    this.output = output;
+  }
+
+  send(data: Uint8Array) {
+    this.output(data);
+  }
+}
+
 /** Describes an RPC service. */
 export class Service {
   readonly name: string;
   readonly id: number;
-  readonly methods = new Map<string, Method>();
+  readonly methods = new Map<number, Method>();
+  readonly methodsByName = new Map<string, Method>();
 
-  constructor(descriptor: ServiceDescriptorProto, protoLibrary: Library) {
-    this.name = descriptor.getName()!;
+  constructor(
+      descriptor: ServiceDescriptorProto,
+      protoLibrary: Library,
+      packageName: string) {
+    this.name = packageName + '.' + descriptor.getName()!;
     this.id = hash(this.name);
     descriptor.getMethodList().forEach(
         (methodDescriptor: MethodDescriptorProto) => {
           const method = new Method(methodDescriptor, protoLibrary, this);
-          this.methods.set(method.name, method);
+          this.methods.set(method.id, method);
+          this.methodsByName.set(method.name, method);
         });
   }
 }
 
+export enum MethodType {
+  UNARY,
+  SERVER_STREAMING,
+  CLIENT_STREAMING,
+  BIDIRECTIONAL_STREAMING
+}
+
 /** Describes an RPC method. */
 export class Method {
   readonly service: Service;
@@ -42,8 +71,8 @@
   readonly id: number;
   readonly clientStreaming: boolean;
   readonly serverStreaming: boolean;
-  readonly inputType: any;
-  readonly outputType: any;
+  readonly requestType: any;
+  readonly responseType: any;
 
   constructor(
       descriptor: MethodDescriptorProto,
@@ -55,13 +84,26 @@
     this.serverStreaming = descriptor.getServerStreaming()!;
     this.clientStreaming = descriptor.getClientStreaming()!;
 
-    const inputTypePath = descriptor.getInputType()!;
-    const outputTypePath = descriptor.getOutputType()!;
+    const requestTypePath = descriptor.getInputType()!;
+    const responseTypePath = descriptor.getOutputType()!;
 
     // Remove leading period if it exists.
-    this.inputType =
-        protoLibrary.getMessageCreator(inputTypePath.replace(/^\./, ''))!;
-    this.outputType =
-        protoLibrary.getMessageCreator(outputTypePath.replace(/^\./, ''))!;
+    this.requestType =
+        protoLibrary.getMessageCreator(requestTypePath.replace(/^\./, ''))!;
+    this.responseType =
+        protoLibrary.getMessageCreator(responseTypePath.replace(/^\./, ''))!;
+  }
+
+  get type(): MethodType {
+    if (this.clientStreaming && this.serverStreaming) {
+      return MethodType.BIDIRECTIONAL_STREAMING;
+    } else if (this.clientStreaming && !this.serverStreaming) {
+      return MethodType.CLIENT_STREAMING;
+    } else if (!this.clientStreaming && this.serverStreaming) {
+      return MethodType.SERVER_STREAMING;
+    } else if (!this.clientStreaming && !this.serverStreaming) {
+      return MethodType.UNARY;
+    }
+    throw Error('Unhandled streaming condition');
   }
 }
diff --git a/pw_rpc/ts/descriptors_test.ts b/pw_rpc/ts/descriptors_test.ts
index e2c95af..b52b3c5 100644
--- a/pw_rpc/ts/descriptors_test.ts
+++ b/pw_rpc/ts/descriptors_test.ts
@@ -26,26 +26,27 @@
   it('parses from ServiceDescriptor binary', async () => {
     const lib = await Library.fromFileDescriptorSet(
         TEST_PROTO_PATH, 'test_protos_tspb');
-    const sd = lib.fileDescriptorSet.getFileList()[0].getServiceList()[0];
-    const service = new descriptors.Service(sd, lib);
+    const fd = lib.fileDescriptorSet.getFileList()[0];
+    const sd = fd.getServiceList()[0];
+    const service = new descriptors.Service(sd, lib, fd.getPackage()!);
 
-    expect(service.name).toEqual('TheTestService')
+    expect(service.name).toEqual('pw.rpc.test1.TheTestService')
     expect(service.methods.size).toEqual(4);
 
-    const unaryMethod = service.methods.get('SomeUnary')!;
+    const unaryMethod = service.methodsByName.get('SomeUnary')!;
     expect(unaryMethod.name).toEqual('SomeUnary');
     expect(unaryMethod.clientStreaming).toBeFalse();
     expect(unaryMethod.serverStreaming).toBeFalse();
     expect(unaryMethod.service).toEqual(service);
-    expect(unaryMethod.inputType).toEqual(SomeMessage);
-    expect(unaryMethod.outputType).toEqual(AnotherMessage);
+    expect(unaryMethod.requestType).toEqual(SomeMessage);
+    expect(unaryMethod.responseType).toEqual(AnotherMessage);
 
-    const someBidiStreaming = service.methods.get('SomeBidiStreaming')!;
+    const someBidiStreaming = service.methodsByName.get('SomeBidiStreaming')!;
     expect(someBidiStreaming.name).toEqual('SomeBidiStreaming');
     expect(someBidiStreaming.clientStreaming).toBeTrue();
     expect(someBidiStreaming.serverStreaming).toBeTrue();
     expect(someBidiStreaming.service).toEqual(service);
-    expect(someBidiStreaming.inputType).toEqual(SomeMessage);
-    expect(someBidiStreaming.outputType).toEqual(AnotherMessage);
+    expect(someBidiStreaming.requestType).toEqual(SomeMessage);
+    expect(someBidiStreaming.responseType).toEqual(AnotherMessage);
   });
 })
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
new file mode 100644
index 0000000..12eb16e
--- /dev/null
+++ b/pw_rpc/ts/method.ts
@@ -0,0 +1,98 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+import {Message} from 'google-protobuf';
+
+import {Call, Callback} from './call';
+import {Channel, Method, MethodType, Service} from './descriptors';
+import {PendingCalls, Rpc} from './rpc_classes';
+
+export function methodStubFactory(
+    rpcs: PendingCalls, channel: Channel, method: Method): MethodStub {
+  switch (method.type) {
+    case MethodType.BIDIRECTIONAL_STREAMING:
+      return new BidirectionStreamingMethodStub(rpcs, channel, method);
+    case MethodType.CLIENT_STREAMING:
+      return new ClientStreamingMethodStub(rpcs, channel, method);
+    case MethodType.SERVER_STREAMING:
+      return new ServerStreamingMethodStub(rpcs, channel, method);
+    case MethodType.UNARY:
+      return new UnaryMethodStub(rpcs, channel, method);
+  }
+}
+
+export abstract class MethodStub {
+  readonly method: Method;
+  private rpcs: PendingCalls;
+  private rpc: Rpc;
+  private channel: Channel;
+
+  private callType: typeof Call = Call;
+
+  constructor(rpcs: PendingCalls, channel: Channel, method: Method) {
+    this.method = method;
+    this.rpcs = rpcs;
+    this.channel = channel;
+    this.rpc = new Rpc(channel, method.service, method)
+  }
+
+  abstract invoke(
+      request: Message,
+      onNext: Callback,
+      onCompleted: Callback,
+      onError: Callback): Call;
+}
+
+class UnaryMethodStub extends MethodStub {
+  // TODO(jaredweinstein): Add blocking invocation.
+  // invokeBlocking(request) {...}
+
+  invoke(
+      request: Message,
+      onNext: Callback = () => {},
+      onCompleted: Callback = () => {},
+      onError: Callback = () => {}): Call {
+    throw Error('ServerStreaming invoke() not implemented');
+  }
+}
+
+class ServerStreamingMethodStub extends MethodStub {
+  invoke(
+      request: Message,
+      onNext: Callback = () => {},
+      onCompleted: Callback = () => {},
+      onError: Callback = () => {}): Call {
+    throw Error('ServerStreaming invoke() not implemented');
+  }
+}
+
+class ClientStreamingMethodStub extends MethodStub {
+  invoke(
+      request: Message,
+      onNext: Callback,
+      onCompleted: Callback,
+      onError: Callback): Call {
+    throw Error('ClientStreaming invoke() not implemented');
+  }
+}
+
+class BidirectionStreamingMethodStub extends MethodStub {
+  invoke(
+      request: Message,
+      onNext: Callback,
+      onCompleted: Callback,
+      onError: Callback): Call {
+    throw Error('BidirectionalStreaming invoke() not implemented');
+  }
+}
diff --git a/pw_rpc/ts/rpc_classes.ts b/pw_rpc/ts/rpc_classes.ts
new file mode 100644
index 0000000..2548ad9
--- /dev/null
+++ b/pw_rpc/ts/rpc_classes.ts
@@ -0,0 +1,130 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+import {Message} from 'google-protobuf';
+import {Status} from 'pigweed/pw_status/ts/status';
+
+import {Call} from './call';
+import {Channel, Method, Service} from './descriptors';
+import * as packets from './packets';
+
+/** Data class for a pending RPC call. */
+export class Rpc {
+  channel: Channel;
+  service: Service;
+  method: Method;
+
+  constructor(channel: Channel, service: Service, method: Method) {
+    this.channel = channel;
+    this.service = service;
+    this.method = method;
+  }
+
+  /** Returns channel service method id tuple */
+  get idSet(): [number, number, number] {
+    return [this.channel.id, this.service.id, this.method.id];
+  }
+
+  /**
+   * Returns a string sequence to uniquely identify channel, service, and
+   * method. This can be used to hash the Rpc.
+   *
+   * For example: "12346789.23452345.12341234"
+   */
+  get idString(): string {
+    return `${this.channel.id}.${this.service.id}.${this.method.id}`;
+  }
+
+  toString(): string {
+    return `RPC ${this.service.name}.${this.method.name} on channel ` +
+        `${this.channel.id})`;
+  }
+}
+
+/** Tracks pending RPCs and encodes outgoing RPC packets. */
+export class PendingCalls {
+  pending: Map<string, Call> = new Map;
+
+  /** Starts the provided RPC and returns the encoded packet to send. */
+  request(rpc: Rpc, request: Message, call: Call): Uint8Array {
+    this.open(rpc, call)
+    console.log(`Starting ${rpc}`);
+    return packets.encodeRequest(rpc.idSet, request);
+  }
+
+  /** Calls request and sends the resulting packet to the channel. */
+  sendRequest(rpc: Rpc, call: Call, request?: Message): Call|undefined {
+    const previous = this.open(rpc, call);
+    rpc.channel.send(packets.encodeRequest(rpc.idSet, request));
+    return previous;
+  }
+
+  /**
+   * Creates a call for an RPC, but does not invoke it.
+   *
+   * open() can be used to receive streaming responses to an RPC that was not
+   * invoked by this client. For example, a server may stream logs with a
+   * server streaming RPC prior to any clients invoking it.
+   */
+  open(rpc: Rpc, call: Call): Call|undefined {
+    console.debug('Starting %s', rpc);
+    const previous = this.pending.get(rpc.idString);
+    this.pending.set(rpc.idString, call)
+    return previous;
+  }
+
+  sendClientStream() {
+    throw new Error('Method not implemented.');
+  }
+
+  sendClientStreamEnd() {
+    throw new Error('Method not implemented.');
+  }
+
+  /** Cancels the RPC. Returns the CANCEL packet to send. */
+  cancel(rpc: Rpc): Uint8Array|undefined {
+    console.debug(`Cancelling ${rpc}`);
+    this.pending.delete(rpc.idString);
+    if (rpc.method.clientStreaming && rpc.method.serverStreaming) {
+      return undefined;
+    }
+    return packets.encodeCancel(rpc.idSet);
+  }
+
+  /** Calls cancel and sends the cancel packet, if any, to the channel. */
+  sendCancel(rpc: Rpc): boolean {
+    let packet: Uint8Array|undefined;
+    try {
+      packet = this.cancel(rpc);
+    } catch (err) {
+      return false;
+    }
+
+    if (packet !== undefined) {
+      rpc.channel.send(packet);
+    }
+    return true;
+  }
+
+  /** Gets the pending RPC's call. If status is set, clears the RPC. */
+  getPending(rpc: Rpc, status?: Status): Call|undefined {
+    if (status === undefined) {
+      return this.pending.get(rpc.idString);
+    }
+
+    const call = this.pending.get(rpc.idString);
+    this.pending.delete(rpc.idString);
+    return call;
+  }
+}
diff --git a/pw_rpc/ts/test.proto b/pw_rpc/ts/test.proto
index cbd844b..997c7e9 100644
--- a/pw_rpc/ts/test.proto
+++ b/pw_rpc/ts/test.proto
@@ -1,4 +1,4 @@
-// Copyright 2020 The Pigweed Authors
+// Copyright 2021 The Pigweed Authors
 //
 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
 // use this file except in compliance with the License. You may obtain a copy of
diff --git a/pw_rpc/ts/test2.proto b/pw_rpc/ts/test2.proto
new file mode 100644
index 0000000..1de8e87
--- /dev/null
+++ b/pw_rpc/ts/test2.proto
@@ -0,0 +1,30 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+syntax = "proto2";
+
+package pw.test2;
+
+message Request {
+  optional float magic_number = 1;
+}
+
+message Response {}
+
+service Alpha {
+  rpc Unary(Request) returns (Response) {}
+}
+
+service Bravo {
+  rpc BidiStreaming(stream Request) returns (stream Response) {}
+}