// blob: a9c9fe734d04149c5d02be3b8083f3ba364c3755 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import dev.pigweed.pw_rpc.ChannelOutputException;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_rpc.TestClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
public final class ManagerTest {
// Mockito JUnit rule; sets up the @Mock and @Captor fields declared below for each test.
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
// RPC channel on which the Read and Write methods given to the Manager are looked up.
private static final int CHANNEL_ID = 1;
// Fully qualified name of the transfer RPC service under test.
private static final String SERVICE = "pw.transfer.Transfer";
// Three-byte payload used by the single-chunk tests.
private static final ByteString TEST_DATA_SHORT = ByteString.copyFromUtf8("O_o");
// 100-byte payload holding the byte values 0..99; used by the multi-chunk tests.
private static final ByteString TEST_DATA_100B = range(0, 100);
// Read parameters: 50 pending bytes, 30-byte maximum chunk size, and a minimum delay of 0
// (the tests below expect these values in the PARAMETERS_RETRANSMIT chunks they trigger).
private static final TransferParameters TRANSFER_PARAMETERS =
TransferParameters.create(50, 30, 0);
// Retry limit passed to every Manager created by createManager().
private static final int MAX_RETRIES = 2;
// Arbitrary transfer ID shared by most tests.
private static final int ID = 123;
// Value returned by the abort supplier handed to the Manager; toggled by the
// setPlatformTransferEnabled()/setPlatformTransferDisabled() helpers.
private boolean shouldAbortFlag = false;
// Fake RPC client that records outgoing packets and injects server packets.
private TestClient client;
// Manager under test; recreated in setup() and by tests that need short timeouts.
Manager manager;
// Mock progress listener and the captor used to inspect the updates it received.
@Mock private Consumer<TransferProgress> progressCallback;
@Captor private ArgumentCaptor<TransferProgress> progress;
/** Resets the abort flag and builds a fresh test client and Manager before each test. */
@Before
public void setup() {
  setPlatformTransferEnabled();
  client = new TestClient(ImmutableList.of(TransferService.get()));

  // Long enough that no timeout should ever fire during a test.
  final int longTimeoutMillis = 60000;
  manager = createManager(longTimeoutMillis, longTimeoutMillis);
}
/** A read completes with the received bytes once one data chunk with no bytes remaining arrives. */
@Test
public void read_singleChunk_successful() throws Exception {
  ListenableFuture<byte[]> future = manager.read(1);
  assertThat(future.isDone()).isFalse();

  Chunk.Builder onlyChunk = newChunk(Chunk.Type.TRANSFER_DATA, 1)
                                .setOffset(0)
                                .setData(TEST_DATA_SHORT)
                                .setRemainingBytes(0);
  receiveReadChunks(onlyChunk);

  assertThat(future.isDone()).isTrue();
  byte[] received = future.get();
  assertThat(received).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
/**
 * A FAILED_PRECONDITION server error on the Read RPC makes the client resend its opening
 * parameters chunk, after which the transfer can still complete.
 */
@Test
public void read_failedPreconditionError_retries() throws Exception {
  // The same parameters chunk is expected both initially and after the error.
  Chunk openingParameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
                                .setPendingBytes(TRANSFER_PARAMETERS.maxPendingBytes())
                                .setWindowEndOffset(TRANSFER_PARAMETERS.maxPendingBytes())
                                .setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes())
                                .build();

  ListenableFuture<byte[]> future = manager.read(1, TRANSFER_PARAMETERS);
  assertThat(lastChunks()).containsExactly(openingParameters);

  client.receiveServerError(SERVICE, "Read", Status.FAILED_PRECONDITION);
  assertThat(lastChunks()).containsExactly(openingParameters);

  Chunk.Builder onlyChunk = newChunk(Chunk.Type.TRANSFER_DATA, 1)
                                .setOffset(0)
                                .setData(TEST_DATA_SHORT)
                                .setRemainingBytes(0);
  receiveReadChunks(onlyChunk);

  assertThat(future.isDone()).isTrue();
  assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
/**
 * After MAX_RETRIES FAILED_PRECONDITION errors have each triggered a resend, one more error
 * fails the read with INTERNAL and sends nothing further.
 */
@Test
public void read_failedPreconditionErrorMaxRetriesTimes_aborts() {
  Chunk openingParameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
                                .setPendingBytes(TRANSFER_PARAMETERS.maxPendingBytes())
                                .setWindowEndOffset(TRANSFER_PARAMETERS.maxPendingBytes())
                                .setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes())
                                .build();

  ListenableFuture<byte[]> future = manager.read(1, TRANSFER_PARAMETERS);
  for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
    client.receiveServerError(SERVICE, "Read", Status.FAILED_PRECONDITION);
  }

  // One initial send plus one resend per tolerated error.
  List<Chunk> expectedSends = Collections.nCopies(1 + MAX_RETRIES, openingParameters);
  assertThat(lastChunks()).containsExactlyElementsIn(expectedSends);

