pw_rpc: Non-blocking unary RPC
Tests: `bazel test //pw_rpc/ts:rpc_test`
Bug: b/194329554
Change-Id: Ie8777c18f063c5746277ff9be77691773b2fda29
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60924
Reviewed-by: Alexei Frolov <frolv@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/ts/BUILD.bazel b/pw_rpc/ts/BUILD.bazel
index a14688e..066dc46 100644
--- a/pw_rpc/ts/BUILD.bazel
+++ b/pw_rpc/ts/BUILD.bazel
@@ -81,6 +81,7 @@
"//pw_protobuf_compiler/ts:proto_lib",
"//pw_rpc:packet_proto_tspb",
"//pw_status/ts:pw_status",
+ "@npm//@types/google-protobuf",
"@npm//@types/jasmine",
"@npm//@types/node",
],
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index 9f4dabb..82f59c1 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -19,6 +19,19 @@
export type Callback = (a: any) => any;
+class RpcError extends Error {
+ constructor(rpc: Rpc, status: Status) {
+ let message = '';
+ if (status === Status.NOT_FOUND) {
+ message = ': the RPC server does not support this RPC';
+ } else if (status === Status.DATA_LOSS) {
+ message = ': an error occurred while decoding the RPC payload';
+ }
+
+ super(`${rpc.method} failed with error ${Status[status]}${message}`);
+ }
+}
+
/** Represent an in-progress or completed RPC call. */
export class Call {
private rpcs: PendingCalls;
@@ -34,7 +47,6 @@
error?: Status;
callbackException?: Error;
-
constructor(
rpcs: PendingCalls,
rpc: Rpc,
@@ -82,6 +94,11 @@
this.invokeCallback(callback)
}
+ handleError(error: Status): void {
+ this.error = error;
+ this.invokeCallback(() => this.onError(error));
+ }
+
cancel(): boolean {
if (this.completed()) {
return false;
@@ -91,10 +108,14 @@
return this.rpcs.sendCancel(this.rpc);
}
- handleError(error: Status): void {
- this.error = error
- const callback = () => this.onError(2);
- this.invokeCallback(callback);
+
+ private checkErrors(): void {
+ if (this.callbackException !== undefined) {
+ throw this.callbackException;
+ }
+ if (this.error !== undefined) {
+ throw new RpcError(this.rpc, this.error);
+ }
}
}
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index ab9c510..99fdbe2 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -15,13 +15,15 @@
/* eslint-env browser, jasmine */
import 'jasmine';
+import {Message} from 'google-protobuf';
import {PacketType, RpcPacket} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'
-import {Library} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
+import {Library, MessageCreator} from 'pigweed/pw_protobuf_compiler/ts/proto_lib';
import {Status} from 'pigweed/pw_status/ts/status';
import {Request} from 'test_protos_tspb/test_protos_tspb_pb/pw_rpc/ts/test2_pb'
import {Client} from './client';
-import {Channel} from './descriptors';
+import {Channel, Method} from './descriptors';
+import {MethodStub} from './method';
import * as packets from './packets';
const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
@@ -129,4 +131,152 @@
expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
expect(lastPacketSent.getStatus()).toEqual(Status.FAILED_PRECONDITION);
});
-})
+});
+
+describe('RPC', () => {
+ let lib: Library;
+ let client: Client;
+ let lastPacketSent: RpcPacket;
+ let requests: RpcPacket[] = [];
+ let nextPackets: [Uint8Array, Status][] = [];
+ let responseLock = false;
+
+ beforeEach(async () => {
+ lib = await Library.fromFileDescriptorSet(
+ TEST_PROTO_PATH, 'test_protos_tspb');
+ const channels = [new Channel(1, handlePacket), new Channel(2, () => {})];
+ client = Client.fromProtoSet(channels, lib);
+ });
+
+ function enqueueResponse(
+ channelId: number,
+ method: Method,
+ status: Status,
+ payload: Uint8Array = new Uint8Array()) {
+ const packet = new RpcPacket();
+ packet.setType(PacketType.RESPONSE);
+ packet.setChannelId(channelId);
+ packet.setServiceId(method.service.id);
+ packet.setMethodId(method.id);
+ packet.setStatus(status)
+ packet.setPayload(payload);
+
+ nextPackets.push([packet.serializeBinary(), Status.OK]);
+ }
+
+ function enqueueServerStream(
+ channelId: number,
+ method: Method,
+ response: Uint8Array,
+ status: Status = Status.OK) {
+ const packet = new RpcPacket();
+ packet.setType(PacketType.SERVER_STREAM);
+ packet.setChannelId(channelId);
+ packet.setServiceId(method.service.id);
+ packet.setMethodId(method.id);
+ packet.setPayload(response);
+ nextPackets.push([packet.serializeBinary(), status]);
+ }
+
+
+ function lastRequest(): RpcPacket {
+ if (requests.length == 0) {
+ throw Error('Tried to fetch request from empty list');
+ }
+ return requests[requests.length - 1];
+ }
+
+ function sentPayload(messageType: typeof Message): any {
+ return messageType.deserializeBinary(lastRequest().getPayload_asU8());
+ }
+
+ function handlePacket(data: Uint8Array): void {
+ requests.push(packets.decode(data));
+
+ if (responseLock == true) {
+ return;
+ }
+
+ // Avoid infinite recursion when processing a packet causes another packet
+ // to send.
+ responseLock = true;
+ for (const [packet, status] of nextPackets) {
+ expect(client.processPacket(packet)).toEqual(status);
+ }
+ nextPackets = [];
+ responseLock = false;
+ }
+
+ describe('Unary', () => {
+ let unaryStub: MethodStub;
+ let request: any;
+ let requestType: MessageCreator;
+ let responseType: MessageCreator;
+
+ beforeEach(async () => {
+ unaryStub = client.channel()?.methodStub(
+ 'pw.rpc.test1.TheTestService.SomeUnary')!;
+ requestType = unaryStub.method.requestType;
+ responseType = unaryStub.method.responseType;
+ request = new requestType();
+ });
+
+
+ it('nonblocking call', () => {
+ for (let i = 0; i < 3; i++) {
+ const response: any = new responseType();
+ response.setPayload('hello world');
+ const payload = response.serializeBinary();
+ enqueueResponse(1, unaryStub.method, Status.ABORTED, payload);
+
+ request.setMagicNumber(5);
+
+ const onNext = jasmine.createSpy();
+ const onCompleted = jasmine.createSpy();
+ const onError = jasmine.createSpy();
+ const call = unaryStub.invoke(request, onNext, onCompleted, onError);
+
+ expect(sentPayload(unaryStub.method.requestType).getMagicNumber())
+ .toEqual(5);
+ expect(onNext).toHaveBeenCalledOnceWith(response);
+ expect(onError).not.toHaveBeenCalled();
+ expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
+ }
+ });
+
+ it('nonblocking call cancel', () => {
+ request.setMagicNumber(5);
+
+ let onNext = jasmine.createSpy();
+ const call = unaryStub.invoke(request, onNext, () => {}, () => {});
+
+ expect(requests.length).toBeGreaterThan(0);
+ requests = [];
+
+ expect(call.cancel()).toBeTrue()
+ expect(call.cancel()).toBeFalse()
+ expect(onNext).not.toHaveBeenCalled()
+ });
+
+ it('nonblocking duplicate calls first is cancelled', () => {
+ const firstCall = unaryStub.invoke(request, () => {}, () => {}, () => {});
+ expect(firstCall.completed()).toBeFalse();
+
+ const secondCall =
+ unaryStub.invoke(request, () => {}, () => {}, () => {});
+ expect(firstCall.error).toEqual(Status.CANCELLED);
+ expect(secondCall.completed()).toBeFalse();
+ });
+
+ it('nonblocking exception in callback', () => {
+ const errorCallback = () => {
+ throw Error('Something went wrong!');
+ };
+
+ enqueueResponse(1, unaryStub.method, Status.OK);
+ const call = unaryStub.invoke(request, errorCallback, () => {}, () => {});
+ expect(call.callbackException!.name).toEqual('Error');
+ expect(call.callbackException!.message).toEqual('Something went wrong!');
+ });
+ })
+});
diff --git a/pw_rpc/ts/docs.rst b/pw_rpc/ts/docs.rst
index a28d4a8..fb8a0ba 100644
--- a/pw_rpc/ts/docs.rst
+++ b/pw_rpc/ts/docs.rst
@@ -58,3 +58,37 @@
const channel = client.channel()!;
const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;
+
+Calling an RPC
+==============
+
+Unary RPC
+---------
+Only non blocking calls are currently supported.
+
+.. code-block:: typescript
+
+ unaryStub = client.channel()?.methodStub(
+ 'pw.rpc.test1.TheTestService.SomeUnary')!;
+ request = new unaryStub.method.requestType();
+ request.setFooProperty('hello world');
+ const call = unaryStub.invoke(request, (response) => {
+ console.log(response);
+ });
+
+Server Streaming RPC
+--------------------
+Unsupported
+
+Client Streaming RPC
+--------------------
+Unsupported
+
+Bidirectional Stream RPC
+------------------------
+Unsupported
+
+.. attention::
+
+ RPC timeout is currently unsupported on all RPC types.
+
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
index 12eb16e..3ea505d 100644
--- a/pw_rpc/ts/method.ts
+++ b/pw_rpc/ts/method.ts
@@ -14,7 +14,7 @@
import {Message} from 'google-protobuf';
-import {Call, Callback} from './call';
+import {Call, Callback, ServerStreamingCall, UnaryCall} from './call';
import {Channel, Method, MethodType, Service} from './descriptors';
import {PendingCalls, Rpc} from './rpc_classes';
@@ -32,14 +32,12 @@
}
}
-export abstract class MethodStub {
+export class MethodStub {
readonly method: Method;
- private rpcs: PendingCalls;
- private rpc: Rpc;
+ readonly rpcs: PendingCalls;
+ readonly rpc: Rpc;
private channel: Channel;
- private callType: typeof Call = Call;
-
constructor(rpcs: PendingCalls, channel: Channel, method: Method) {
this.method = method;
this.rpcs = rpcs;
@@ -47,11 +45,13 @@
this.rpc = new Rpc(channel, method.service, method)
}
- abstract invoke(
- request: Message,
- onNext: Callback,
- onCompleted: Callback,
- onError: Callback): Call;
+ invoke(
+ request?: Message,
+ onNext: Callback = () => {},
+ onCompleted: Callback = () => {},
+ onError: Callback = () => {}): UnaryCall {
+ throw Error('invoke() not implemented');
+ }
}
class UnaryMethodStub extends MethodStub {
@@ -59,21 +59,24 @@
// invokeBlocking(request) {...}
invoke(
- request: Message,
+ request?: Message,
onNext: Callback = () => {},
onCompleted: Callback = () => {},
- onError: Callback = () => {}): Call {
- throw Error('ServerStreaming invoke() not implemented');
+ onError: Callback = () => {}): UnaryCall {
+ const call =
+ new UnaryCall(this.rpcs, this.rpc, onNext, onCompleted, onError);
+ call.invoke(request!);
+ return call;
}
}
class ServerStreamingMethodStub extends MethodStub {
invoke(
- request: Message,
+ request?: Message,
onNext: Callback = () => {},
onCompleted: Callback = () => {},
- onError: Callback = () => {}): Call {
- throw Error('ServerStreaming invoke() not implemented');
+ onError: Callback = () => {}): ServerStreamingCall {
+ throw Error('ClientStreaming invoke() not implemented');
}
}