blob: daf5213f769859463fc964d6bb30e1b126c9508f [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static java.lang.Math.min;
import com.google.protobuf.ByteString;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
class WriteTransfer extends Transfer<Void> {
private static final Logger logger = Logger.forClass(WriteTransfer.class);
// Short chunk delays often turn into much longer delays. Ignore delays <10ms to avoid impacting
// performance.
private static final int MIN_CHUNK_DELAY_TO_SLEEP_MICROS = 10000;
private int maxChunkSizeBytes = 0;
private int minChunkDelayMicros = 0;
private int sentOffset;
private long totalDroppedBytes;
private final byte[] data;
protected WriteTransfer(int resourceId,
ProtocolVersion desiredProtocolVersion,
TransferInterface transferManager,
TransferTimeoutSettings timeoutSettings,
byte[] data,
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
super(resourceId,
desiredProtocolVersion,
transferManager,
timeoutSettings,
progressCallback,
shouldAbortCallback);
this.data = data;
}
@Override
void prepareInitialChunk(VersionedChunk.Builder chunk) {
chunk.setRemainingBytes(data.length);
}
@Override
State getWaitingForDataState() {
return new WaitingForTransferParameters();
}
private class WaitingForTransferParameters extends ActiveState {
@Override
public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
updateTransferParameters(chunk);
}
}
/** Transmitting a transfer window. */
private class Transmitting extends ActiveState {
private final int windowStartOffset;
private final int windowEndOffset;
Transmitting(int windowStartOffset, int windowEndOffset) {
this.windowStartOffset = windowStartOffset;
this.windowEndOffset = windowEndOffset;
}
@Override
public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
updateTransferParameters(chunk);
}
@Override
public void handleTimeout() throws TransferAbortedException {
ByteString chunkData = ByteString.copyFrom(
data, sentOffset, min(windowEndOffset - sentOffset, maxChunkSizeBytes));
if (VERBOSE_LOGGING) {
logger.atFinest().log("%s sending bytes %d-%d (%d B chunk, max size %d B)",
WriteTransfer.this,
sentOffset,
sentOffset + chunkData.size() - 1,
chunkData.size(),
maxChunkSizeBytes);
}
sendChunk(buildDataChunk(chunkData));
sentOffset += chunkData.size();
updateProgress(sentOffset, windowStartOffset, data.length);
if (sentOffset < windowEndOffset) {
setTimeoutMicros(minChunkDelayMicros);
return; // Keep transmitting packets
}
setNextChunkTimeout();
changeState(new WaitingForTransferParameters());
}
}
@Override
VersionedChunk getChunkForRetry() {
// The service should resend transfer parameters if there was a timeout. In case the service
// doesn't support timeouts and to avoid unnecessary waits, resend the last chunk. If there
// were drops, this will trigger a transfer parameters update.
return getLastChunkSent();
}
@Override
void setFutureResult() {
updateProgress(data.length, data.length, data.length);
getFuture().set(null);
}
private void updateTransferParameters(VersionedChunk chunk) throws TransferAbortedException {
logger.atFiner().log("%s received new chunk %s", this, chunk);
if (chunk.offset() > data.length) {
setStateTerminatingAndSendFinalChunk(Status.OUT_OF_RANGE);
return;
}
int windowEndOffset = min(chunk.windowEndOffset(), data.length);
if (chunk.requestsTransmissionFromOffset()) {
long droppedBytes = sentOffset - chunk.offset();
if (droppedBytes > 0) {
totalDroppedBytes += droppedBytes;
logger.atFine().log("%s retransmitting %d B (%d retransmitted of %d sent)",
this,
droppedBytes,
totalDroppedBytes,
sentOffset);
}
sentOffset = chunk.offset();
} else if (windowEndOffset <= sentOffset) {
logger.atFiner().log("%s ignoring old rolling window packet", this);
setNextChunkTimeout();
return; // Received an old rolling window packet, ignore it.
}
// Update transfer parameters if they're set.
chunk.maxChunkSizeBytes().ifPresent(size -> maxChunkSizeBytes = size);
chunk.minDelayMicroseconds().ifPresent(delay -> {
if (delay > MIN_CHUNK_DELAY_TO_SLEEP_MICROS) {
minChunkDelayMicros = delay;
}
});
if (maxChunkSizeBytes == 0) {
if (windowEndOffset == sentOffset) {
logger.atWarning().log("%s server requested 0 bytes; aborting", this);
setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT);
return;
}
// Default to sending the entire window if the max chunk size is not specified (or is 0).
maxChunkSizeBytes = windowEndOffset - sentOffset;
}
// Enter the transmitting state and immediately send the first packet
changeState(new Transmitting(chunk.offset(), windowEndOffset)).handleTimeout();
}
private VersionedChunk buildDataChunk(ByteString chunkData) {
VersionedChunk.Builder chunk =
newChunk(Chunk.Type.DATA).setOffset(sentOffset).setData(chunkData);
// If this is the last data chunk, setRemainingBytes to 0.
if (sentOffset + chunkData.size() == data.length) {
logger.atFiner().log("%s sending final chunk with %d B", this, chunkData.size());
chunk.setRemainingBytes(0);
}
return chunk.build();
}
}