  // The error that exceeds the retry budget produces no further packets.
  client.receiveServerError(SERVICE, "Read", Status.FAILED_PRECONDITION);
  assertThat(lastChunks()).isEmpty();

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError cause = (TransferError) thrown.getCause();
  assertThat(cause.status()).isEqualTo(Status.INTERNAL);
}
/**
 * A pending read (ID 1) is unaffected by Read-stream chunks carrying other IDs and by any chunk
 * arriving on the Write stream, and completes once its own data chunk arrives.
 */
@Test
public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception {
ListenableFuture<byte[]> future = manager.read(1);
assertThat(future.isDone()).isFalse();
// Read-stream chunks carrying IDs 2, 0, and 3; none of them matches the active read (ID 1).
receiveReadChunks(finalChunk(2, Status.OK),
newChunk(Chunk.Type.TRANSFER_DATA, 0)
.setOffset(0)
.setData(TEST_DATA_100B)
.setRemainingBytes(0),
newChunk(Chunk.Type.TRANSFER_DATA, 3)
.setOffset(0)
.setData(TEST_DATA_100B)
.setRemainingBytes(0));
// Write-stream chunks, including ones with ID 1; there is no write transfer in progress.
receiveWriteChunks(finalChunk(1, Status.OK),
newChunk(Chunk.Type.TRANSFER_DATA, 1)
.setOffset(0)
.setData(TEST_DATA_100B)
.setRemainingBytes(0),
newChunk(Chunk.Type.TRANSFER_DATA, 2)
.setOffset(0)
.setData(TEST_DATA_100B)
.setRemainingBytes(0));
// None of the chunks above may have completed the read.
assertThat(future.isDone()).isFalse();
// The matching chunk on the Read stream finishes the transfer with the short payload.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1)
.setOffset(0)
.setData(TEST_DATA_SHORT)
.setRemainingBytes(0));
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
/** A data chunk with no payload and zero remaining bytes yields an empty result and an OK reply. */
@Test
public void read_empty() throws Exception {
  ListenableFuture<byte[]> future = manager.read(2);
  lastChunks(); // Drop the opening chunk; it is verified by other tests.

  Chunk.Builder emptyFinalData = newChunk(Chunk.Type.TRANSFER_DATA, 2).setRemainingBytes(0);
  receiveReadChunks(emptyFinalData);

  Chunk expectedCompletion = finalChunk(2, Status.OK);
  assertThat(lastChunks()).containsExactly(expectedCompletion);
  assertThat(future.get()).isEqualTo(new byte[0]);
}
/** Starting a read immediately sends a parameters chunk that mirrors the given TransferParameters. */
@Test
public void read_sendsTransferParametersFirst() {
  TransferParameters parameters = TransferParameters.create(3, 2, 1);
  ListenableFuture<byte[]> future = manager.read(99, parameters);

  Chunk expectedOpening = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99)
                              .setPendingBytes(3)
                              .setWindowEndOffset(3)
                              .setMaxChunkSizeBytes(2)
                              .setMinDelayMicroseconds(1)
                              .setOffset(0)
                              .build();
  assertThat(lastChunks()).containsExactly(expectedOpening);

  boolean cancelled = future.cancel(true);
  assertThat(cancelled).isTrue();
}
/**
 * Reads 100 bytes in four data chunks using TRANSFER_PARAMETERS (50 pending bytes, 30-byte
 * chunks) and checks each parameters chunk the client sends in response.
 */
@Test
public void read_severalChunks() throws Exception {
ListenableFuture<byte[]> future = manager.read(ID, TRANSFER_PARAMETERS);
// Opening parameters: window of 50 bytes starting at offset 0.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
// Bytes 0..39 arrive in two chunks.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(0)
.setData(range(0, 20))
.setRemainingBytes(70),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(20).setData(range(20, 40)));
// The client extends the window: offset 40, window end 40 + 50 = 90.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(40)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(90)
.build());
// Bytes 40..69 arrive.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(40).setData(range(40, 70)));
// Window extended again: offset 70, window end 70 + 50 = 120.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(70)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(120)
.build());
// Final bytes 70..99, marked with remaining_bytes = 0.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(70)
.setData(range(70, 100))
.setRemainingBytes(0));
// The client acknowledges completion and the future yields all 100 bytes.
assertThat(lastChunks()).containsExactly(finalChunk(ID, Status.OK));
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
/**
 * Delivers seven data chunks to a read and checks the six progress updates passed to the
 * callback.
 *
 * <p>In the expected values, the third TransferProgress argument equals the chunk's end offset
 * plus its remaining_bytes when the chunk sets remaining_bytes, and UNKNOWN_TRANSFER_SIZE
 * otherwise.
 */
