blob: 208dbfb3fab5b677e4eb91dfe13d513c11dcdcb8 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
import static java.lang.Math.max;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
class ReadTransfer extends Transfer<byte[]> {
private static final Logger logger = Logger.forClass(ReadTransfer.class);
// The fractional position within a window at which a receive transfer should
// extend its window size to minimize the amount of time the transmitter
// spends blocked.
//
// For example, a divisor of 2 will extend the window when half of the
// requested data has been received, a divisor of three will extend at a third
// of the window, and so on.
private static final int EXTEND_WINDOW_DIVISOR = 2;
// To minimize copies, store the ByteBuffers directly from the chunk protos in a list.
private final List<ByteBuffer> dataChunks = new ArrayList<>();
private int totalDataSize = 0;
private final TransferParameters parameters;
private long remainingTransferSize = UNKNOWN_TRANSFER_SIZE;
private int offset = 0;
private int pendingBytes;
private int windowEndOffset;
ReadTransfer(int id,
ChunkSender sendChunk,
Consumer<Integer> endTransfer,
Timer timer,
int timeoutMillis,
int initialTimeoutMillis,
int maxRetries,
TransferParameters transferParameters,
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
super(id,
sendChunk,
endTransfer,
timer,
timeoutMillis,
initialTimeoutMillis,
maxRetries,
progressCallback,
shouldAbortCallback);
this.parameters = transferParameters;
this.pendingBytes = parameters.maxPendingBytes();
}
@Override
Chunk getInitialChunk() {
return prepareTransferParameters(/*extend=*/false).build();
}
@Override
void retryAfterTimeout() {
sendChunk(prepareTransferParameters(/*extend=*/false));
}
@Override
synchronized boolean handleDataChunk(Chunk chunk) {
if (chunk.getOffset() != offset) {
logger.atFine().log(
"Transfer %d expected offset %d, received %d; resending transfer parameters",
getId(),
offset,
chunk.getOffset());
// For now, only in-order transfers are supported. If data is received out of order,
// discard this data and retransmit from the last received offset.
sendChunk(prepareTransferParameters(/*extend=*/false));
return true;
}
// Add the underlying array(s) to a list to avoid making copies of the data.
dataChunks.addAll(chunk.getData().asReadOnlyByteBufferList());
totalDataSize += chunk.getData().size();
offset += chunk.getData().size();
pendingBytes -= chunk.getData().size();
if (chunk.hasRemainingBytes()) {
if (chunk.getRemainingBytes() == 0) {
sendFinalChunk(Status.OK);
return true;
}
remainingTransferSize = chunk.getRemainingBytes();
} else if (remainingTransferSize != UNKNOWN_TRANSFER_SIZE) {
// If the remaining size was not specified, update based on the most recent estimate, if any.
remainingTransferSize = max(remainingTransferSize - chunk.getData().size(), 0);
}
if (remainingTransferSize == UNKNOWN_TRANSFER_SIZE || remainingTransferSize == 0) {
updateProgress(offset, offset, UNKNOWN_TRANSFER_SIZE);
} else {
updateProgress(offset, offset, offset + remainingTransferSize);
}
int remainingWindowSize = windowEndOffset - offset;
boolean extendWindow =
remainingWindowSize <= parameters.maxPendingBytes() / EXTEND_WINDOW_DIVISOR;
if (pendingBytes == 0) {
logger.atFiner().log(
"Transfer %d received all pending bytes; sending transfer parameters update", getId());
sendChunk(prepareTransferParameters(/*extend=*/false));
} else if (extendWindow) {
sendChunk(prepareTransferParameters(/*extend=*/true));
}
return true;
}
@Override
synchronized void setFutureResult() {
updateProgress(totalDataSize, totalDataSize, totalDataSize);
ByteBuffer result = ByteBuffer.allocate(totalDataSize);
dataChunks.forEach(result::put);
getFuture().set(result.array());
}
private synchronized Chunk.Builder prepareTransferParameters(boolean extend) {
// TODO(frolv): Remove the pendingBytes field.
pendingBytes = parameters.maxPendingBytes();
windowEndOffset = offset + parameters.maxPendingBytes();
Chunk.Type type = extend ? Chunk.Type.PARAMETERS_CONTINUE : Chunk.Type.PARAMETERS_RETRANSMIT;
Chunk.Builder chunk = newChunk(type)
.setPendingBytes(pendingBytes)
.setMaxChunkSizeBytes(parameters.maxChunkSizeBytes())
.setOffset(offset)
.setWindowEndOffset(windowEndOffset);
if (parameters.chunkDelayMicroseconds() > 0) {
chunk.setMinDelayMicroseconds(parameters.chunkDelayMicroseconds());
}
return chunk;
}
}