pw_rpc: Process packet logic
Adds the primary logic for handling a packet including supporting
call logic.
Tested with `bazel test //pw_rpc/ts:rpc_test`
Bug: b/194329554
Change-Id: Id931629e778e7059803bb546c83bb99afe07b3cb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60923
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index 3beca86..a14688e 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -77,8 +77,10 @@
deps = [
":rpc",
":rpc_utils", # TODO(jaredweinstein): remove dependency.
+ ":test_protos_tspb",
"//pw_protobuf_compiler/ts:proto_lib",
"//pw_rpc:packet_proto_tspb",
+ "//pw_status/ts:pw_status",
"@npm//@types/jasmine",
"@npm//@types/node",
],
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index 6ab8b90..9f4dabb 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -12,6 +12,7 @@
// License for the specific language governing permissions and limitations under
// the License.
+import {Message} from 'google-protobuf';
import {Status} from 'pigweed/pw_status/ts/status';
import {PendingCalls, Rpc} from './rpc_classes';
@@ -27,6 +28,13 @@
private onCompleted: Callback;
private onError: Callback;
+ // TODO(jaredweinstein): support async timeout.
+ // private timeout: number;
+ private status?: Status;
+ error?: Status;
+ callbackException?: Error;
+
+
constructor(
rpcs: PendingCalls,
rpc: Rpc,
@@ -40,6 +48,54 @@
this.onCompleted = onCompleted;
this.onError = onError;
}
+
+ /* Calls the RPC. This must be called immediately after construction. */
+ invoke(request?: Message): void {
+ const previous = this.rpcs.sendRequest(this.rpc, this, request);
+
+ if (previous !== undefined && !previous.completed()) {
+ previous.handleError(Status.CANCELLED)
+ }
+ }
+
+ private invokeCallback(f: any) {
+ try {
+ f();
+ } catch (err) {
+ console.error(
+ `An exception was raised while invoking a callback: ${err}`);
+ this.callbackException = err
+ }
+ }
+
+ completed(): boolean {
+ return (this.status !== undefined || this.error !== undefined);
+ }
+
+ handleResponse(response: Message): void {
+ const callback = () => this.onNext(response);
+ this.invokeCallback(callback)
+ }
+
+ handleCompletion(status: Status) {
+ const callback = () => this.onCompleted(status);
+ this.invokeCallback(callback)
+ }
+
+ cancel(): boolean {
+ if (this.completed()) {
+ return false;
+ }
+
+ this.error = Status.CANCELLED
+ return this.rpcs.sendCancel(this.rpc);
+ }
+
+ handleError(error: Status): void {
+ this.error = error
+ const callback = () => this.onError(2);
+ this.invokeCallback(callback);
+ }
}
diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts
index 6c34fe0..3dd47ed 100644
--- a/pw_rpc/ts/client.ts
+++ b/pw_rpc/ts/client.ts
@@ -14,11 +14,16 @@
/** Provides a pw_rpc client for TypeScript. */
+import {Message} from 'google-protobuf';
+import {PacketType, RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+import {Status} from 'pigweed/pw_status/ts/status';
import {Channel, Service} from './descriptors';
import {MethodStub, methodStubFactory} from './method';
-import {PendingCalls} from './rpc_classes';
+import * as packets from './packets';
+import {PendingCalls, Rpc} from './rpc_classes';
+
/**
* Object for managing RPC service and contained methods.
@@ -96,9 +101,9 @@
* ```
*/
export class Client {
- private services = new Map<number, Service>();
private channelsById = new Map<number, ChannelClient>();
readonly rpcs: PendingCalls;
+ readonly services = new Map<number, Service>();
constructor(channels: Channel[], services: Service[]) {
this.rpcs = new PendingCalls();
@@ -145,4 +150,133 @@
}
return this.channelsById.get(id);
}
+
+ /**
+ * Creates a new RPC object holding channel, method, and service info.
+ * Returns undefined if the service or method does not exist.
+ */
+ private rpc(packet: RpcPacket, channelClient: ChannelClient): Rpc|undefined {
+ const service = this.services.get(packet.getServiceId());
+ if (service == undefined) {
+ return undefined;
+ }
+ const method = service.methods.get(packet.getMethodId());
+ if (method == undefined) {
+ return undefined;
+ }
+ return new Rpc(channelClient.channel, service, method);
+ }
+
+ private decodeStatus(rpc: Rpc, packet: RpcPacket): Status|undefined {
+ if (packet.getType() === PacketType.SERVER_STREAM) {
+ return;
+ }
+ return packet.getStatus();
+ }
+
+ private decodePayload(rpc: Rpc, packet: RpcPacket): Message|undefined {
+ if (packet.getType() === PacketType.SERVER_ERROR) {
+ return undefined;
+ }
+
+ if (packet.getType() === PacketType.RESPONSE &&
+ rpc.method.serverStreaming) {
+ return undefined;
+ }
+
+ const payload = packet.getPayload_asU8();
+ return packets.decodePayload(payload, rpc.method.responseType);
+ }
+
+ private sendClientError(
+ client: ChannelClient, packet: RpcPacket, error: Status) {
+ client.channel.send(packets.encodeClientError(packet, error));
+ }
+
+ /**
+ * Processes an incoming packet.
+ *
+ * @param {Uint8Array} rawPacketData binary data for a pw_rpc packet.
+ * @return {Status} The status of processing the packet.
+ * - OK: the packet was processed by the client
+ * - DATA_LOSS: the packet could not be decoded
+ * - INVALID_ARGUMENT: the packet is for a server, not a client
+ * - NOT_FOUND: the packet's channel ID is not known to this client
+ */
+ processPacket(rawPacketData: Uint8Array): Status {
+ let packet;
+ try {
+ packet = packets.decode(rawPacketData)
+ } catch (err) {
+ console.warn(`Failed to decode packet: ${err}`);
+ console.debug(`Raw packet: ${rawPacketData}`);
+ return Status.DATA_LOSS;
+ }
+
+ if (packets.forServer(packet)) {
+ return Status.INVALID_ARGUMENT;
+ }
+
+ const channelClient = this.channelsById.get(packet.getChannelId())
+ if (channelClient == undefined) {
+ console.warn(`Unrecognized channel ID: ${packet.getChannelId()}`)
+ return Status.NOT_FOUND;
+ }
+
+ const rpc = this.rpc(packet, channelClient)
+ if (rpc == undefined) {
+ this.sendClientError(channelClient, packet, Status.NOT_FOUND);
+ console.warn('rpc service/method not found');
+ return Status.OK;
+ }
+
+ if (packet.getType() !== PacketType.RESPONSE &&
+ packet.getType() !== PacketType.SERVER_STREAM &&
+ packet.getType() !== PacketType.SERVER_ERROR) {
+ console.error(`${rpc}: Unexpected packet type ${packet.getType()}`)
+ console.debug(`Packet: ${packet}`);
+ return Status.OK;
+ }
+
+ let status = this.decodeStatus(rpc, packet);
+ let payload;
+ try {
+ payload = this.decodePayload(rpc, packet);
+ } catch (error) {
+ this.sendClientError(channelClient, packet, Status.DATA_LOSS);
+ console.warn(`Failed to decode response: ${error}`);
+ console.debug(`Raw payload: ${packet.getPayload()}`);
+
+ // Make this an error packet so the error handler is called.
+ packet.setType(PacketType.SERVER_ERROR);
+ status = Status.DATA_LOSS
+ }
+
+ let call = this.rpcs.getPending(rpc, status);
+ if (call === undefined) {
+ this.sendClientError(channelClient, packet, Status.FAILED_PRECONDITION);
+ console.debug(`Discarding response for ${rpc}, which is not pending`);
+ return Status.OK;
+ }
+
+ if (packet.getType() === PacketType.SERVER_ERROR) {
+ if (status === Status.OK) {
+ throw 'Unexpected OK status on SERVER_ERROR';
+ }
+ if (status === undefined) {
+ throw 'Missing status on SERVER_ERROR';
+ }
+ console.warn(`${rpc}: invocation failed with Status: ${Status[status]}`);
+ call.handleError(status);
+ return Status.OK;
+ }
+
+ if (payload !== undefined) {
+ call.handleResponse(payload);
+ }
+ if (status !== undefined) {
+ call.handleCompletion(status);
+ }
+ return Status.OK;
+ }
}
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index 80cb97a..ab9c510 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -15,25 +15,33 @@
/* eslint-env browser, jasmine */
import 'jasmine';
-import {RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
+import {PacketType, RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+import {Status} from 'pigweed/pw_status/ts/status';
+import {Request} from 'test_protos_tspb/test_protos_tspb_pb/pw_rpc/ts/test2_pb'
import {Client} from './client';
import {Channel} from './descriptors';
+import * as packets from './packets';
const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
describe('Client', () => {
let lib: Library;
let client: Client;
+ let lastPacketSent: RpcPacket;
beforeEach(async () => {
lib = await Library.fromFileDescriptorSet(
TEST_PROTO_PATH, 'test_protos_tspb');
- const channels = [new Channel(1), new Channel(5)];
+ const channels = [new Channel(1, savePacket), new Channel(5)];
client = Client.fromProtoSet(channels, lib);
});
+ function savePacket(packetBytes: Uint8Array): void {
+ lastPacketSent = RpcPacket.deserializeBinary(packetBytes);
+ }
+
it('channel returns undefined for empty list', () => {
const channels = Array<Channel>();
const emptyChannelClient = Client.fromProtoSet(channels, lib);
@@ -61,4 +69,64 @@
expect(channel.methodStub('pw.rpc.test1.TheTestService.Garbage'))
.toBeUndefined();
})
+
+ it('processPacket with invalid proto data', () => {
+ const textEncoder = new TextEncoder();
+ const data = textEncoder.encode('NOT a packet!');
+ expect(client.processPacket(data)).toEqual(Status.DATA_LOSS)
+ });
+
+ it('processPacket not for client', () => {
+ const packet = new RpcPacket();
+ packet.setType(PacketType.REQUEST);
+ const processStatus = client.processPacket(packet.serializeBinary());
+ expect(processStatus).toEqual(Status.INVALID_ARGUMENT);
+ });
+
+ it('processPacket for unrecognized channel', () => {
+ const packet = packets.encodeResponse([123, 456, 789], new Request());
+ expect(client.processPacket(packet)).toEqual(Status.NOT_FOUND);
+ });
+
+ it('processPacket for unrecognized service', () => {
+ const packet = packets.encodeResponse([1, 456, 789], new Request());
+ const status = client.processPacket(packet);
+ expect(client.processPacket(packet)).toEqual(Status.OK);
+
+ expect(lastPacketSent.getChannelId()).toEqual(1);
+ expect(lastPacketSent.getServiceId()).toEqual(456);
+ expect(lastPacketSent.getMethodId()).toEqual(789);
+ expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
+ expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
+ });
+
+ it('processPacket for unrecognized method', () => {
+ const service = client.services.values().next().value;
+
+ const packet = packets.encodeResponse([1, service.id, 789], new Request());
+ const status = client.processPacket(packet);
+ expect(client.processPacket(packet)).toEqual(Status.OK);
+
+ expect(lastPacketSent.getChannelId()).toEqual(1);
+ expect(lastPacketSent.getServiceId()).toEqual(service.id);
+ expect(lastPacketSent.getMethodId()).toEqual(789);
+ expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
+ expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
+ });
+
+ it('processPacket for non-pending method', () => {
+ const service = client.services.values().next().value;
+ const method = service.methods.values().next().value;
+
+ const packet =
+ packets.encodeResponse([1, service.id, method.id], new Request());
+ const status = client.processPacket(packet);
+ expect(client.processPacket(packet)).toEqual(Status.OK);
+
+ expect(lastPacketSent.getChannelId()).toEqual(1);
+ expect(lastPacketSent.getServiceId()).toEqual(service.id);
+ expect(lastPacketSent.getMethodId()).toEqual(method.id);
+ expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
+ expect(lastPacketSent.getStatus()).toEqual(Status.FAILED_PRECONDITION);
+ });
})
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index 95db78f..a28d4a8 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -8,3 +8,53 @@
handling RPCs.
This package is currently a work in progress.
+
+Creating an RPC Client
+======================
+The RPC client is instantiated from a list of channels and a set of protos.
+
+.. code-block:: typescript
+
+ const testProtoPath = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
+ const lib = await Library.fromFileDescriptorSet(
+ testProtoPath, 'test_protos_tspb');
+ const channels = [new Channel(1, savePacket), new Channel(5)];
+ const client = Client.fromProtoSet(channels, lib);
+
+ function savePacket(packetBytes: Uint8Array): void {
+ const packet = RpcPacket.deserializeBinary(packetBytes);
+ ...
+ }
+
+The proto library must match the proto build rules. The first argument
+corresponds with the location of the ``proto_library`` build rule that generates
+a descriptor set for all src protos. The second argument corresponds with the
+name of the ``js_proto_library`` build rule that generates javascript based on
+the descriptor set. For instance, the previous example corresponds with the
+following build file: ``pw_rpc/ts/BUILD.bazel``.
+
+.. code-block::
+
+ proto_library(
+ name = "test_protos",
+ srcs = [
+ "test.proto",
+ "test2.proto",
+ ],
+ )
+
+ js_proto_library(
+ name = "test_protos_tspb",
+ protos = [":test_protos"],
+ )
+
+Finding an RPC Method
+=====================
+Once the client is instantiated with the correct proto library, the target RPC
+method is found by searching based on the full name:
+``{packageName}.{serviceName}.{methodName}``
+
+.. code-block:: typescript
+
+ const channel = client.channel()!;
+ const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;