blob: 40feed24b30b42e8a69d94e6478b7399a87c4fca [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import com.google.protobuf.ByteString;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import java.util.Timer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
class WriteTransfer extends Transfer<Void> {
private static final Logger logger = Logger.forClass(WriteTransfer.class);
// Short chunk delays often turn into much longer delays. Ignore delays <10ms to avoid impacting
// performance.
private static final int MIN_CHUNK_DELAY_TO_SLEEP_MICROS = 10000;
private final AtomicInteger receivedOffset = new AtomicInteger();
private int maxChunkSizeBytes = 0;
private int minChunkDelayMicros = 0;
private int sentOffset;
private long totalDroppedBytes;
private Chunk lastChunk;
private final byte[] data;
protected WriteTransfer(int id,
ChunkSender sendChunk,
Consumer<Integer> endTransfer,
Timer timer,
int timeoutMillis,
int initialTimeoutMillis,
int maxRetries,
byte[] data,
Consumer<TransferProgress> progressCallback,
BooleanSupplier shouldAbortCallback) {
super(id,
sendChunk,
endTransfer,
timer,
timeoutMillis,
initialTimeoutMillis,
maxRetries,
progressCallback,
shouldAbortCallback);
this.data = data;
this.lastChunk = getInitialChunk();
}
@Override
synchronized Chunk getInitialChunk() {
return newChunk(Chunk.Type.TRANSFER_START)
.setResourceId(getId())
.setRemainingBytes(data.length)
.build();
}
@Override
boolean handleDataChunk(Chunk chunk) {
// Track the window count and abandon this window if another transfer parameters update arrives
// while handling the current one. This outer method is not synchronized to enable the more
// recent thread to coordinate without blocking via the atomic int. Then any other concurrently
// executing handleChunk() instances can bail once they finish with their current chunk.
logger.atFiner().log("Transfer %d received new chunk (type=%s, offset=%d, windowEndOffset=%d)",
getId(),
chunk.getType(),
chunk.getOffset(),
chunk.getWindowEndOffset());
// The chunk's offset is always the largest offset received by the device.
// Record the largest offset seen to avoid backtracking if a stale retransmit packet arrives.
receivedOffset.updateAndGet(currentValue -> max(currentValue, (int) chunk.getOffset()));
if (windowIsStale(chunk)) {
return false;
}
return doHandleDataChunk(chunk);
}
private synchronized boolean doHandleDataChunk(Chunk chunk) {
if (chunk.getOffset() > data.length) {
sendFinalChunk(Status.OUT_OF_RANGE);
return true;
}
// Check if a newer chunk has arrived while this thread waited to acquire the lock.
if (windowIsStale(chunk)) {
return false;
}
int windowEndOffset = getWindowEndOffset(chunk, data.length);
if (isRetransmit(chunk)) {
long droppedBytes = sentOffset - chunk.getOffset();
if (droppedBytes > 0) {
totalDroppedBytes += droppedBytes;
logger.atFine().log("Transfer %d retransmitting %d B (%d retransmitted of %d sent)",
getId(),
droppedBytes,
totalDroppedBytes,
sentOffset);
}
sentOffset = (int) chunk.getOffset();
} else if (windowEndOffset <= sentOffset) {
logger.atFiner().log("Transfer %d: ignoring old rolling window packet", getId());
return true; // Received an old rolling window packet, ignore it.
}
// Update transfer parameters if they're set.
if (chunk.hasMaxChunkSizeBytes()) {
maxChunkSizeBytes = chunk.getMaxChunkSizeBytes();
}
if (chunk.hasMinDelayMicroseconds()) {
minChunkDelayMicros = chunk.getMinDelayMicroseconds();
}
if (maxChunkSizeBytes == 0) {
if (windowEndOffset == sentOffset) {
logger.atWarning().log("Server requested 0 bytes in write transfer %d; aborting", getId());
sendFinalChunk(Status.INVALID_ARGUMENT);
return true;
}
// Default to sending the entire window if the max chunk size is not specified (or is 0).
maxChunkSizeBytes = windowEndOffset - sentOffset;
}
Chunk chunkToSend;
do {
// Pause for the minimum delay, if requested by the server.
if (minChunkDelayMicros > MIN_CHUNK_DELAY_TO_SLEEP_MICROS) {
try {
MICROSECONDS.sleep(minChunkDelayMicros);
} catch (InterruptedException e) {
// Ignore the exception. It shouldn't matter if this is interrupted.
}
}
if (handleCancellation()) {
return true;
}
if (windowIsStale(chunk)) {
// The false return prevents the surrounding transfer from scheduling the timeout, which
// avoids unnecessary timeout scheduling. The interrupting thread/chunk will schedule a
// timeout instead.
return false;
}
ByteString chunkData = ByteString.copyFrom(
data, sentOffset, min(windowEndOffset - sentOffset, maxChunkSizeBytes));
logger.atFiner().log("Transfer %d: sending bytes %d-%d (%d B chunk, max size %d B)",
getId(),
sentOffset,
sentOffset + chunkData.size() - 1,
chunkData.size(),
maxChunkSizeBytes);
chunkToSend = buildDataChunk(chunkData);
// If there's a timeout, resending this will trigger a transfer parameters update.
lastChunk = chunkToSend;
if (!sendChunk(chunkToSend)) {
return true;
}
sentOffset += chunkData.size();
updateProgress(sentOffset, chunk.getOffset(), data.length);
} while (sentOffset < windowEndOffset);
return true;
}
private boolean windowIsStale(Chunk chunk) {
int newReceivedOffset = receivedOffset.get();
if (chunk.getOffset() < newReceivedOffset) {
logger.atFine().log(
"Transfer %d: abandoning stale write window (old offset %d, new offset %d)",
getId(),
chunk.getOffset(),
newReceivedOffset);
return true;
}
return false;
}
@Override
void retryAfterTimeout() {
// The service should resend transfer parameters if there was a timeout. In case the service
// doesn't support timeouts and to avoid unnecessary waits, resend the last chunk. If there
// were drops, this will trigger a transfer parameters update.
sendChunk(lastChunk);
}
@Override
void setFutureResult() {
updateProgress(data.length, data.length, data.length);
getFuture().set(null);
}
private static boolean isRetransmit(Chunk chunk) {
// Retransmit is the default behavior for older versions of the transfer protocol, which don't
// have a type field.
return !chunk.hasType()
|| (chunk.getType().equals(Chunk.Type.PARAMETERS_RETRANSMIT)
|| chunk.getType().equals(Chunk.Type.TRANSFER_START));
}
private static int getWindowEndOffset(Chunk chunk, int dataLength) {
if (isRetransmit(chunk)) {
// A retransmit chunk may use an older version of the transfer protocol, in which the
// window_end_offset field is not guaranteed to exist.
return min((int) chunk.getOffset() + chunk.getPendingBytes(), dataLength);
}
return min(chunk.getWindowEndOffset(), dataLength);
}
private Chunk buildDataChunk(ByteString chunkData) {
Chunk.Builder chunk =
newChunk(Chunk.Type.TRANSFER_DATA).setOffset(sentOffset).setData(chunkData);
// If this is the last data chunk, setRemainingBytes to 0.
if (sentOffset + chunkData.size() == data.length) {
logger.atFiner().log("Transfer %d sending final chunk with %d B", getId(), chunkData.size());
chunk.setRemainingBytes(0);
}
return chunk.build();
}
}