@Test
public void read_progressCallbackIsCalled() {
ListenableFuture<byte[]> future = manager.read(ID, TRANSFER_PARAMETERS, progressCallback);
// The chunk at offset 90 is delivered while bytes 80..89 are still missing; only six updates
// are expected for the seven chunks, and none of them corresponds to that chunk.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)),
newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(50)
.setData(range(50, 60))
.setRemainingBytes(5),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(60).setData(range(60, 70)),
newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(70)
.setData(range(70, 80))
.setRemainingBytes(20),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(90).setData(range(90, 100)),
newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(80)
.setData(range(80, 100))
.setRemainingBytes(0));
verify(progressCallback, times(6)).accept(progress.capture());
assertThat(progress.getAllValues())
.containsExactly(TransferProgress.create(30, 30, TransferProgress.UNKNOWN_TRANSFER_SIZE),
TransferProgress.create(50, 50, TransferProgress.UNKNOWN_TRANSFER_SIZE),
TransferProgress.create(60, 60, 65),
TransferProgress.create(70, 70, TransferProgress.UNKNOWN_TRANSFER_SIZE),
TransferProgress.create(80, 80, 100),
TransferProgress.create(100, 100, 100));
// The last chunk carried remaining_bytes = 0, so the transfer must be finished.
assertThat(future.isDone()).isTrue();
}
/**
 * Checks that the read client answers data chunks at unexpected offsets with a
 * PARAMETERS_RETRANSMIT chunk pointing at the offset it still needs, and that the transfer
 * completes once the missing data is delivered in order.
 */
@Test
public void read_rewindWhenPacketsSkipped() throws Exception {
ListenableFuture<byte[]> future = manager.read(ID, TRANSFER_PARAMETERS);
// Opening parameters: window of 50 bytes starting at offset 0.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
// A chunk arrives at offset 50 although nothing has been received yet.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(30, 50)));
// The client asks again for data starting at offset 0.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
// Bytes 0..49 now arrive in order.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)));
// A single PARAMETERS_CONTINUE (offset 30, window end 80) is expected for these two chunks.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(30)
.setPendingBytes(50)
.setWindowEndOffset(80)
.setMaxChunkSizeBytes(30)
.build());
// Bytes 50..79 are skipped: the next chunk starts at offset 80.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(80)
.setData(range(80, 100))
.setRemainingBytes(0));
// The client rewinds to offset 50 with a fresh 50-byte window (window end 100).
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(50)
.setPendingBytes(50)
.setWindowEndOffset(100)
.setMaxChunkSizeBytes(30)
.build());
// The missing bytes and the final chunk are delivered in order.
receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 80)),
newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(80)
.setData(range(80, 100))
.setRemainingBytes(0));
// One more window extension is sent before the OK completion chunk.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(80)
.setPendingBytes(50)
.setWindowEndOffset(130)
.setMaxChunkSizeBytes(30)
.build(),
finalChunk(ID, Status.OK));
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
/** The same ID can be read repeatedly as long as each read finishes before the next starts. */
@Test
public void read_multipleWithSameId_sequentially_successful() throws Exception {
  final byte[] expected = TEST_DATA_SHORT.toByteArray();
  int remainingRounds = 3;
  while (remainingRounds-- > 0) {
    ListenableFuture<byte[]> future = manager.read(1);

    Chunk.Builder onlyChunk = newChunk(Chunk.Type.TRANSFER_DATA, 1)
                                  .setOffset(0)
                                  .setData(TEST_DATA_SHORT)
                                  .setRemainingBytes(0);
    receiveReadChunks(onlyChunk);

    assertThat(future.get()).isEqualTo(expected);
  }
}
/** Starting a second read with an ID that is still in use fails at once with ALREADY_EXISTS. */
@Test
public void read_multipleWithSameId_atSameTime_failsImmediately() {
  ListenableFuture<byte[]> first = manager.read(123);
  ListenableFuture<byte[]> second = manager.read(123);

  // Only the duplicate is rejected; the original transfer keeps running.
  assertThat(first.isDone()).isFalse();
  assertThat(second.isDone()).isTrue();

  ExecutionException thrown = assertThrows(ExecutionException.class, second::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError cause = (TransferError) thrown.getCause();
  assertThat(cause.status()).isEqualTo(Status.ALREADY_EXISTS);
}
/** A channel output failure on the very first packet fails the read and is kept as the cause. */
@Test
public void read_sendErrorOnFirstPacket_failsImmediately() {
  ChannelOutputException sendFailure = new ChannelOutputException("blah");
  client.setChannelOutputException(sendFailure);

  ListenableFuture<byte[]> future = manager.read(123);

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  // The TransferError must wrap the exact exception raised by the channel.
  assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(sendFailure);
}
/** A channel output failure after the transfer has started aborts the read with that cause. */
@Test
public void read_sendErrorOnLaterPacket_aborts() {
  ListenableFuture<byte[]> future = manager.read(ID, TRANSFER_PARAMETERS);

  Chunk.Builder firstData =
      newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 20));
  receiveReadChunks(firstData);

  // From here on, every outgoing packet fails.
  ChannelOutputException sendFailure = new ChannelOutputException("blah");
  client.setChannelOutputException(sendFailure);

  Chunk.Builder secondData =
      newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(20).setData(range(20, 50));
  receiveReadChunks(secondData);

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(sendFailure);
}
/** Cancelling the read's future makes the client send a CANCELLED completion chunk. */
@Test
public void read_cancelFuture_abortsTransfer() {
  ListenableFuture<byte[]> future = manager.read(ID, TRANSFER_PARAMETERS);

  boolean cancelled = future.cancel(true);
  assertThat(cancelled).isTrue();

  // A data chunk delivered after the cancellation.
  Chunk.Builder lateData =
      newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50));
  receiveReadChunks(lateData);

  Chunk expectedAbort = finalChunk(ID, Status.CANCELLED);
  assertThat(lastChunks()).contains(expectedAbort);
}
/** A completion chunk carrying an error status fails the read with that same status. */
@Test
public void read_protocolError_aborts() {
  ListenableFuture<byte[]> future = manager.read(ID);

  Chunk serverError = finalChunk(ID, Status.ALREADY_EXISTS);
  receiveReadChunks(serverError);

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError cause = (TransferError) thrown.getCause();
  assertThat(cause.status()).isEqualTo(Status.ALREADY_EXISTS);
}
/**
 * A server error on the Read RPC with a status other than FAILED_PRECONDITION (here NOT_FOUND)
 * fails the read with INTERNAL.
 */
