// blob: 15e3cd321e1213e3ad63102707c905a3cd0418bf [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
import static java.lang.Math.max;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
/**
 * A transfer that receives data chunks in order, buffers them, and completes its future with the
 * concatenated bytes.
 *
 * <p>Only in-order reception is supported: a chunk whose offset does not match the next expected
 * offset is discarded and the transfer parameters are resent so the sender can resume from the
 * expected offset.
 */
class ReadTransfer extends Transfer<byte[]> {
  private static final Logger logger = Logger.forClass(ReadTransfer.class);

  // The fractional position within a window at which a receive transfer should
  // extend its window size to minimize the amount of time the transmitter
  // spends blocked.
  //
  // The window is extended once the not-yet-received part of the window is at most
  // maxPendingBytes / EXTEND_WINDOW_DIVISOR. For example, a divisor of 2 extends the window when
  // half of the requested data has been received; a divisor of 3 extends it once only a third of
  // the window remains outstanding (two thirds received), and so on.
  private static final int EXTEND_WINDOW_DIVISOR = 2;

  // To minimize copies, store the ByteBuffers directly from the chunk protos in a list.
  private final List<ByteBuffer> dataChunks = new ArrayList<>();

  // Total number of payload bytes accumulated in dataChunks.
  private int totalDataSize = 0;

  private final TransferParameters parameters;

  // Most recent estimate of how many bytes are still to come, or UNKNOWN_TRANSFER_SIZE.
  private long remainingTransferSize = UNKNOWN_TRANSFER_SIZE;

  // Offset of the next byte expected from the sender.
  private int offset = 0;

  // End of the current receive window; always refreshed before transfer parameters are sent.
  private int windowEndOffset = 0;

  // Offset of the most recently seen data chunk; used by DropRecovery to detect retries.
  private int lastReceivedOffset = 0;

  /**
   * Creates a read transfer.
   *
   * <p>All arguments except {@code transferParameters} are forwarded unchanged to the {@link
   * Transfer} constructor. The initial receive window ends at {@code
   * transferParameters.maxPendingBytes()}.
   *
   * @param transferParameters window size, maximum chunk size and chunk delay advertised to the
   *     sender
   */
  ReadTransfer(int resourceId,
      ProtocolVersion desiredProtocolVersion,
      TransferInterface transferManager,
      TransferTimeoutSettings timeoutSettings,
      TransferParameters transferParameters,
      Consumer<TransferProgress> progressCallback,
      BooleanSupplier shouldAbortCallback) {
    super(resourceId,
        desiredProtocolVersion,
        transferManager,
        timeoutSettings,
        progressCallback,
        shouldAbortCallback);
    this.parameters = transferParameters;
    this.windowEndOffset = parameters.maxPendingBytes();
  }

  /** Returns a fresh {@link ReceivingData} state. */
  @Override
  State getWaitingForDataState() {
    return new ReceivingData();
  }

  /** Populates the initial chunk with the current transfer parameters. */
  @Override
  void prepareInitialChunk(VersionedChunk.Builder chunk) {
    setTransferParameters(chunk);
  }

  /**
   * Returns the chunk to send on a retry.
   *
   * <p>If the last chunk sent carried transfer parameters, a new RETRANSMIT parameters chunk built
   * from the current offset is returned; otherwise the last chunk is returned as-is.
   */
  @Override
  VersionedChunk getChunkForRetry() {
    VersionedChunk chunk = getLastChunkSent();
    // If the last chunk sent was transfer parameters, send an updated RETRANSMIT chunk.
    if (chunk.type() == Chunk.Type.PARAMETERS_CONTINUE
        || chunk.type() == Chunk.Type.PARAMETERS_RETRANSMIT) {
      return prepareTransferParameters(/*extend=*/false);
    }
    return chunk;
  }

  /** State in which in-order data chunks are accepted and appended to the received data. */
  private class ReceivingData extends ActiveState {
    /**
     * Handles one data chunk.
     *
     * <p>An out-of-order chunk is dropped, the parameters are resent, and the transfer switches to
     * {@link DropRecovery}. An in-order chunk is stored; if it reports zero remaining bytes the
     * transfer terminates with {@code Status.OK}. Otherwise progress is reported and, depending on
     * how much of the window is left, updated transfer parameters may be sent.
     */
    @Override
    public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
      // Track the last seen offset so the DropRecovery state can detect retried packets.
      lastReceivedOffset = chunk.offset();

      if (chunk.offset() != offset) {
        logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
            ReadTransfer.this,
            offset,
            chunk.offset());

        // For now, only in-order transfers are supported. If data is received out of order,
        // discard this data and ask the sender to retransmit from the expected offset.
        sendChunk(prepareTransferParameters(/*extend=*/false));
        changeState(new DropRecovery());
        setNextChunkTimeout();
        return;
      }

      final int chunkSize = chunk.data().size();

