// blob: e71f9bff5f002cd4515577d94d1bc3866380c048 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
import {
BidirectionalStreamingCall,
BidirectionalStreamingMethodStub,
ServiceClient,
} from '@pigweed/pw_rpc';
import {Status} from '@pigweed/pw_status';
import {Chunk} from 'transfer_proto_tspb/transfer_proto_tspb_pb/pw_transfer/transfer_pb';
/** Immutable snapshot of how far a transfer has progressed. */
export class ProgressStats {
  /**
   * @param bytesSent Number of bytes sent so far.
   * @param bytesConfirmedReceived Number of bytes confirmed as received.
   * @param totalSizeBytes Total size of the transfer, if it is known.
   */
  constructor(
    readonly bytesSent: number,
    readonly bytesConfirmedReceived: number,
    readonly totalSizeBytes?: number
  ) {}

  /**
   * Percentage of the total size that has been confirmed received.
   *
   * @returns NaN when the total size is unknown or zero, since no meaningful
   *     percentage exists in either case (and dividing by zero would
   *     otherwise yield Infinity).
   */
  get percentReceived(): number {
    if (this.totalSizeBytes === undefined || this.totalSizeBytes === 0) {
      return NaN;
    }
    return (this.bytesConfirmedReceived / this.totalSizeBytes) * 100;
  }

  /** Formats the statistics as a single human-readable line. */
  toString(): string {
    const total =
      this.totalSizeBytes === undefined
        ? 'undefined'
        : this.totalSizeBytes.toString();
    const percent = this.percentReceived.toFixed(2);
    return (
      `${percent}% (${this.bytesSent} B sent, ` +
      `${this.bytesConfirmedReceived} B received of ${total} B)`
    );
  }
}

/** Callback invoked with the latest statistics whenever progress is reported. */
export type ProgressCallback = (stats: ProgressStats) => void;
/** A Timer which invokes a callback after a certain timeout. */
class Timer {
  /** Handle of the pending timeout; undefined when no timer is running. */
  private task?: ReturnType<typeof setTimeout>;

  /**
   * @param timeoutS Delay before the callback is invoked, in seconds.
   * @param callback Function invoked when the timer expires. Its return value
   *     is ignored.
   */
  constructor(
    readonly timeoutS: number,
    private readonly callback: () => void
  ) {}

  /**
   * Starts a new timer.
   *
   * If a timer is already running, it is stopped and a new timer started.
   * This can be used to implement watchdog-like behavior, where a callback
   * is invoked after some time without a kick.
   */
  start(): void {
    this.stop();
    this.task = setTimeout(() => {
      // The timeout has fired, so the handle is no longer pending. Clear it
      // before running the callback so that the callback may restart the
      // timer without the new handle being overwritten.
      this.task = undefined;
      this.callback();
    }, this.timeoutS * 1000);
  }

  /** Terminates a running timer. Does nothing if no timer is running. */
  stop(): void {
    if (this.task !== undefined) {
      clearTimeout(this.task);
      this.task = undefined;
    }
  }
}
/**
 * A client-side data transfer through a Manager.
 *
 * Subclasses are responsible for implementing all of the logic for their type
 * of transfer, receiving messages from the server and sending the appropriate
 * messages in response.
 */
export abstract class Transfer {
  /** Final status of the transfer; only meaningful once `done` resolves. */
  status: Status = Status.OK;
  /** Resolves with the final status when `finish` is called. */
  done: Promise<Status>;
  protected data = new Uint8Array();
  /** Number of consecutive timeouts since the last chunk was received. */
  private retries = 0;
  /** Watchdog for server responses; cleared once the transfer finishes. */
  private responseTimer?: Timer;
  private resolve?: (value: Status | PromiseLike<Status>) => void;

  /**
   * @param id Transfer ID placed in outgoing chunks.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Number of consecutive timeouts tolerated before the
   *     transfer finishes with DEADLINE_EXCEEDED.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    public id: number,
    protected sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    private maxRetries: number,
    private progressCallback?: ProgressCallback
  ) {
    this.responseTimer = new Timer(responseTimeoutS, this.onTimeout);
    // The executor runs synchronously, so `resolve` is set before the
    // constructor returns.
    this.done = new Promise<Status>(resolve => {
      this.resolve = resolve;
    });
  }

  /** Returns the initial chunk to notify the server of the transfer. */
  protected abstract get initialChunk(): Chunk;

  /** Handles a chunk that contains or requests data. */
  protected abstract handleDataChunk(chunk: Chunk): void;

  /** Retries after a timeout occurs. */
  protected abstract retryAfterTimeout(): void;