@Test
public void read_rpcError_aborts() {
  ListenableFuture<byte[]> future = manager.read(2);
  client.receiveServerError(SERVICE, "Read", Status.NOT_FOUND);

  ExecutionException exception = assertThrows(ExecutionException.class, future::get);
  // Check the cause type before casting so a wrong cause fails as a readable assertion
  // rather than a ClassCastException.
  assertThat(exception).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL);
}
/**
 * With no server response at all, a read resends its opening parameters once per retry and
 * then aborts with DEADLINE_EXCEEDED.
 */
@Test
public void read_timeout() {
  manager = createManager(1, 1); // Create a manager with a very short timeout.
  ListenableFuture<byte[]> future = manager.read(ID, TRANSFER_PARAMETERS);

  // Call future.get() without sending any server-side packets.
  ExecutionException exception = assertThrows(ExecutionException.class, future::get);
  // Check the cause type before casting so a wrong cause fails as a readable assertion
  // rather than a ClassCastException.
  assertThat(exception).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);

  // read should have retried sending the transfer parameters 2 times (MAX_RETRIES), for a total
  // of 3 identical chunks, followed by the completion chunk that reports the timeout.
  Chunk parameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
                         .setPendingBytes(50)
                         .setWindowEndOffset(50)
                         .setMaxChunkSizeBytes(30)
                         .setOffset(0)
                         .build();
  assertThat(lastChunks())
      .containsExactly(
          parameters, parameters, parameters, finalChunk(ID, Status.DEADLINE_EXCEEDED));
}
/** A write completes once the server sends parameters followed by an OK completion chunk. */
@Test
public void write_singleChunk() throws Exception {
  ListenableFuture<Void> future = manager.write(2, TEST_DATA_SHORT.toByteArray());
  assertThat(future.isDone()).isFalse();

  Chunk.Builder serverParameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
                                       .setOffset(0)
                                       .setPendingBytes(1024)
                                       .setMaxChunkSizeBytes(128);
  receiveWriteChunks(serverParameters, finalChunk(2, Status.OK));

  assertThat(future.isDone()).isTrue();
  // get() would throw if the transfer had failed.
  assertThat(future.get()).isNull();
}
/**
 * If the abort flag is raised while a write is in progress, the next server chunks cause the
 * write to fail with ABORTED.
 */
@Test
public void write_platformTransferDisabled_aborted() {
  ListenableFuture<Void> future = manager.write(2, TEST_DATA_SHORT.toByteArray());
  assertThat(future.isDone()).isFalse();

  // Flip the flag only after the transfer has started.
  setPlatformTransferDisabled();

  Chunk.Builder serverParameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
                                       .setOffset(0)
                                       .setPendingBytes(1024)
                                       .setMaxChunkSizeBytes(128);
  receiveWriteChunks(serverParameters, finalChunk(2, Status.OK));

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError cause = (TransferError) thrown.getCause();
  assertThat(cause.status()).isEqualTo(Status.ABORTED);
}
/**
 * A FAILED_PRECONDITION server error on the Write RPC makes the client resend its initial
 * chunk, after which the transfer can still complete.
 */