      // Add the underlying array(s) to a list to avoid making copies of the data.
      dataChunks.addAll(chunk.data().asReadOnlyByteBufferList());
      totalDataSize += chunkSize;
      offset += chunkSize;

      if (chunk.remainingBytes().isPresent()) {
        if (chunk.remainingBytes().getAsLong() == 0) {
          // The sender reported that nothing is left: finish the transfer.
          setStateTerminatingAndSendFinalChunk(Status.OK);
          return;
        }
        remainingTransferSize = chunk.remainingBytes().getAsLong();
      } else if (remainingTransferSize != UNKNOWN_TRANSFER_SIZE) {
        // If remaining size was not specified, update based on the most recent estimate, if any.
        remainingTransferSize = max(remainingTransferSize - chunkSize, 0);
      }

      // An estimate that has run down to zero is no longer useful, so it is reported as unknown.
      if (remainingTransferSize == UNKNOWN_TRANSFER_SIZE || remainingTransferSize == 0) {
        updateProgress(offset, offset, UNKNOWN_TRANSFER_SIZE);
      } else {
        updateProgress(offset, offset, offset + remainingTransferSize);
      }

      int remainingWindowSize = windowEndOffset - offset;
      boolean extendWindow =
          remainingWindowSize <= parameters.maxPendingBytes() / EXTEND_WINDOW_DIVISOR;

      if (remainingWindowSize == 0) {
        logger.atFinest().log(
            "%s received all pending bytes; sending transfer parameters update", ReadTransfer.this);
        sendChunk(prepareTransferParameters(/*extend=*/false));
      } else if (extendWindow) {
        sendChunk(prepareTransferParameters(/*extend=*/true));
      }
      setNextChunkTimeout();
    }
  }

  /** State for recovering from dropped packets. */
  private class DropRecovery extends ActiveState {
    /**
     * Handles one data chunk while waiting for the sender to resume at the expected offset.
     *
     * <p>A chunk at the expected offset switches back to {@link ReceivingData}, which then
     * processes it. A chunk repeating the previously seen offset is treated as a sender retry and
     * triggers another parameters chunk. Any other chunk is ignored, apart from recording its
     * offset.
     */
    @Override
    public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
      if (chunk.offset() == offset) {
        logger.atFine().log(
            "%s received expected offset %d, resuming transfer", ReadTransfer.this, offset);
        changeState(new ReceivingData()).handleDataChunk(chunk);
        return;
      }

      // To avoid a flood of identical parameters packets, only send one if a retry is detected.
      if (chunk.offset() == lastReceivedOffset) {
        logger.atFiner().log(
            "%s received repeated offset %d: retry detected, resending transfer parameters",
            ReadTransfer.this,
            lastReceivedOffset);
        sendChunk(prepareTransferParameters(/*extend=*/false));
      } else {
        lastReceivedOffset = chunk.offset();
        logger.atFiner().log("%s expecting offset %d, ignoring received offset %d",
            ReadTransfer.this,
            offset,
            chunk.offset());
      }
      setNextChunkTimeout();
    }
  }

  /**
   * Reports final progress and completes the future with all received bytes copied, in order, into
   * a single array.
   */
  @Override
  void setFutureResult() {
    updateProgress(totalDataSize, totalDataSize, totalDataSize);

    ByteBuffer result = ByteBuffer.allocate(totalDataSize);
    dataChunks.forEach(result::put);
    getFuture().set(result.array());
  }

  /**
   * Moves the receive window so it ends {@code maxPendingBytes} past the current offset and builds
   * a parameters chunk describing it.
   *
   * @param extend if true the chunk type is PARAMETERS_CONTINUE; otherwise PARAMETERS_RETRANSMIT
   * @return the built parameters chunk
   */
  private VersionedChunk prepareTransferParameters(boolean extend) {
    windowEndOffset = offset + parameters.maxPendingBytes();

    Chunk.Type type = extend ? Chunk.Type.PARAMETERS_CONTINUE : Chunk.Type.PARAMETERS_RETRANSMIT;
    return setTransferParameters(newChunk(type)).build();
  }

  /**
   * Writes the current offset, window end offset and maximum chunk size into {@code chunk}, plus
   * the minimum chunk delay when one is configured.
   *
   * @param chunk builder to populate
   * @return the same builder, for chaining
   */
  private VersionedChunk.Builder setTransferParameters(VersionedChunk.Builder chunk) {
    // The window end is set exactly once, from the windowEndOffset field, which is the single
    // source of truth maintained by the constructor and prepareTransferParameters().
    chunk.setMaxChunkSizeBytes(parameters.maxChunkSizeBytes())
        .setOffset(offset)
        .setWindowEndOffset(windowEndOffset);
    if (parameters.chunkDelayMicroseconds() > 0) {
      chunk.setMinDelayMicroseconds(parameters.chunkDelayMicroseconds());
    }
    return chunk;
  }
}