  /**
   * Handles a timeout while waiting for a chunk.
   *
   * Declared as an arrow function so it can be passed to the Timer without
   * losing `this`.
   */
  private onTimeout = () => {
    this.retries += 1;
    if (this.retries > this.maxRetries) {
      this.finish(Status.DEADLINE_EXCEEDED);
      return;
    }
    console.debug(
      `Received no responses for ${this.responseTimer?.timeoutS}s; retrying ${this.retries}/${this.maxRetries}`
    );
    this.retryAfterTimeout();
    this.responseTimer?.start();
  };

  /** Sends an error chunk to the server and finishes the transfer. */
  protected sendError(error: Status): void {
    const chunk = new Chunk();
    chunk.setStatus(error);
    chunk.setTransferId(this.id);
    chunk.setType(Chunk.Type.COMPLETION);
    this.sendChunk(chunk);
    this.finish(error);
  }

  /** Sends the initial chunk of the transfer and starts the response timer. */
  begin(): void {
    this.sendChunk(this.initialChunk);
    this.responseTimer?.start();
  }

  /**
   * Ends the transfer with the specified status.
   *
   * Stops and discards the response timer, records the status, reports final
   * progress when the status is OK, and resolves `done`.
   */
  finish(status: Status): void {
    this.responseTimer?.stop();
    this.responseTimer = undefined;
    this.status = status;
    if (status === Status.OK) {
      const totalSize = this.data.length;
      this.updateProgress(totalSize, totalSize, totalSize);
    }
    this.resolve?.(this.status);
  }

  /** Invokes the provided progress callback, if any, with the progress. */
  updateProgress(
    bytesSent: number,
    bytesConfirmedReceived: number,
    totalSizeBytes?: number
  ): void {
    const stats = new ProgressStats(
      bytesSent,
      bytesConfirmedReceived,
      totalSizeBytes
    );
    console.debug(`Transfer ${this.id} progress: ${stats}`);
    if (this.progressCallback !== undefined) {
      this.progressCallback(stats);
    }
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * Handles terminating chunks (i.e. those with a status) and forwards
   * non-terminating chunks to handleDataChunk.
   */
  handleChunk(chunk: Chunk): void {
    this.responseTimer?.stop();
    this.retries = 0; // Received data from service, so reset the retries.
    console.debug(`Received chunk:(${chunk})`);
    // Status chunks are only used to terminate a transfer. They do not
    // contain any data that requires processing.
    if (chunk.hasStatus()) {
      this.finish(chunk.getStatus());
      return;
    }
    this.handleDataChunk(chunk);
    // Start the timeout for the server to send a chunk in response.
    this.responseTimer?.start();
  }
}
/**
 * A client <= server read transfer.
 *
 * Although TypeScript can effectively handle an unlimited transfer window, this
 * client sets a conservative window and chunk size to avoid overloading the
 * device. These are configurable in the constructor.
 */
export class ReadTransfer extends Transfer {
  private maxBytesToReceive: number;
  private maxChunkSize: number;
  private chunkDelayMicroS?: number; // Microseconds
  /** Bytes still expected from the server, when the server has reported it. */
  private remainingTransferSize?: number;
  /** Number of bytes received so far; also the next expected chunk offset. */
  private offset = 0;
  private pendingBytes: number;
  private windowEndOffset: number;
  // The fractional position within a window at which a receive transfer should
  // extend its window size to minimize the amount of time the transmitter
  // spends blocked.
  //
  // For example, a divisor of 2 will extend the window when half of the
  // requested data has been received, a divisor of three will extend at a third
  // of the window, and so on.
  private static EXTEND_WINDOW_DIVISOR = 2;
  data = new Uint8Array();

  /**
   * @param id Transfer ID placed in outgoing chunks.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Number of consecutive timeouts tolerated.
   * @param progressCallback Optional callback invoked on progress updates.
   * @param maxBytesToReceive Size of the receive window requested from the
   *     server.
   * @param maxChunkSize Maximum chunk size requested from the server.
   * @param chunkDelayMicroS Optional minimum delay between chunks, in
   *     microseconds, requested from the server.
   */
  constructor(
    id: number,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback,
    maxBytesToReceive = 8192,
    maxChunkSize = 1024,
    chunkDelayMicroS?: number
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.maxBytesToReceive = maxBytesToReceive;
    this.maxChunkSize = maxChunkSize;
    this.chunkDelayMicroS = chunkDelayMicroS;
    this.pendingBytes = maxBytesToReceive;
    this.windowEndOffset = maxBytesToReceive;
  }

  protected get initialChunk(): Chunk {
    return this.transferParameters(Chunk.Type.START);
  }