@Test
public void write_failedPreconditionError_retries() throws Exception {
  Chunk openingChunk = initialWriteChunk(2, 2, TEST_DATA_SHORT.size());

  ListenableFuture<Void> future = manager.write(2, TEST_DATA_SHORT.toByteArray());
  assertThat(lastChunks()).containsExactly(openingChunk);

  client.receiveServerError(SERVICE, "Write", Status.FAILED_PRECONDITION);
  // The identical initial chunk is sent a second time.
  assertThat(lastChunks()).containsExactly(openingChunk);

  Chunk.Builder serverParameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
                                       .setOffset(0)
                                       .setPendingBytes(1024)
                                       .setMaxChunkSizeBytes(128);
  receiveWriteChunks(serverParameters, finalChunk(2, Status.OK));

  // get() would throw if the transfer had failed.
  assertThat(future.get()).isNull();
}
/**
 * After MAX_RETRIES FAILED_PRECONDITION errors have each triggered a resend, one more error
 * fails the write with INTERNAL and sends nothing further.
 */
@Test
public void write_failedPreconditionErrorMaxRetriesTimes_aborts() {
  ListenableFuture<Void> future = manager.write(2, TEST_DATA_SHORT.toByteArray());
  for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
    client.receiveServerError(SERVICE, "Write", Status.FAILED_PRECONDITION);
  }

  // One initial send plus one resend per tolerated error.
  Chunk openingChunk = initialWriteChunk(2, 2, TEST_DATA_SHORT.size());
  List<Chunk> expectedSends = Collections.nCopies(1 + MAX_RETRIES, openingChunk);
  assertThat(lastChunks()).containsExactlyElementsIn(expectedSends);

  // The error that exceeds the retry budget produces no further packets.
  client.receiveServerError(SERVICE, "Write", Status.FAILED_PRECONDITION);
  assertThat(lastChunks()).isEmpty();

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError cause = (TransferError) thrown.getCause();
  assertThat(cause.status()).isEqualTo(Status.INTERNAL);
}
/** Writing zero bytes sends only the initial chunk and completes on the server's OK. */
@Test
public void write_empty() throws Exception {
  ListenableFuture<Void> future = manager.write(2, new byte[0]);
  assertThat(future.isDone()).isFalse();

  receiveWriteChunks(finalChunk(2, Status.OK));

  Chunk expectedOpening = initialWriteChunk(2, 2, 0);
  assertThat(lastChunks()).containsExactly(expectedOpening);
  // get() would throw if the transfer had failed.
  assertThat(future.get()).isNull();
}
/**
 * Writes 100 bytes across three rounds of server-provided parameters and checks that the data
 * chunks sent in each round respect the requested offset, pending bytes, and max chunk size.
 */
