blob: c2885d51e8a8fc56271de35e61645224ee462516 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static java.lang.Math.min;
import com.google.protobuf.ByteString;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
class WriteTransfer extends Transfer<Void> {
private static final Logger logger = Logger.forClass(WriteTransfer.class);
// Short chunk delays often turn into much longer delays. Ignore delays <10ms to avoid impacting
// performance.
private static final int MIN_CHUNK_DELAY_TO_SLEEP_MICROS = 10000;
private int maxChunkSizeBytes = 0;
private int minChunkDelayMicros = 0;
private int sentOffset;
private long totalDroppedBytes;
private Chunk lastChunk;
private final byte[] data;
protected WriteTransfer(int resourceId,
TransferInterface transferManager,
int timeoutMillis,
int initialTimeoutMillis,
int maxRetries,
byte[] data,
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
super(resourceId,
transferManager,
timeoutMillis,
initialTimeoutMillis,
maxRetries,
progressCallback,
shouldAbortCallback);
this.data = data;
this.lastChunk = newChunk(Chunk.Type.START)
.setResourceId(getSessionId())
.setRemainingBytes(data.length)
.build();
}
@Override
State getWaitingForDataState() {
return new WaitingForTransferParameters();
}
private class WaitingForTransferParameters extends State {
@Override
void handleTimeout() {
Recovery recoveryState = new Recovery();
setState(recoveryState);
recoveryState.handleTimeout();
}
@Override
void handleDataChunk(Chunk chunk) {
updateTransferParameters(chunk);
}
}
/** Transmitting a transfer window. */
private class Transmitting extends State {
private final int windowStartOffset;
private final int windowEndOffset;
Transmitting(int windowStartOffset, int windowEndOffset) {
this.windowStartOffset = windowStartOffset;
this.windowEndOffset = windowEndOffset;
}
@Override
void handleDataChunk(Chunk chunk) {
updateTransferParameters(chunk);
}
@Override
void handleTimeout() {
ByteString chunkData = ByteString.copyFrom(
data, sentOffset, min(windowEndOffset - sentOffset, maxChunkSizeBytes));
logger.atFiner().log("Transfer %d: sending bytes %d-%d (%d B chunk, max size %d B)",
getSessionId(),
sentOffset,
sentOffset + chunkData.size() - 1,
chunkData.size(),
maxChunkSizeBytes);
Chunk chunkToSend = buildDataChunk(chunkData);
// If there's a timeout, resending this will trigger a transfer parameters update.
lastChunk = chunkToSend;
if (!sendChunk(chunkToSend)) {
setState(new Completed());
return;
}
sentOffset += chunkData.size();
updateProgress(sentOffset, windowStartOffset, data.length);
if (sentOffset < windowEndOffset) {
setTimeoutMicros(minChunkDelayMicros);
return; // Keep transmitting packets
}
setNextChunkTimeout();
setState(new WaitingForTransferParameters());
}
}
@Override
Chunk getChunkForRetry() {
// The service should resend transfer parameters if there was a timeout. In case the service
// doesn't support timeouts and to avoid unnecessary waits, resend the last chunk. If there
// were drops, this will trigger a transfer parameters update.
return lastChunk;
}
@Override
void setFutureResult() {
updateProgress(data.length, data.length, data.length);
getFuture().set(null);
}
private void updateTransferParameters(Chunk chunk) {
logger.atFiner().log("Transfer %d received new chunk (type=%s, offset=%d, windowEndOffset=%d)",
getSessionId(),
chunk.getType(),
chunk.getOffset(),
chunk.getWindowEndOffset());
if (chunk.getOffset() > data.length) {
sendFinalChunk(Status.OUT_OF_RANGE);
setState(new Completed());
return;
}
int windowEndOffset = getWindowEndOffset(chunk, data.length);
if (isRetransmit(chunk)) {
long droppedBytes = sentOffset - chunk.getOffset();
if (droppedBytes > 0) {
totalDroppedBytes += droppedBytes;
logger.atFine().log("Transfer %d retransmitting %d B (%d retransmitted of %d sent)",
getSessionId(),
droppedBytes,
totalDroppedBytes,
sentOffset);
}
sentOffset = (int) chunk.getOffset();
} else if (windowEndOffset <= sentOffset) {
logger.atFiner().log("Transfer %d: ignoring old rolling window packet", getSessionId());
setNextChunkTimeout();
return; // Received an old rolling window packet, ignore it.
}
// Update transfer parameters if they're set.
if (chunk.hasMaxChunkSizeBytes()) {
maxChunkSizeBytes = chunk.getMaxChunkSizeBytes();
}
if (chunk.hasMinDelayMicroseconds()) {
if (chunk.getMinDelayMicroseconds() > MIN_CHUNK_DELAY_TO_SLEEP_MICROS) {
minChunkDelayMicros = chunk.getMinDelayMicroseconds();
}
}
if (maxChunkSizeBytes == 0) {
if (windowEndOffset == sentOffset) {
logger.atWarning().log(
"Server requested 0 bytes in write transfer %d; aborting", getSessionId());
sendFinalChunk(Status.INVALID_ARGUMENT);
setState(new Completed());
return;
}
// Default to sending the entire window if the max chunk size is not specified (or is 0).
maxChunkSizeBytes = windowEndOffset - sentOffset;
}
Transmitting transmittingState = new Transmitting((int) chunk.getOffset(), windowEndOffset);
setState(transmittingState);
transmittingState.handleTimeout(); // Immediately send the first packet
}
private static boolean isRetransmit(Chunk chunk) {
// Retransmit is the default behavior for older versions of the transfer protocol, which don't
// have a type field.
return !chunk.hasType()
|| (chunk.getType().equals(Chunk.Type.PARAMETERS_RETRANSMIT)
|| chunk.getType().equals(Chunk.Type.START));
}
private static int getWindowEndOffset(Chunk chunk, int dataLength) {
if (isRetransmit(chunk)) {
// A retransmit chunk may use an older version of the transfer protocol, in which the
// window_end_offset field is not guaranteed to exist.
return min((int) chunk.getOffset() + chunk.getPendingBytes(), dataLength);
}
return min(chunk.getWindowEndOffset(), dataLength);
}
private Chunk buildDataChunk(ByteString chunkData) {
Chunk.Builder chunk = newChunk(Chunk.Type.DATA).setOffset(sentOffset).setData(chunkData);
// If this is the last data chunk, setRemainingBytes to 0.
if (sentOffset + chunkData.size() == data.length) {
logger.atFiner().log(
"Transfer %d sending final chunk with %d B", getSessionId(), chunkData.size());
chunk.setRemainingBytes(0);
}
return chunk.build();
}
}