  /**
   * Builds an updated transfer parameters chunk to send the server.
   *
   * As a side effect, resets the pending byte count and moves the window end
   * to `offset + maxBytesToReceive`.
   */
  private transferParameters(type: Chunk.TypeMap[keyof Chunk.TypeMap]): Chunk {
    this.pendingBytes = this.maxBytesToReceive;
    this.windowEndOffset = this.offset + this.maxBytesToReceive;

    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setPendingBytes(this.pendingBytes);
    chunk.setMaxChunkSizeBytes(this.maxChunkSize);
    chunk.setOffset(this.offset);
    chunk.setWindowEndOffset(this.windowEndOffset);
    chunk.setType(type);
    // Only request a delay when one was actually configured; an unset delay
    // must not be written into the chunk.
    if (this.chunkDelayMicroS !== undefined && this.chunkDelayMicroS !== 0) {
      chunk.setMinDelayMicroseconds(this.chunkDelayMicroS);
    }
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a read transfer, the client receives data chunks from the server.
   * Once all pending data is received, the transfer parameters are updated.
   */
  protected handleDataChunk(chunk: Chunk): void {
    if (chunk.getOffset() !== this.offset) {
      // Initially, the transfer service only supports in-order transfers.
      // If data is received out of order, request that the server
      // retransmit from the previous offset.
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
      return;
    }

    // Append the chunk's payload to the data received so far.
    const oldData = this.data;
    const chunkData = chunk.getData() as Uint8Array;
    const chunkLength = chunkData.length;
    this.data = new Uint8Array(chunkLength + oldData.length);
    this.data.set(oldData);
    this.data.set(chunkData, oldData.length);
    this.pendingBytes -= chunkLength;
    this.offset += chunkLength;

    if (chunk.hasRemainingBytes()) {
      if (chunk.getRemainingBytes() === 0) {
        // No more data to read. Acknowledge receipt and finish.
        const endChunk = new Chunk();
        endChunk.setTransferId(this.id);
        endChunk.setStatus(Status.OK);
        endChunk.setType(Chunk.Type.COMPLETION);
        this.sendChunk(endChunk);
        this.finish(Status.OK);
        return;
      }
      this.remainingTransferSize = chunk.getRemainingBytes();
    } else if (this.remainingTransferSize !== undefined) {
      // Update the remaining transfer size, if it is known.
      this.remainingTransferSize -= chunkLength;
      if (this.remainingTransferSize <= 0) {
        this.remainingTransferSize = undefined;
      }
    }

    const chunkWindowEndOffset = chunk.getWindowEndOffset();
    if (chunkWindowEndOffset !== 0) {
      // The transmitter may not end its window before data already received.
      if (chunkWindowEndOffset < this.offset) {
        console.error(
          `Transfer ${this.id}: transmitter sent invalid earlier end offset ${chunkWindowEndOffset} (receiver offset ${this.offset})`
        );
        this.sendError(Status.INTERNAL);
        return;
      }
      // Nor may it extend the window beyond what this receiver granted.
      if (chunkWindowEndOffset > this.windowEndOffset) {
        console.error(
          `Transfer ${this.id}: transmitter sent invalid later end offset ${chunkWindowEndOffset} (receiver end offset ${this.windowEndOffset})`
        );
        this.sendError(Status.INTERNAL);
        return;
      }
      this.windowEndOffset = chunkWindowEndOffset;
      // NOTE(review): this subtracts the remaining window from pendingBytes in
      // addition to the chunk length already subtracted above — confirm this
      // matches the intended protocol behavior.
      this.pendingBytes -= chunkWindowEndOffset - this.offset;
    }

    const remainingWindowSize = this.windowEndOffset - this.offset;
    const extendWindow =
      remainingWindowSize <=
      this.maxBytesToReceive / ReadTransfer.EXTEND_WINDOW_DIVISOR;

    const totalSize =
      this.remainingTransferSize === undefined
        ? undefined
        : this.remainingTransferSize + this.offset;
    this.updateProgress(this.offset, this.offset, totalSize);

    if (this.pendingBytes === 0) {
      // All pending data was received. Send out a new parameters chunk
      // for the next block.
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
    } else if (extendWindow) {
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE));
    }
  }

  /** Re-requests data from the current offset after a response timeout. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
  }
}
/**
 * A client => server write transfer.
 */
export class WriteTransfer extends Transfer {
  readonly data: Uint8Array;
  /** Incremented for every parameters chunk handled; not otherwise read here. */
  private windowId = 0;
  /** Offset into `data` of the next byte to send. */
  offset = 0;
  /** Maximum chunk size requested by the server; 0 until the server sets it. */
  maxChunkSize = 0;
  chunkDelayMicroS?: number;
  /** Offset into `data` up to which the server has permitted sending. */
  windowEndOffset = 0;
  /** Most recently sent chunk, resent if the server stops responding. */
  lastChunk: Chunk;