@Test
public void write_severalChunks() throws Exception {
ListenableFuture<Void> future = manager.write(ID, TEST_DATA_100B.toByteArray());
assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()));
// Round 1: 50 bytes from offset 0 in chunks of at most 30 bytes.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
.containsExactly(
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)).build(),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)).build());
// Round 2: 40 bytes from offset 50 in chunks of at most 25 bytes.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(50)
.setPendingBytes(40)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
.containsExactly(
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 75)).build(),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(75).setData(range(75, 90)).build());
// Round 3: only 10 bytes are left, so one final chunk with remaining_bytes = 0 is sent.
receiveWriteChunks(
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(90).setPendingBytes(50));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
.build());
// Sending the last data chunk does not finish the write; the server's completion chunk does.
assertThat(future.isDone()).isFalse();
receiveWriteChunks(finalChunk(ID, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
/**
 * Checks that PARAMETERS_CONTINUE chunks extend the send window without making the writer
 * resend data it has already sent.
 */
@Test
public void write_parametersContinue() throws Exception {
ListenableFuture<Void> future = manager.write(ID, TEST_DATA_100B.toByteArray());
assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()));
// Initial window: bytes 0..49 in chunks of at most 30 bytes.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
.containsExactly(
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)).build(),
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)).build());
// The server reports offset 30 and moves the window end to 80.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(30)
.setPendingBytes(50)
.setWindowEndOffset(80));
// Transfer doesn't roll back to offset 30 but instead continues sending up to 80.
assertThat(lastChunks())
.containsExactly(
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 80)).build());
// The window now reaches past the end of the data, so the final chunk is sent.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(80)
.setPendingBytes(50)
.setWindowEndOffset(130));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(80)
.setData(range(80, 100))
.setRemainingBytes(0)
.build());
// The write only finishes when the server's completion chunk arrives.
assertThat(future.isDone()).isFalse();
receiveWriteChunks(finalChunk(ID, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
/**
 * Checks that a PARAMETERS_CONTINUE chunk whose window end lies before the writer's current
 * offset causes no data to be sent, while a later one that extends the window does.
 */
@Test
public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception {
ListenableFuture<Void> future = manager.write(ID, TEST_DATA_100B.toByteArray());
assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()));
// The server allows 90 bytes in a single chunk, so bytes 0..89 go out at once.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(90)
.setWindowEndOffset(90)
.setMaxChunkSizeBytes(90)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
.containsExactly(
newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 90)).build());
receiveWriteChunks(
// This stale packet with a window end before the offset should be ignored.
newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(25)
.setPendingBytes(25)
.setWindowEndOffset(50),
// Start from an arbitrary offset before the current, but extend the window to the end.
newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID).setOffset(80).setWindowEndOffset(100));
// Only the remaining bytes 90..99 are sent; nothing is resent for either packet.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID)
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
.build());
receiveWriteChunks(finalChunk(ID, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
/**
 * Runs a 100-byte write through two parameter rounds plus completion and checks the six
 * progress updates passed to the callback.
 *
 * <p>NOTE(review): the TransferProgress.create arguments look like (bytes sent, bytes confirmed
 * by the server, total size) -- confirm against TransferProgress.
 */
@Test
public void write_progressCallbackIsCalled() {
ListenableFuture<Void> future =
manager.write(ID, TEST_DATA_100B.toByteArray(), progressCallback);
// Round 1 requests 90 bytes from offset 0 in 30-byte chunks; round 2 requests 50 bytes from
// offset 50; the completion chunk then ends the transfer.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(90)
.setMaxChunkSizeBytes(30),
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(50).setPendingBytes(50),
finalChunk(ID, Status.OK));
verify(progressCallback, times(6)).accept(progress.capture());
// Expected: three updates for round 1 (30, 60, 90), two for round 2 after the rewind to
// offset 50 (80, 100), and a last one where every value is 100.
assertThat(progress.getAllValues())
.containsExactly(TransferProgress.create(30, 0, 100),
TransferProgress.create(60, 0, 100),
TransferProgress.create(90, 0, 100),
TransferProgress.create(80, 50, 100),
TransferProgress.create(100, 50, 100),
TransferProgress.create(100, 100, 100));
assertThat(future.isDone()).isTrue();
}
/**
 * When the server requests data starting exactly at the end of the payload, the writer replies
 * with an empty data chunk marked remaining_bytes = 0 and keeps waiting for completion.
 */
@Test
public void write_asksForFinalOffset_sendsFinalPacket() {
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_100B.toByteArray());

  Chunk.Builder parametersAtEnd = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
                                      .setOffset(100)
                                      .setPendingBytes(40)
                                      .setMaxChunkSizeBytes(25);
  receiveWriteChunks(parametersAtEnd);

  Chunk expectedOpening = initialWriteChunk(ID, ID, TEST_DATA_100B.size());
  Chunk expectedEmptyFinalData =
      newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(100).setRemainingBytes(0).build();
  assertThat(lastChunks()).containsExactly(expectedOpening, expectedEmptyFinalData);

  assertThat(future.isDone()).isFalse();
}
/** The same ID can be written repeatedly as long as each write finishes before the next starts. */
@Test
public void write_multipleWithSameId_sequentially_successful() throws Exception {
  int remainingRounds = 3;
  while (remainingRounds-- > 0) {
    ListenableFuture<Void> future = manager.write(ID, TEST_DATA_SHORT.toByteArray());

    Chunk.Builder serverParameters =
        newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0).setPendingBytes(50);
    receiveWriteChunks(serverParameters, finalChunk(ID, Status.OK));

    // Throws if this round's transfer failed.
    future.get();
  }
}
/** Starting a second write with an ID that is still in use fails at once with ALREADY_EXISTS. */
@Test
public void write_multipleWithSameId_atSameTime_failsImmediately() {
  byte[] payload = TEST_DATA_SHORT.toByteArray();
  ListenableFuture<Void> first = manager.write(123, payload);
  ListenableFuture<Void> second = manager.write(123, TEST_DATA_SHORT.toByteArray());

  // Only the duplicate is rejected; the original transfer keeps running.
  assertThat(first.isDone()).isFalse();
  assertThat(second.isDone()).isTrue();

  ExecutionException thrown = assertThrows(ExecutionException.class, second::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError cause = (TransferError) thrown.getCause();
  assertThat(cause.status()).isEqualTo(Status.ALREADY_EXISTS);
}
/** A channel output failure on the very first packet fails the write and is kept as the cause. */
@Test
public void write_sendErrorOnFirstPacket_failsImmediately() {
  ChannelOutputException sendFailure = new ChannelOutputException("blah");
  client.setChannelOutputException(sendFailure);

  ListenableFuture<Void> future = manager.write(123, TEST_DATA_SHORT.toByteArray());

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  // The TransferError must wrap the exact exception raised by the channel.
  assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(sendFailure);
}
/**
 * A parameters chunk that grants no pending bytes (only the offset is set) fails the write
 * with INVALID_ARGUMENT.
 */
@Test
public void write_serviceRequestsNoData_aborts() {
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_SHORT.toByteArray());
  receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0));

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  // Check the cause type before casting so a wrong cause fails as a readable assertion
  // rather than a ClassCastException.
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
}
/**
 * A parameters chunk requesting an offset past the end of the 100-byte payload makes the
 * writer send an OUT_OF_RANGE completion chunk and fail with the same status.
 */
