blob: 2548ad9ae3a8d96bdc59451ab7e1c6add34ce8af [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
import {Message} from 'google-protobuf';
import {Status} from 'pigweed/pw_status/ts/status';
import {Call} from './call';
import {Channel, Method, Service} from './descriptors';
import * as packets from './packets';
/** Data class for a pending RPC call. */
export class Rpc {
channel: Channel;
service: Service;
method: Method;
constructor(channel: Channel, service: Service, method: Method) {
this.channel = channel;
this.service = service;
this.method = method;
}
/** Returns channel service method id tuple */
get idSet(): [number, number, number] {
return [this.channel.id, this.service.id, this.method.id];
}
/**
* Returns a string sequence to uniquely identify channel, service, and
* method. This can be used to hash the Rpc.
*
* For example: "12346789.23452345.12341234"
*/
get idString(): string {
return `${this.channel.id}.${this.service.id}.${this.method.id}`;
}
toString(): string {
return `RPC ${this.service.name}.${this.method.name} on channel ` +
`${this.channel.id})`;
}
}
/** Tracks pending RPCs and encodes outgoing RPC packets. */
export class PendingCalls {
pending: Map<string, Call> = new Map;
/** Starts the provided RPC and returns the encoded packet to send. */
request(rpc: Rpc, request: Message, call: Call): Uint8Array {
this.open(rpc, call)
console.log(`Starting ${rpc}`);
return packets.encodeRequest(rpc.idSet, request);
}
/** Calls request and sends the resulting packet to the channel. */
sendRequest(rpc: Rpc, call: Call, request?: Message): Call|undefined {
const previous = this.open(rpc, call);
rpc.channel.send(packets.encodeRequest(rpc.idSet, request));
return previous;
}
/**
* Creates a call for an RPC, but does not invoke it.
*
* open() can be used to receive streaming responses to an RPC that was not
* invoked by this client. For example, a server may stream logs with a
* server streaming RPC prior to any clients invoking it.
*/
open(rpc: Rpc, call: Call): Call|undefined {
console.debug('Starting %s', rpc);
const previous = this.pending.get(rpc.idString);
this.pending.set(rpc.idString, call)
return previous;
}
sendClientStream() {
throw new Error('Method not implemented.');
}
sendClientStreamEnd() {
throw new Error('Method not implemented.');
}
/** Cancels the RPC. Returns the CANCEL packet to send. */
cancel(rpc: Rpc): Uint8Array|undefined {
console.debug(`Cancelling ${rpc}`);
this.pending.delete(rpc.idString);
if (rpc.method.clientStreaming && rpc.method.serverStreaming) {
return undefined;
}
return packets.encodeCancel(rpc.idSet);
}
/** Calls cancel and sends the cancel packet, if any, to the channel. */
sendCancel(rpc: Rpc): boolean {
let packet: Uint8Array|undefined;
try {
packet = this.cancel(rpc);
} catch (err) {
return false;
}
if (packet !== undefined) {
rpc.channel.send(packet);
}
return true;
}
/** Gets the pending RPC's call. If status is set, clears the RPC. */
getPending(rpc: Rpc, status?: Status): Call|undefined {
if (status === undefined) {
return this.pending.get(rpc.idString);
}
const call = this.pending.get(rpc.idString);
this.pending.delete(rpc.idString);
return call;
}
}