pw_rpc: Channel, Service, and Method Client
Add logic to support easily finding and invoking RPC methods in the
client library. In the future these objects could have class fields for
each service/method to enable greater discoverability.
The MethodStub is used to invoke RPC calls. It is equivalent to the
MethodClient in the python RPC library.
Bug: b/194329554
No-Docs-Update-Reason: Documentation added in child commit: pwrev/c/60921
Change-Id: I6ea3bffd9ba91224852254c164f41546ee2b6a36
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60921
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index ada0bd3..3beca86 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -21,12 +21,12 @@
ts_library(
name = "rpc_utils",
srcs = [
+ "call.ts",
"descriptors.ts",
"hash.ts",
+ "method.ts",
"packets.ts",
- ],
- data = [
- ":test_protos",
+ "rpc_classes.ts",
],
module_name = "pigweed/pw_rpc/ts",
deps = [
@@ -39,9 +39,15 @@
ts_library(
name = "rpc",
- srcs = [],
+ srcs = ["client.ts"],
module_name = "pigweed/pw_rpc/ts",
- deps = [":rpc_utils"],
+ deps = [
+ ":rpc_utils",
+ "//pw_protobuf_compiler/ts:proto_lib",
+ "//pw_rpc:packet_proto_tspb",
+ "//pw_status/ts:pw_status",
+ "@npm//@types/google-protobuf",
+ ],
)
ts_library(
@@ -60,9 +66,28 @@
],
)
+ts_library(
+ name = "rpc_test_lib",
+ srcs = [
+ "client_test.ts",
+ ],
+ data = [
+ ":test_protos",
+ ],
+ deps = [
+ ":rpc",
+ ":rpc_utils", # TODO(jaredweinstein): remove dependency.
+ "//pw_protobuf_compiler/ts:proto_lib",
+ "//pw_rpc:packet_proto_tspb",
+ "@npm//@types/jasmine",
+ "@npm//@types/node",
+ ],
+)
+
jasmine_node_test(
name = "rpc_test",
srcs = [
+ ":rpc_test_lib",
":rpc_utils_test_lib",
],
)
@@ -71,6 +96,7 @@
name = "test_protos",
srcs = [
"test.proto",
+ "test2.proto",
],
)
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
new file mode 100644
index 0000000..6ab8b90
--- /dev/null
+++ b/pw_rpc/ts/call.ts
@@ -0,0 +1,64 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+import {Status} from 'pigweed/pw_status/ts/status';
+
+import {PendingCalls, Rpc} from './rpc_classes';
+
+export type Callback = (a: any) => any;
+
+/** Represent an in-progress or completed RPC call. */
+export class Call {
+ private rpcs: PendingCalls;
+ private rpc: Rpc;
+
+ private onNext: Callback;
+ private onCompleted: Callback;
+ private onError: Callback;
+
+ constructor(
+ rpcs: PendingCalls,
+ rpc: Rpc,
+ onNext: Callback,
+ onCompleted: Callback,
+ onError: Callback) {
+ this.rpcs = rpcs;
+ this.rpc = rpc;
+
+ this.onNext = onNext;
+ this.onCompleted = onCompleted;
+ this.onError = onError;
+ }
+}
+
+
+/** Tracks the state of a unary RPC call. */
+export class UnaryCall extends Call {
+ // TODO(jaredweinstein): Complete unary invocation logic.
+}
+
+/** Tracks the state of a client streaming RPC call. */
+export class ClientStreamingCall extends Call {
+ // TODO(jaredweinstein): Complete client streaming invocation logic.
+}
+
+/** Tracks the state of a server streaming RPC call. */
+export class ServerStreamingCall extends Call {
+ // TODO(jaredweinstein): Complete server streaming invocation logic.
+}
+
+/** Tracks the state of a bidirectional streaming RPC call. */
+export class BidirectionalStreamingCall extends Call {
+ // TODO(jaredweinstein): Complete bidirectional streaming invocation logic.
+}
diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts
new file mode 100644
index 0000000..6c34fe0
--- /dev/null
+++ b/pw_rpc/ts/client.ts
@@ -0,0 +1,148 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/** Provides a pw_rpc client for TypeScript. */
+
+import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+
+import {Channel, Service} from './descriptors';
+import {MethodStub, methodStubFactory} from './method';
+import {PendingCalls} from './rpc_classes';
+
+/**
+ * Object for managing RPC service and contained methods.
+ */
+class ServiceClient {
+ private service: Service;
+ private methods: MethodStub[] = [];
+ private methodsByName = new Map<string, MethodStub>();
+
+ constructor(client: Client, channel: Channel, service: Service) {
+ this.service = service;
+ const methods = service.methods;
+ methods.forEach((method) => {
+ const stub = methodStubFactory(client.rpcs, channel, method);
+ this.methods.push(stub);
+ this.methodsByName.set(method.name, stub);
+ });
+ }
+
+ method(methodName: string): MethodStub|undefined {
+ return this.methodsByName.get(methodName);
+ }
+}
+
+/**
+ * Object for managing RPC channel and contained services.
+ */
+class ChannelClient {
+ readonly channel: Channel;
+ private services = new Map<string, ServiceClient>();
+
+ constructor(client: Client, channel: Channel, services: Service[]) {
+ this.channel = channel;
+ services.forEach((service) => {
+ const serviceClient = new ServiceClient(client, this.channel, service);
+ this.services.set(service.name, serviceClient);
+ });
+ }
+
+ private service(serviceName: string): ServiceClient|undefined {
+ return this.services.get(serviceName);
+ }
+
+ /**
+ * Find a method stub via its full name.
+ *
+ * For example:
+ * `method = client.channel().methodStub('the.package.AService.AMethod');`
+ *
+ */
+ methodStub(name: string): MethodStub|undefined {
+ const index = name.lastIndexOf('.');
+ if (index <= 0) {
+ console.error(`Malformed method name: ${name}`);
+ return undefined;
+ }
+ const serviceName = name.slice(0, index);
+ const methodName = name.slice(index + 1);
+ const method = this.service(serviceName)?.method(methodName);
+ if (method === undefined) {
+ console.error(`Method not found: ${name}`);
+ return undefined;
+ }
+ return method;
+ }
+}
+
+/**
+ * RPCs are invoked through a MethodStub. These can be found by name via
+ * methodStub(string name).
+ *
+ * ```
+ * method = client.channel(1).methodStub('the.package.FooService.SomeMethod')
+ * call = method.invoke(request);
+ * ```
+ */
+export class Client {
+ private services = new Map<number, Service>();
+ private channelsById = new Map<number, ChannelClient>();
+ readonly rpcs: PendingCalls;
+
+ constructor(channels: Channel[], services: Service[]) {
+ this.rpcs = new PendingCalls();
+ services.forEach((service) => {
+ this.services.set(service.id, service);
+ });
+
+ channels.forEach((channel) => {
+ this.channelsById.set(
+ channel.id, new ChannelClient(this, channel, services));
+ });
+ }
+
+ /**
+ * Creates a client from a set of Channels and a library of Protos.
+ *
+ * @param {Channel[]} channels List of possible channels to use.
+ * @param {Library} protoSet Library containing protos defining RPC services
+ * and methods.
+ */
+ static fromProtoSet(channels: Channel[], protoSet: Library): Client {
+ let services: Service[] = [];
+ const descriptors = protoSet.fileDescriptorSet.getFileList();
+ descriptors.forEach((fileDescriptor) => {
+ const packageName = fileDescriptor.getPackage()!;
+ fileDescriptor.getServiceList().forEach((serviceDescriptor) => {
+ services = services.concat(
+ new Service(serviceDescriptor, protoSet, packageName));
+ });
+ });
+
+ return new Client(channels, services)
+ }
+
+ /**
+ * Finds the channel with the provided id. Returns undefined if there are no
+ * channels or no channel with a matching id.
+ *
+ * @param {number?} id If no id is specified, returns the first channel.
+ */
+ channel(id?: number): ChannelClient|undefined {
+ if (id === undefined) {
+ return this.channelsById.values().next().value;
+ }
+ return this.channelsById.get(id);
+ }
+}
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
new file mode 100644
index 0000000..80cb97a
--- /dev/null
+++ b/pw_rpc/ts/client_test.ts
@@ -0,0 +1,64 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/* eslint-env browser, jasmine */
+import 'jasmine';
+
+import {RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
+import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+
+import {Client} from './client';
+import {Channel} from './descriptors';
+
+const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
+
+describe('Client', () => {
+ let lib: Library;
+ let client: Client;
+
+ beforeEach(async () => {
+ lib = await Library.fromFileDescriptorSet(
+ TEST_PROTO_PATH, 'test_protos_tspb');
+ const channels = [new Channel(1), new Channel(5)];
+ client = Client.fromProtoSet(channels, lib);
+ });
+
+ it('channel returns undefined for empty list', () => {
+ const channels = Array<Channel>();
+ const emptyChannelClient = Client.fromProtoSet(channels, lib);
+ expect(emptyChannelClient.channel()).toBeUndefined();
+ });
+
+ it('fetches channel or returns undefined', () => {
+ expect(client.channel(1)!.channel.id).toEqual(1);
+ expect(client.channel(5)!.channel.id).toEqual(5);
+ expect(client.channel()!.channel.id).toEqual(1);
+ expect(client.channel(2)).toBeUndefined();
+ });
+
+ it('ChannelClient fetches method by name', () => {
+ const channel = client.channel()!;
+ const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;
+ expect(stub.method.name).toEqual('SomeUnary');
+ })
+
+ it('ChannelClient for unknown name returns undefined', () => {
+ const channel = client.channel()!;
+ expect(channel.methodStub('')).toBeUndefined();
+ expect(channel.methodStub('pw.rpc.test1.Garbage.SomeUnary'))
+ .toBeUndefined();
+ expect(channel.methodStub('pw.rpc.test1.TheTestService.Garbage'))
+ .toBeUndefined();
+ })
+})
diff --git a/pw_rpc/ts/descriptors.ts b/pw_rpc/ts/descriptors.ts
index c445787..42389a8 100644
--- a/pw_rpc/ts/descriptors.ts
+++ b/pw_rpc/ts/descriptors.ts
@@ -12,29 +12,58 @@
// License for the specific language governing permissions and limitations under
// the License.
-import {Message} from 'google-protobuf';
import {MethodDescriptorProto, ServiceDescriptorProto} from 'google-protobuf/google/protobuf/descriptor_pb';
import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
import {hash} from './hash';
+interface ChannelOutput {
+ (data: Uint8Array): void;
+}
+
+export class Channel {
+ readonly id: number;
+ private output: ChannelOutput;
+
+ constructor(id: number, output: ChannelOutput = () => {}) {
+ this.id = id;
+ this.output = output;
+ }
+
+ send(data: Uint8Array) {
+ this.output(data);
+ }
+}
+
/** Describes an RPC service. */
export class Service {
readonly name: string;
readonly id: number;
- readonly methods = new Map<string, Method>();
+ readonly methods = new Map<number, Method>();
+ readonly methodsByName = new Map<string, Method>();
- constructor(descriptor: ServiceDescriptorProto, protoLibrary: Library) {
- this.name = descriptor.getName()!;
+ constructor(
+ descriptor: ServiceDescriptorProto,
+ protoLibrary: Library,
+ packageName: string) {
+ this.name = packageName + '.' + descriptor.getName()!;
this.id = hash(this.name);
descriptor.getMethodList().forEach(
(methodDescriptor: MethodDescriptorProto) => {
const method = new Method(methodDescriptor, protoLibrary, this);
- this.methods.set(method.name, method);
+ this.methods.set(method.id, method);
+ this.methodsByName.set(method.name, method);
});
}
}
+export enum MethodType {
+ UNARY,
+ SERVER_STREAMING,
+ CLIENT_STREAMING,
+ BIDIRECTIONAL_STREAMING
+}
+
/** Describes an RPC method. */
export class Method {
readonly service: Service;
@@ -42,8 +71,8 @@
readonly id: number;
readonly clientStreaming: boolean;
readonly serverStreaming: boolean;
- readonly inputType: any;
- readonly outputType: any;
+ readonly requestType: any;
+ readonly responseType: any;
constructor(
descriptor: MethodDescriptorProto,
@@ -55,13 +84,26 @@
this.serverStreaming = descriptor.getServerStreaming()!;
this.clientStreaming = descriptor.getClientStreaming()!;
- const inputTypePath = descriptor.getInputType()!;
- const outputTypePath = descriptor.getOutputType()!;
+ const requestTypePath = descriptor.getInputType()!;
+ const responseTypePath = descriptor.getOutputType()!;
// Remove leading period if it exists.
- this.inputType =
- protoLibrary.getMessageCreator(inputTypePath.replace(/^\./, ''))!;
- this.outputType =
- protoLibrary.getMessageCreator(outputTypePath.replace(/^\./, ''))!;
+ this.requestType =
+ protoLibrary.getMessageCreator(requestTypePath.replace(/^\./, ''))!;
+ this.responseType =
+ protoLibrary.getMessageCreator(responseTypePath.replace(/^\./, ''))!;
+ }
+
+ get type(): MethodType {
+ if (this.clientStreaming && this.serverStreaming) {
+ return MethodType.BIDIRECTIONAL_STREAMING;
+ } else if (this.clientStreaming && !this.serverStreaming) {
+ return MethodType.CLIENT_STREAMING;
+ } else if (!this.clientStreaming && this.serverStreaming) {
+ return MethodType.SERVER_STREAMING;
+ } else if (!this.clientStreaming && !this.serverStreaming) {
+ return MethodType.UNARY;
+ }
+ throw Error('Unhandled streaming condition');
}
}
diff --git a/pw_rpc/ts/descriptors_test.ts b/pw_rpc/ts/descriptors_test.ts
index e2c95af..b52b3c5 100644
--- a/pw_rpc/ts/descriptors_test.ts
+++ b/pw_rpc/ts/descriptors_test.ts
@@ -26,26 +26,27 @@
it('parses from ServiceDescriptor binary', async () => {
const lib = await Library.fromFileDescriptorSet(
TEST_PROTO_PATH, 'test_protos_tspb');
- const sd = lib.fileDescriptorSet.getFileList()[0].getServiceList()[0];
- const service = new descriptors.Service(sd, lib);
+ const fd = lib.fileDescriptorSet.getFileList()[0];
+ const sd = fd.getServiceList()[0];
+ const service = new descriptors.Service(sd, lib, fd.getPackage()!);
- expect(service.name).toEqual('TheTestService')
+ expect(service.name).toEqual('pw.rpc.test1.TheTestService')
expect(service.methods.size).toEqual(4);
- const unaryMethod = service.methods.get('SomeUnary')!;
+ const unaryMethod = service.methodsByName.get('SomeUnary')!;
expect(unaryMethod.name).toEqual('SomeUnary');
expect(unaryMethod.clientStreaming).toBeFalse();
expect(unaryMethod.serverStreaming).toBeFalse();
expect(unaryMethod.service).toEqual(service);
- expect(unaryMethod.inputType).toEqual(SomeMessage);
- expect(unaryMethod.outputType).toEqual(AnotherMessage);
+ expect(unaryMethod.requestType).toEqual(SomeMessage);
+ expect(unaryMethod.responseType).toEqual(AnotherMessage);
- const someBidiStreaming = service.methods.get('SomeBidiStreaming')!;
+ const someBidiStreaming = service.methodsByName.get('SomeBidiStreaming')!;
expect(someBidiStreaming.name).toEqual('SomeBidiStreaming');
expect(someBidiStreaming.clientStreaming).toBeTrue();
expect(someBidiStreaming.serverStreaming).toBeTrue();
expect(someBidiStreaming.service).toEqual(service);
- expect(someBidiStreaming.inputType).toEqual(SomeMessage);
- expect(someBidiStreaming.outputType).toEqual(AnotherMessage);
+ expect(someBidiStreaming.requestType).toEqual(SomeMessage);
+ expect(someBidiStreaming.responseType).toEqual(AnotherMessage);
});
})
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
new file mode 100644
index 0000000..12eb16e
--- /dev/null
+++ b/pw_rpc/ts/method.ts
@@ -0,0 +1,98 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+import {Message} from 'google-protobuf';
+
+import {Call, Callback} from './call';
+import {Channel, Method, MethodType, Service} from './descriptors';
+import {PendingCalls, Rpc} from './rpc_classes';
+
+export function methodStubFactory(
+ rpcs: PendingCalls, channel: Channel, method: Method): MethodStub {
+ switch (method.type) {
+ case MethodType.BIDIRECTIONAL_STREAMING:
+ return new BidirectionStreamingMethodStub(rpcs, channel, method);
+ case MethodType.CLIENT_STREAMING:
+ return new ClientStreamingMethodStub(rpcs, channel, method);
+ case MethodType.SERVER_STREAMING:
+ return new ServerStreamingMethodStub(rpcs, channel, method);
+ case MethodType.UNARY:
+ return new UnaryMethodStub(rpcs, channel, method);
+ }
+}
+
+export abstract class MethodStub {
+ readonly method: Method;
+ private rpcs: PendingCalls;
+ private rpc: Rpc;
+ private channel: Channel;
+
+ private callType: typeof Call = Call;
+
+ constructor(rpcs: PendingCalls, channel: Channel, method: Method) {
+ this.method = method;
+ this.rpcs = rpcs;
+ this.channel = channel;
+ this.rpc = new Rpc(channel, method.service, method)
+ }
+
+ abstract invoke(
+ request: Message,
+ onNext: Callback,
+ onCompleted: Callback,
+ onError: Callback): Call;
+}
+
+class UnaryMethodStub extends MethodStub {
+ // TODO(jaredweinstein): Add blocking invocation.
+ // invokeBlocking(request) {...}
+
+ invoke(
+ request: Message,
+ onNext: Callback = () => {},
+ onCompleted: Callback = () => {},
+ onError: Callback = () => {}): Call {
+ throw Error('ServerStreaming invoke() not implemented');
+ }
+}
+
+class ServerStreamingMethodStub extends MethodStub {
+ invoke(
+ request: Message,
+ onNext: Callback = () => {},
+ onCompleted: Callback = () => {},
+ onError: Callback = () => {}): Call {
+ throw Error('ServerStreaming invoke() not implemented');
+ }
+}
+
+class ClientStreamingMethodStub extends MethodStub {
+ invoke(
+ request: Message,
+ onNext: Callback,
+ onCompleted: Callback,
+ onError: Callback): Call {
+ throw Error('ClientStreaming invoke() not implemented');
+ }
+}
+
+class BidirectionStreamingMethodStub extends MethodStub {
+ invoke(
+ request: Message,
+ onNext: Callback,
+ onCompleted: Callback,
+ onError: Callback): Call {
+ throw Error('BidirectionalStreaming invoke() not implemented');
+ }
+}
diff --git a/pw_rpc/ts/rpc_classes.ts b/pw_rpc/ts/rpc_classes.ts
new file mode 100644
index 0000000..2548ad9
--- /dev/null
+++ b/pw_rpc/ts/rpc_classes.ts
@@ -0,0 +1,130 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+import {Message} from 'google-protobuf';
+import {Status} from 'pigweed/pw_status/ts/status';
+
+import {Call} from './call';
+import {Channel, Method, Service} from './descriptors';
+import * as packets from './packets';
+
+/** Data class for a pending RPC call. */
+export class Rpc {
+ channel: Channel;
+ service: Service;
+ method: Method;
+
+ constructor(channel: Channel, service: Service, method: Method) {
+ this.channel = channel;
+ this.service = service;
+ this.method = method;
+ }
+
+ /** Returns channel service method id tuple */
+ get idSet(): [number, number, number] {
+ return [this.channel.id, this.service.id, this.method.id];
+ }
+
+ /**
+ * Returns a string sequence to uniquely identify channel, service, and
+ * method. This can be used to hash the Rpc.
+ *
+ * For example: "12346789.23452345.12341234"
+ */
+ get idString(): string {
+ return `${this.channel.id}.${this.service.id}.${this.method.id}`;
+ }
+
+ toString(): string {
+ return `RPC ${this.service.name}.${this.method.name} on channel ` +
+ `${this.channel.id})`;
+ }
+}
+
+/** Tracks pending RPCs and encodes outgoing RPC packets. */
+export class PendingCalls {
+ pending: Map<string, Call> = new Map;
+
+ /** Starts the provided RPC and returns the encoded packet to send. */
+ request(rpc: Rpc, request: Message, call: Call): Uint8Array {
+ this.open(rpc, call)
+ console.log(`Starting ${rpc}`);
+ return packets.encodeRequest(rpc.idSet, request);
+ }
+
+ /** Calls request and sends the resulting packet to the channel. */
+ sendRequest(rpc: Rpc, call: Call, request?: Message): Call|undefined {
+ const previous = this.open(rpc, call);
+ rpc.channel.send(packets.encodeRequest(rpc.idSet, request));
+ return previous;
+ }
+
+ /**
+ * Creates a call for an RPC, but does not invoke it.
+ *
+ * open() can be used to receive streaming responses to an RPC that was not
+ * invoked by this client. For example, a server may stream logs with a
+ * server streaming RPC prior to any clients invoking it.
+ */
+ open(rpc: Rpc, call: Call): Call|undefined {
+ console.debug('Starting %s', rpc);
+ const previous = this.pending.get(rpc.idString);
+ this.pending.set(rpc.idString, call)
+ return previous;
+ }
+
+ sendClientStream() {
+ throw new Error('Method not implemented.');
+ }
+
+ sendClientStreamEnd() {
+ throw new Error('Method not implemented.');
+ }
+
+ /** Cancels the RPC. Returns the CANCEL packet to send. */
+ cancel(rpc: Rpc): Uint8Array|undefined {
+ console.debug(`Cancelling ${rpc}`);
+ this.pending.delete(rpc.idString);
+ if (rpc.method.clientStreaming && rpc.method.serverStreaming) {
+ return undefined;
+ }
+ return packets.encodeCancel(rpc.idSet);
+ }
+
+ /** Calls cancel and sends the cancel packet, if any, to the channel. */
+ sendCancel(rpc: Rpc): boolean {
+ let packet: Uint8Array|undefined;
+ try {
+ packet = this.cancel(rpc);
+ } catch (err) {
+ return false;
+ }
+
+ if (packet !== undefined) {
+ rpc.channel.send(packet);
+ }
+ return true;
+ }
+
+ /** Gets the pending RPC's call. If status is set, clears the RPC. */
+ getPending(rpc: Rpc, status?: Status): Call|undefined {
+ if (status === undefined) {
+ return this.pending.get(rpc.idString);
+ }
+
+ const call = this.pending.get(rpc.idString);
+ this.pending.delete(rpc.idString);
+ return call;
+ }
+}
diff --git a/pw_rpc/ts/test.proto b/pw_rpc/ts/test.proto
index cbd844b..997c7e9 100644
--- a/pw_rpc/ts/test.proto
+++ b/pw_rpc/ts/test.proto
@@ -1,4 +1,4 @@
-// Copyright 2020 The Pigweed Authors
+// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
diff --git a/pw_rpc/ts/test2.proto b/pw_rpc/ts/test2.proto
new file mode 100644
index 0000000..1de8e87
--- /dev/null
+++ b/pw_rpc/ts/test2.proto
@@ -0,0 +1,30 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+syntax = "proto2";
+
+package pw.test2;
+
+message Request {
+ optional float magic_number = 1;
+}
+
+message Response {}
+
+service Alpha {
+ rpc Unary(Request) returns (Response) {}
+}
+
+service Bravo {
+ rpc BidiStreaming(stream Request) returns (stream Response) {}
+}