@Test
public void write_invalidOffset_aborts() {
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_100B.toByteArray());
  // Offset 101 is one past the largest valid offset (100, the end of the data).
  receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
                         .setOffset(101)
                         .setPendingBytes(40)
                         .setMaxChunkSizeBytes(25));

  assertThat(lastChunks())
      .containsExactly(
          initialWriteChunk(ID, ID, TEST_DATA_100B.size()), finalChunk(ID, Status.OUT_OF_RANGE));

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  // Check the cause type before casting so a wrong cause fails as a readable assertion
  // rather than a ClassCastException.
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE);
}
/** A channel output failure after the transfer has started aborts the write with that cause. */
@Test
public void write_sendErrorOnLaterPacket_aborts() {
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_SHORT.toByteArray());

  // The initial chunk went out fine; every later outgoing packet fails.
  ChannelOutputException sendFailure = new ChannelOutputException("blah");
  client.setChannelOutputException(sendFailure);

  Chunk.Builder serverParameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
                                       .setOffset(0)
                                       .setPendingBytes(50)
                                       .setMaxChunkSizeBytes(30);
  receiveWriteChunks(serverParameters);

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(sendFailure);
}
/** Cancelling the write's future makes the client send a CANCELLED completion chunk. */
@Test
public void write_cancelFuture_abortsTransfer() {
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_100B.toByteArray());

  boolean cancelled = future.cancel(true);
  assertThat(cancelled).isTrue();

  // Server parameters delivered after the cancellation.
  Chunk.Builder lateParameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
                                     .setOffset(0)
                                     .setPendingBytes(50)
                                     .setMaxChunkSizeBytes(50);
  receiveWriteChunks(lateParameters);

  Chunk expectedAbort = finalChunk(ID, Status.CANCELLED);
  assertThat(lastChunks()).contains(expectedAbort);
}
/** A completion chunk carrying an error status fails the write with that same status. */
@Test
public void write_protocolError_aborts() {
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_SHORT.toByteArray());

  Chunk serverError = finalChunk(ID, Status.NOT_FOUND);
  receiveWriteChunks(serverError);

  ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError cause = (TransferError) thrown.getCause();
  assertThat(cause.status()).isEqualTo(Status.NOT_FOUND);
}
/**
 * A server error on the Write RPC with a status other than FAILED_PRECONDITION (here NOT_FOUND)
 * fails the write with INTERNAL.
 */
@Test
public void write_rpcError_aborts() {
  ListenableFuture<Void> future = manager.write(2, TEST_DATA_SHORT.toByteArray());
  client.receiveServerError(SERVICE, "Write", Status.NOT_FOUND);

  ExecutionException exception = assertThrows(ExecutionException.class, future::get);
  // Check the cause type before casting so a wrong cause fails as a readable assertion
  // rather than a ClassCastException.
  assertThat(exception).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL);
}
/**
 * With no server response at all, a write resends its initial chunk once per retry and then
 * aborts with DEADLINE_EXCEEDED.
 */
@Test
public void write_timeoutAfterInitialChunk() {
  manager = createManager(1, 1); // Create a manager with a very short timeout.
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_SHORT.toByteArray());

  // Call future.get() without sending any server-side packets.
  ExecutionException exception = assertThrows(ExecutionException.class, future::get);
  // Check the cause type before casting so a wrong cause fails as a readable assertion
  // rather than a ClassCastException.
  assertThat(exception).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);

  // Client should have resent the last chunk (the initial chunk in this case) for each timeout.
  Chunk initial = initialWriteChunk(ID, ID, TEST_DATA_SHORT.size());
  assertThat(lastChunks())
      .containsExactly(initial, // initial
          initial, // retry 1
          initial, // retry 2
          finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort
}
/**
 * If the server goes silent after its first parameters chunk, the writer resends its last data
 * chunk once per retry and then aborts with DEADLINE_EXCEEDED.
 */
@Test
public void write_timeoutAfterIntermediateChunk() {
  manager = createManager(1, 1); // Create a manager with a very short timeout.

  // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters.
  enqueueWriteChunks(2,
      newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
          .setOffset(0)
          .setPendingBytes(90)
          .setMaxChunkSizeBytes(30));
  ListenableFuture<Void> future = manager.write(ID, TEST_DATA_SHORT.toByteArray());

  ExecutionException exception = assertThrows(ExecutionException.class, future::get);
  // Check the cause type before casting so a wrong cause fails as a readable assertion
  // rather than a ClassCastException.
  assertThat(exception).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);

  // The whole short payload fits in one data chunk, which is the chunk that gets resent.
  Chunk data = newChunk(Chunk.Type.TRANSFER_DATA, ID)
                   .setOffset(0)
                   .setData(TEST_DATA_SHORT)
                   .setRemainingBytes(0)
                   .build();
  assertThat(lastChunks())
      .containsExactly(initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // initial
          data, // data chunk
          data, // retry 1
          data, // retry 2
          finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort
}
/**
 * Builds a ByteString containing the consecutive byte values {@code startInclusive},
 * {@code startInclusive + 1}, ..., {@code endExclusive - 1}.
 *
 * <p>Both bounds must be below {@link Byte#MAX_VALUE}, the start must not be negative, and the
 * start must not exceed the end; a violation fails with a Truth assertion error. An empty range
 * ({@code startInclusive == endExclusive}) yields an empty ByteString.
 *
 * @param startInclusive value of the first byte
 * @param endExclusive one past the value of the last byte
 * @return the bytes of the requested range
 */