  /**
   * @param id Transfer ID placed in outgoing chunks.
   * @param data Bytes to write to the server.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param initialResponseTimeoutS Accepted for interface compatibility; it
   *     is not used by this class.
   * @param maxRetries Number of consecutive timeouts tolerated.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    id: number,
    data: Uint8Array,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    initialResponseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.data = data;
    this.lastChunk = this.initialChunk;
  }

  protected get initialChunk(): Chunk {
    // TODO(frolv): The session ID should not be set here but assigned by the
    // server during an initial handshake.
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setResourceId(this.id);
    chunk.setType(Chunk.Type.START);
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a write transfer, the server only sends transfer parameter updates
   * to the client. When a message is received, update local parameters and
   * send data accordingly.
   */
  protected handleDataChunk(chunk: Chunk): void {
    this.windowId += 1;

    if (!this.handleParametersUpdate(chunk)) {
      return;
    }

    const bytesAcknowledged = chunk.getOffset();

    // At least one chunk is always sent, so that an empty window at the end
    // of the data still produces the final (remaining_bytes = 0) chunk.
    let writeChunk: Chunk;
    while (true) {
      writeChunk = this.nextChunk();
      const bytesInChunk = writeChunk.getData().length;
      this.offset += bytesInChunk;
      this.updateProgress(this.offset, bytesAcknowledged, this.data.length);
      this.sendChunk(writeChunk);

      if (this.offset >= this.windowEndOffset) {
        break;
      }
      if (bytesInChunk === 0) {
        // No bytes could be sent (e.g. the maximum chunk size is 0), so the
        // window end can never be reached. Stop instead of looping forever.
        console.error(
          `Transfer ${this.id}: unable to make progress at offset ${this.offset} (window end ${this.windowEndOffset}, max chunk size ${this.maxChunkSize})`
        );
        break;
      }
    }

    this.lastChunk = writeChunk;
  }

  /**
   * Updates transfer state based on a transfer parameters update.
   *
   * @returns false if the parameters were invalid and the transfer was
   *     terminated with an error; true otherwise.
   */
  private handleParametersUpdate(chunk: Chunk): boolean {
    // Chunks without a type are treated as retransmit requests.
    let retransmit = true;
    if (chunk.hasType()) {
      retransmit = chunk.getType() === Chunk.Type.PARAMETERS_RETRANSMIT;
    }

    if (chunk.getOffset() > this.data.length) {
      // Bad offset; terminate the transfer.
      console.error(
        `Transfer ${this.id}: server requested invalid offset ${chunk.getOffset()} (size ${this.data.length})`
      );
      this.sendError(Status.OUT_OF_RANGE);
      return false;
    }

    if (chunk.getPendingBytes() === 0) {
      console.error(
        `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`
      );
      this.sendError(Status.INTERNAL);
      return false;
    }

    if (retransmit) {
      // Check whether the client has sent a previous data offset, which
      // indicates that some chunks were lost in transmission.
      if (chunk.getOffset() < this.offset) {
        console.debug(
          `Write transfer ${this.id} rolling back to offset ${chunk.getOffset()} from ${this.offset}`
        );
      }
      this.offset = chunk.getOffset();

      // Retransmit is the default behavior for older versions of the
      // transfer protocol. The window_end_offset field is not guaranteed
      // to be set in these versions, so it must be calculated.
      const maxBytesToSend = Math.min(
        chunk.getPendingBytes(),
        this.data.length - this.offset
      );
      this.windowEndOffset = this.offset + maxBytesToSend;
    } else {
      // Extend the window to the new end offset specified by the server.
      this.windowEndOffset = Math.min(
        chunk.getWindowEndOffset(),
        this.data.length
      );
    }

    if (chunk.hasMaxChunkSizeBytes()) {
      this.maxChunkSize = chunk.getMaxChunkSizeBytes();
    }
    if (chunk.hasMinDelayMicroseconds()) {
      this.chunkDelayMicroS = chunk.getMinDelayMicroseconds();
    }
    return true;
  }

  /** Returns the next Chunk message to send in the data transfer. */
  private nextChunk(): Chunk {
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setOffset(this.offset);
    chunk.setType(Chunk.Type.DATA);

    // Never exceed the server's chunk size limit or the end of the window.
    const maxBytesInChunk = Math.min(
      this.maxChunkSize,
      this.windowEndOffset - this.offset
    );
    chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));

    // Mark the final chunk of the transfer.
    if (this.data.length - this.offset <= maxBytesInChunk) {
      chunk.setRemainingBytes(0);
    }
    return chunk;
  }

  /** Resends the most recently sent chunk after a response timeout. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.lastChunk);
  }
}