private static ByteString range(int startInclusive, int endExclusive) {
  // Reject negative or inverted ranges up front so misuse fails with a clear assertion message
  // rather than a NegativeArraySizeException or an out-of-bounds index.
  assertThat(startInclusive).isAtLeast(0);
  assertThat(startInclusive).isAtMost(endExclusive);
  assertThat(startInclusive).isLessThan((int) Byte.MAX_VALUE);
  assertThat(endExclusive).isLessThan((int) Byte.MAX_VALUE);

  byte[] bytes = new byte[endExclusive - startInclusive];
  // An int index cannot wrap around the way a byte counter would.
  for (int i = 0; i < bytes.length; ++i) {
    bytes[i] = (byte) (startInclusive + i);
  }
  return ByteString.copyFrom(bytes);
}
/**
 * Starts a chunk builder with the given type and session ID already set.
 *
 * <p>The second parameter used to be called {@code resourceId} although it is stored via
 * {@code setSessionId}; it is renamed to match what it sets. Callers pass it positionally, so
 * they are unaffected.
 *
 * @param type chunk type to set on the builder
 * @param sessionId value stored in the chunk's session ID field
 * @return a builder that callers can extend with further fields
 */
private static Chunk.Builder newChunk(Chunk.Type type, int sessionId) {
  return Chunk.newBuilder().setType(type).setSessionId(sessionId);
}
/**
 * Builds the TRANSFER_START chunk a write is expected to send first.
 *
 * @param sessionId session ID of the chunk
 * @param resourceId resource ID of the chunk
 * @param size value for the chunk's remaining_bytes field
 */
private static Chunk initialWriteChunk(int sessionId, int resourceId, int size) {
  Chunk.Builder start = newChunk(Chunk.Type.TRANSFER_START, sessionId);
  start.setResourceId(resourceId);
  start.setRemainingBytes(size);
  return start.build();
}
/**
 * Builds a TRANSFER_COMPLETION chunk for the given session carrying the given status code.
 */
private static Chunk finalChunk(int sessionId, Status status) {
  Chunk.Builder completion = newChunk(Chunk.Type.TRANSFER_COMPLETION, sessionId);
  completion.setStatus(status.code());
  return completion.build();
}
/** Delivers the given chunks to the client as server stream packets on the Read method. */
private void receiveReadChunks(ChunkOrBuilder... chunks) {
  final String method = "Read";
  client.receiveServerStream(SERVICE, method, chunks);
}
/** Delivers the given chunks to the client as server stream packets on the Write method. */
private void receiveWriteChunks(ChunkOrBuilder... chunks) {
  final String method = "Write";
  client.receiveServerStream(SERVICE, method, chunks);
}
/**
 * Queues chunks on the Write method to be received later, once the client has sent
 * {@code afterPackets} outgoing packets.
 */
private void enqueueWriteChunks(int afterPackets, Chunk.Builder... chunks) {
  final String method = "Write";
  client.enqueueServerStream(SERVICE, method, afterPackets, chunks);
}
/**
 * Returns the chunks the client has sent since the previous call. Tests rely on each call
 * reporting only chunks not yet returned (see TestClient#lastClientStreams).
 */
private List<Chunk> lastChunks() {
  List<Chunk> sentChunks = client.lastClientStreams(Chunk.class);
  return sentChunks;
}
/**
 * Creates a Manager wired to the test client's Read and Write methods on CHANNEL_ID, with
 * MAX_RETRIES retries and an abort supplier backed by {@code shouldAbortFlag}.
 *
 * @param transferTimeoutMillis timeout passed to the Manager, in milliseconds
 * @param initialTransferTimeoutMillis initial timeout passed to the Manager, in milliseconds
 */
private Manager createManager(int transferTimeoutMillis, int initialTransferTimeoutMillis) {
  final String readMethodName = SERVICE + "/Read";
  final String writeMethodName = SERVICE + "/Write";
  return new Manager(client.client().method(CHANNEL_ID, readMethodName),
      client.client().method(CHANNEL_ID, writeMethodName),
      Runnable::run, // Executes each submitted job right away on the calling thread.
      transferTimeoutMillis,
      initialTransferTimeoutMillis,
      MAX_RETRIES,
      () -> shouldAbortFlag);
}
/** Makes the abort supplier given to the Manager report {@code true}. */
private void setPlatformTransferDisabled() {
  shouldAbortFlag = true;
}
/** Makes the abort supplier given to the Manager report {@code false}. */
private void setPlatformTransferEnabled() {
  shouldAbortFlag = false;
}
}