blob: 5ea4dda31115909d2310ee0105d7b375b5b7df7e [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import dev.pigweed.pw_rpc.ChannelOutputException;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_rpc.TestClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
 * Tests for {@link TransferClient} read and write transfers. A pw_rpc {@link TestClient} is used
 * to inject server packets and to inspect the chunks the transfer client sends.
 */
public final class TransferClientTest {
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
// Channel used when looking up the Read and Write RPC methods.
private static final int CHANNEL_ID = 1;
// Service name used to address the "Read" and "Write" methods on the test RPC client.
private static final String SERVICE = "pw.transfer.Transfer";
// Three-byte payload for tests that need only a single chunk.
private static final ByteString TEST_DATA_SHORT = ByteString.copyFromUtf8("O_o");
// 100-byte payload holding the byte values 0 through 99.
private static final ByteString TEST_DATA_100B = range(0, 100);
// Read parameters shared by many tests; the tests expect these to produce pending_bytes 50 and
// max_chunk_size_bytes 30 in the opening chunk.
private static final TransferParameters TRANSFER_PARAMETERS =
TransferParameters.create(50, 30, 0);
// Retry limit handed to every TransferClient created by createTransferClient().
private static final int MAX_RETRIES = 2;
// Transfer ID used by most tests.
private static final int ID = 123;
// Value reported by the supplier passed as the last TransferClient constructor argument.
private boolean shouldAbortFlag = false;
private TestClient rpcClient;
private TransferClient transferClient;
// Mock progress callback plus a captor for the TransferProgress values it receives.
@Mock private Consumer<TransferProgress> progressCallback;
@Captor private ArgumentCaptor<TransferProgress> progress;
/** Creates a fresh RPC test client and a transfer client whose timeouts should never fire. */
@Before
public void setup() {
  // Long enough that it should never trigger during a test.
  final int longTimeoutMillis = 60000;
  rpcClient = new TestClient(ImmutableList.of(TransferService.get()));
  transferClient = createTransferClient(longTimeoutMillis, longTimeoutMillis);
}
/** Closes the transfer client that is current when the test finishes. */
@After
public void tearDown() {
  try {
    transferClient.close();
  } catch (InterruptedException interrupted) {
    // Report an interrupted shutdown as a test failure, keeping the original exception.
    throw new AssertionError(interrupted);
  }
}
/** A read completes with the payload of a single DATA chunk that reports no remaining bytes. */
@Test
public void read_singleChunk_successful() throws Exception {
  ListenableFuture<byte[]> pendingRead = transferClient.read(1);
  assertThat(pendingRead.isDone()).isFalse();

  Chunk.Builder onlyChunk =
      newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0);
  receiveReadChunks(onlyChunk);

  assertThat(pendingRead.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
/**
 * A FAILED_PRECONDITION RPC error received before any data makes the client send its opening
 * parameters chunk again, after which the read can still complete normally.
 */
@Test
public void read_failedPreconditionError_retriesInitialPacket() throws Exception {
  ListenableFuture<byte[]> pendingRead = transferClient.read(1, TRANSFER_PARAMETERS);

  // The opening chunk is expected twice: once for the initial request, once for the retry.
  Chunk openingChunk = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
                           .setPendingBytes(TRANSFER_PARAMETERS.maxPendingBytes())
                           .setWindowEndOffset(TRANSFER_PARAMETERS.maxPendingBytes())
                           .setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes())
                           .build();
  assertThat(lastChunks()).containsExactly(openingChunk);

  receiveReadServerError(Status.FAILED_PRECONDITION);
  assertThat(lastChunks()).containsExactly(openingChunk);

  Chunk.Builder onlyChunk =
      newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0);
  receiveReadChunks(onlyChunk);
  assertThat(pendingRead.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
/**
 * Once a data chunk has been received, a FAILED_PRECONDITION RPC error is expected to fail the
 * read with INTERNAL instead of restarting it.
 */
@Test
public void read_failedPreconditionError_abortsAfterInitialPacket() {
ListenableFuture<byte[]> future = transferClient.read(1, TransferParameters.create(50, 50, 0));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(50)
.build());
// Deliver the first 50 bytes so the transfer is past its opening packet.
receiveReadChunks(dataChunk(1, TEST_DATA_100B, 0, 50));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
.setOffset(50)
.setPendingBytes(50)
.setWindowEndOffset(100)
.setMaxChunkSizeBytes(50)
.build());
receiveReadServerError(Status.FAILED_PRECONDITION);
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
}
/**
 * The opening chunk is resent for each of the first MAX_RETRIES FAILED_PRECONDITION errors; one
 * further error sends nothing more and fails the read with INTERNAL.
 */
@Test
public void read_failedPreconditionErrorMaxRetriesTimes_aborts() {
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveReadServerError(Status.FAILED_PRECONDITION);
}
Chunk initialChunk = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1)
.setPendingBytes(TRANSFER_PARAMETERS.maxPendingBytes())
.setWindowEndOffset(TRANSFER_PARAMETERS.maxPendingBytes())
.setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes())
.build();
// One original send plus one resend per error so far.
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
// This error exceeds the retry budget, so no further chunk is expected.
receiveReadServerError(Status.FAILED_PRECONDITION);
assertThat(lastChunks()).isEmpty();
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
}
/**
 * Chunks for other transfer IDs on the Read stream, and any chunks arriving on the Write stream,
 * must leave read transfer 1 pending; only the matching Read chunk completes it.
 */
@Test
public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception {
ListenableFuture<byte[]> future = transferClient.read(1);
assertThat(future.isDone()).isFalse();
// Read-stream chunks addressed to transfer IDs 2, 0 and 3.
receiveReadChunks(finalChunk(2, Status.OK),
newChunk(Chunk.Type.DATA, 0).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
// Write-stream chunks, including ones that reuse transfer ID 1.
receiveWriteChunks(finalChunk(1, Status.OK),
newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
assertThat(future.isDone()).isFalse();
receiveReadChunks(
newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
/** A DATA chunk with no payload and remaining_bytes 0 completes the read with an empty array. */
@Test
public void read_empty() throws Exception {
  ListenableFuture<byte[]> pendingRead = transferClient.read(2);
  lastChunks(); // Drop the opening chunk; it is covered by other tests.

  receiveReadChunks(newChunk(Chunk.Type.DATA, 2).setRemainingBytes(0));

  List<Chunk> sent = lastChunks();
  assertThat(sent).containsExactly(finalChunk(2, Status.OK));
  assertThat(pendingRead.get()).isEqualTo(new byte[0]);
}
/** Starting a read sends one PARAMETERS_RETRANSMIT chunk carrying the requested parameters. */
@Test
public void read_sendsTransferParametersFirst() {
  TransferParameters parameters = TransferParameters.create(3, 2, 1);
  ListenableFuture<byte[]> pendingRead = transferClient.read(99, parameters);

  Chunk expectedOpening = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99)
                              .setPendingBytes(3)
                              .setWindowEndOffset(3)
                              .setMaxChunkSizeBytes(2)
                              .setMinDelayMicroseconds(1)
                              .setOffset(0)
                              .build();
  assertThat(lastChunks()).containsExactly(expectedOpening);

  boolean cancelled = pendingRead.cancel(true);
  assertThat(cancelled).isTrue();
}
/**
 * Reads 100 bytes across several DATA chunks, checking each PARAMETERS_CONTINUE chunk the client
 * sends to extend the window and the final OK completion chunk.
 */
@Test
public void read_severalChunks() throws Exception {
ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
// Bytes 0-40 arrive in two chunks.
receiveReadChunks(
newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20)).setRemainingBytes(70),
newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 40)));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(40)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(90)
.build());
// Bytes 40-70.
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(40).setData(range(40, 70)));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(70)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(120)
.build());
// Final bytes 70-100, marked with remaining_bytes 0.
receiveReadChunks(
newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 100)).setRemainingBytes(0));
assertThat(lastChunks()).containsExactly(finalChunk(ID, Status.OK));
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
/**
 * Delivers seven DATA chunks and expects exactly six progress callbacks. The chunk at offset 90
 * arrives before the bytes at offset 80 and has no matching progress entry.
 */
@Test
public void read_progressCallbackIsCalled() {
ListenableFuture<byte[]> future =
transferClient.read(ID, TRANSFER_PARAMETERS, progressCallback);
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)),
newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)),
newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 60)).setRemainingBytes(5),
newChunk(Chunk.Type.DATA, ID).setOffset(60).setData(range(60, 70)),
newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 80)).setRemainingBytes(20),
newChunk(Chunk.Type.DATA, ID).setOffset(90).setData(range(90, 100)),
newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
verify(progressCallback, times(6)).accept(progress.capture());
// The third value is UNKNOWN_TRANSFER_SIZE unless the chunk set remaining_bytes; when it did,
// the expected value equals the end of that chunk plus its remaining_bytes (60 + 5, 80 + 20).
assertThat(progress.getAllValues())
.containsExactly(TransferProgress.create(30, 30, TransferProgress.UNKNOWN_TRANSFER_SIZE),
TransferProgress.create(50, 50, TransferProgress.UNKNOWN_TRANSFER_SIZE),
TransferProgress.create(60, 60, 65),
TransferProgress.create(70, 70, TransferProgress.UNKNOWN_TRANSFER_SIZE),
TransferProgress.create(80, 80, 100),
TransferProgress.create(100, 100, 100));
assertThat(future.isDone()).isTrue();
}
/**
 * When a DATA chunk arrives at an offset ahead of the one the client expects, the client answers
 * with PARAMETERS_RETRANSMIT at the expected offset; the read still completes with all 100 bytes.
 */
@Test
public void read_rewindWhenPacketsSkipped() throws Exception {
ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
// First chunk arrives at offset 50 although nothing has been received yet.
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(30, 50)));
// The client asks again for data starting at offset 0.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)),
newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(30)
.setPendingBytes(50)
.setWindowEndOffset(80)
.setMaxChunkSizeBytes(30)
.build());
// Bytes 50-80 are skipped: the next chunk starts at offset 80.
receiveReadChunks(
newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
// The client rewinds to offset 50.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(50)
.setPendingBytes(50)
.setWindowEndOffset(100)
.setMaxChunkSizeBytes(30)
.build());
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)),
newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(80)
.setPendingBytes(50)
.setWindowEndOffset(130)
.setMaxChunkSizeBytes(30)
.build(),
finalChunk(ID, Status.OK));
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
/** The same transfer ID can be read repeatedly as long as each read finishes before the next. */
@Test
public void read_multipleWithSameId_sequentially_successful() throws Exception {
  final int repetitions = 3;
  for (int attempt = 0; attempt < repetitions; ++attempt) {
    ListenableFuture<byte[]> pendingRead = transferClient.read(1);
    Chunk.Builder onlyChunk =
        newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0);
    receiveReadChunks(onlyChunk);
    assertThat(pendingRead.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
  }
}
/**
 * Starting a second read for an ID that is still in progress fails the second future with
 * ALREADY_EXISTS and leaves the first read pending.
 */
@Test
public void read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() {
  ListenableFuture<byte[]> original = transferClient.read(123);
  ListenableFuture<byte[]> duplicate = transferClient.read(123);

  assertThat(original.isDone()).isFalse();

  ExecutionException failure = assertThrows(ExecutionException.class, duplicate::get);
  assertThat(failure).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError error = (TransferError) failure.getCause();
  assertThat(error.status()).isEqualTo(Status.ALREADY_EXISTS);
}
/**
 * If the channel output throws while the opening packet is being sent, the read fails with a
 * TransferError whose cause is that same exception instance.
 */
@Test
public void read_sendErrorOnFirstPacket_fails() {
  ChannelOutputException sendFailure = new ChannelOutputException("blah");
  rpcClient.setChannelOutputException(sendFailure);

  ListenableFuture<byte[]> pendingRead = transferClient.read(123);

  ExecutionException failure = assertThrows(ExecutionException.class, pendingRead::get);
  assertThat(failure).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(failure).hasCauseThat().hasCauseThat().isSameInstanceAs(sendFailure);
}
/**
 * A channel output failure that is installed after the first data chunk fails the read with a
 * TransferError whose cause is the injected exception.
 */
@Test
public void read_sendErrorOnLaterPacket_aborts() {
ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20)));
// From here on, the test RPC client is configured with an output exception.
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 50)));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception);
}
/** Cancelling the read future results in a CANCELLED completion chunk being sent. */
@Test
public void read_cancelFuture_abortsTransfer() {
ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
assertThat(future.cancel(true)).isTrue();
// A data chunk that arrives after cancellation.
receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)));
assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED));
}
/** A completion chunk carrying an error status fails the read with that same status. */
@Test
public void read_protocolError_aborts() {
  ListenableFuture<byte[]> pendingRead = transferClient.read(ID);

  receiveReadChunks(finalChunk(ID, Status.ALREADY_EXISTS));

  ExecutionException failure = assertThrows(ExecutionException.class, pendingRead::get);
  assertThat(failure).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError error = (TransferError) failure.getCause();
  assertThat(error.status()).isEqualTo(Status.ALREADY_EXISTS);
}
/** An RPC-level NOT_FOUND error on the Read stream fails the read with INTERNAL. */
@Test
public void read_rpcError_aborts() {
  ListenableFuture<byte[]> pendingRead = transferClient.read(2);

  receiveReadServerError(Status.NOT_FOUND);

  ExecutionException failure = assertThrows(ExecutionException.class, pendingRead::get);
  TransferError error = (TransferError) failure.getCause();
  assertThat(error.status()).isEqualTo(Status.INTERNAL);
}
/**
 * A read that never receives a server packet fails with DEADLINE_EXCEEDED after the opening
 * parameters chunk has been sent three times (the original plus MAX_RETRIES resends).
 */
@Test
public void read_timeout() throws Exception {
  // Close the client created in setup() before replacing it; tearDown() only closes whichever
  // client is current, so the original would otherwise never be closed.
  transferClient.close();
  transferClient = createTransferClient(1, 1); // Create a client with a very short timeout.
  ListenableFuture<byte[]> future = transferClient.read(ID, TRANSFER_PARAMETERS);
  // Call future.get() without sending any server-side packets.
  ExecutionException exception = assertThrows(ExecutionException.class, future::get);
  assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
  Chunk initialChunk = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
                           .setPendingBytes(50)
                           .setWindowEndOffset(50)
                           .setMaxChunkSizeBytes(30)
                           .setOffset(0)
                           .build();
  // read should have retried sending the transfer parameters 2 times, for a total of 3
  assertThat(lastChunks())
      .containsExactly(
          initialChunk, initialChunk, initialChunk, finalChunk(ID, Status.DEADLINE_EXCEEDED));
}
/** A short write completes once the server sends parameters followed by an OK completion. */
@Test
public void write_singleChunk() throws Exception {
  ListenableFuture<Void> pendingWrite = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
  assertThat(pendingWrite.isDone()).isFalse();

  Chunk.Builder parameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
                                 .setOffset(0)
                                 .setPendingBytes(1024)
                                 .setMaxChunkSizeBytes(128);
  receiveWriteChunks(parameters, finalChunk(2, Status.OK));

  assertThat(pendingWrite.get()).isNull(); // get() must return without throwing.
}
/**
 * When the abort supplier handed to the client starts reporting true before the server's
 * parameters arrive, the write is expected to fail with ABORTED.
 */
@Test
public void write_platformTransferDisabled_aborted() {
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
assertThat(future.isDone()).isFalse();
// createTransferClient() wires this flag into the client as a supplier.
shouldAbortFlag = true;
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
.setPendingBytes(1024)
.setMaxChunkSizeBytes(128),
finalChunk(2, Status.OK));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ABORTED);
}
/**
 * A FAILED_PRECONDITION RPC error received before any parameters makes the client resend its
 * START chunk, after which the write can still complete normally.
 */
@Test
public void write_failedPreconditionError_retriesInitialPacket() throws Exception {
  ListenableFuture<Void> pendingWrite = transferClient.write(2, TEST_DATA_SHORT.toByteArray());

  // The START chunk is expected twice: once initially, once after the error.
  Chunk startChunk = initialWriteChunk(2, 2, TEST_DATA_SHORT.size());
  assertThat(lastChunks()).containsExactly(startChunk);

  receiveWriteServerError(Status.FAILED_PRECONDITION);
  assertThat(lastChunks()).containsExactly(startChunk);

  Chunk.Builder parameters = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
                                 .setOffset(0)
                                 .setPendingBytes(1024)
                                 .setMaxChunkSizeBytes(128);
  receiveWriteChunks(parameters, finalChunk(2, Status.OK));

  assertThat(pendingWrite.get()).isNull(); // get() must return without throwing.
}
/**
 * Once the client has started sending data, a FAILED_PRECONDITION RPC error is expected to fail
 * the write with INTERNAL instead of restarting it.
 */
@Test
public void write_failedPreconditionError_abortsAfterInitialPacket() {
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray());
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
.setPendingBytes(50)
.setMaxChunkSizeBytes(50));
// START chunk followed by the first 50 bytes of data.
assertThat(lastChunks())
.containsExactly(
initialWriteChunk(2, 2, TEST_DATA_100B.size()), dataChunk(2, TEST_DATA_100B, 0, 50));
receiveWriteServerError(Status.FAILED_PRECONDITION);
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
}
/**
 * The START chunk is resent for each of the first MAX_RETRIES FAILED_PRECONDITION errors; one
 * further error sends nothing more and fails the write with INTERNAL.
 */
@Test
public void write_failedPreconditionErrorMaxRetriesTimes_aborts() {
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveWriteServerError(Status.FAILED_PRECONDITION);
}
Chunk initialChunk = initialWriteChunk(2, 2, TEST_DATA_SHORT.size());
// One original send plus one resend per error so far.
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
// This error exceeds the retry budget, so no further chunk is expected.
receiveWriteServerError(Status.FAILED_PRECONDITION);
assertThat(lastChunks()).isEmpty();
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL);
}
/** Writing zero bytes sends only the START chunk and completes on the server's OK. */
@Test
public void write_empty() throws Exception {
  byte[] noData = new byte[0];
  ListenableFuture<Void> pendingWrite = transferClient.write(2, noData);
  assertThat(pendingWrite.isDone()).isFalse();

  receiveWriteChunks(finalChunk(2, Status.OK));

  List<Chunk> sent = lastChunks();
  assertThat(sent).containsExactly(initialWriteChunk(2, 2, 0));
  assertThat(pendingWrite.get()).isNull(); // get() must return without throwing.
}
/**
 * Writes 100 bytes in response to three PARAMETERS_RETRANSMIT chunks, checking that each batch
 * of DATA chunks matches the requested offset, pending bytes and chunk size.
 */
@Test
public void write_severalChunks() throws Exception {
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()));
// 50 bytes from offset 0 in chunks of at most 30.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(),
newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build());
// 40 bytes from offset 50 in chunks of at most 25.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(50)
.setPendingBytes(40)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 75)).build(),
newChunk(Chunk.Type.DATA, ID).setOffset(75).setData(range(75, 90)).build());
// No max chunk size in this request; the last 10 bytes are expected in one chunk.
receiveWriteChunks(
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(90).setPendingBytes(50));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.DATA, ID)
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
.build());
// All data is sent, but the write stays pending until the server's completion chunk.
assertThat(future.isDone()).isFalse();
receiveWriteChunks(finalChunk(ID, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
/**
 * PARAMETERS_CONTINUE chunks extend the send window without rewinding: the client keeps sending
 * from its current offset up to the new window end.
 */
@Test
public void write_parametersContinue() throws Exception {
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()));
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(50)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(),
newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build());
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(30)
.setPendingBytes(50)
.setWindowEndOffset(80));
// Transfer doesn't roll back to offset 30 but instead continues sending up to 80.
assertThat(lastChunks())
.containsExactly(
newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)).build());
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(80)
.setPendingBytes(50)
.setWindowEndOffset(130));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.DATA, ID)
.setOffset(80)
.setData(range(80, 100))
.setRemainingBytes(0)
.build());
// All data is sent, but the write stays pending until the server's completion chunk.
assertThat(future.isDone()).isFalse();
receiveWriteChunks(finalChunk(ID, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
/**
 * A PARAMETERS_CONTINUE chunk whose window end lies before the client's current offset produces
 * no data; a later one that extends the window to 100 makes the client send the final 10 bytes.
 */
@Test
public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception {
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()));
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(90)
.setWindowEndOffset(90)
.setMaxChunkSizeBytes(90)
.setMinDelayMicroseconds(1));
// The first 90 bytes go out in a single chunk.
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 90)).build());
receiveWriteChunks(
// This stale packet with a window end before the offset should be ignored.
newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID)
.setOffset(25)
.setPendingBytes(25)
.setWindowEndOffset(50),
// Start from an arbitrary offset before the current, but extend the window to the end.
newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID).setOffset(80).setWindowEndOffset(100));
assertThat(lastChunks())
.containsExactly(newChunk(Chunk.Type.DATA, ID)
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
.build());
receiveWriteChunks(finalChunk(ID, Status.OK));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
/**
 * Expects six progress callbacks for a 100-byte write that is sent 90 bytes at a time in 30-byte
 * chunks, rewound to offset 50 by a second parameters chunk, and then completed.
 */
@Test
public void write_progressCallbackIsCalled() {
ListenableFuture<Void> future =
transferClient.write(ID, TEST_DATA_100B.toByteArray(), progressCallback);
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(90)
.setMaxChunkSizeBytes(30),
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(50).setPendingBytes(50),
finalChunk(ID, Status.OK));
verify(progressCallback, times(6)).accept(progress.capture());
// Expected values: the first number tracks the end of the chunk just sent, the second moves to
// 50 once the server asks for offset 50 and to 100 on completion, the third is the data size.
assertThat(progress.getAllValues())
.containsExactly(TransferProgress.create(30, 0, 100),
TransferProgress.create(60, 0, 100),
TransferProgress.create(90, 0, 100),
TransferProgress.create(80, 50, 100),
TransferProgress.create(100, 50, 100),
TransferProgress.create(100, 100, 100));
assertThat(future.isDone()).isTrue();
}
/**
 * When the server requests data from offset 100 (the size of the data), the client sends an
 * empty DATA chunk with remaining_bytes 0 and the write stays pending.
 */
@Test
public void write_asksForFinalOffset_sendsFinalPacket() {
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(100)
.setPendingBytes(40)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
.containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()),
newChunk(Chunk.Type.DATA, ID).setOffset(100).setRemainingBytes(0).build());
assertThat(future.isDone()).isFalse();
}
/** The same transfer ID can be written repeatedly when each write finishes before the next. */
@Test
public void write_multipleWithSameId_sequentially_successful() throws Exception {
  final int repetitions = 3;
  for (int attempt = 0; attempt < repetitions; ++attempt) {
    ListenableFuture<Void> pendingWrite =
        transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
    Chunk.Builder parameters =
        newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0).setPendingBytes(50);
    receiveWriteChunks(parameters, finalChunk(ID, Status.OK));
    pendingWrite.get(); // Throws if the write failed.
  }
}
/**
 * Starting a second write for an ID that is still in progress fails the second future with
 * ALREADY_EXISTS and leaves the first write pending.
 */
@Test
public void write_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() {
  ListenableFuture<Void> original = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
  ListenableFuture<Void> duplicate = transferClient.write(123, TEST_DATA_SHORT.toByteArray());

  assertThat(original.isDone()).isFalse();

  ExecutionException failure = assertThrows(ExecutionException.class, duplicate::get);
  assertThat(failure).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError error = (TransferError) failure.getCause();
  assertThat(error.status()).isEqualTo(Status.ALREADY_EXISTS);
}
/**
 * If the channel output throws while the opening packet is being sent, the write fails with a
 * TransferError whose cause is that same exception instance.
 */
@Test
public void write_sendErrorOnFirstPacket_failsImmediately() {
  ChannelOutputException sendFailure = new ChannelOutputException("blah");
  rpcClient.setChannelOutputException(sendFailure);

  ListenableFuture<Void> pendingWrite = transferClient.write(123, TEST_DATA_SHORT.toByteArray());

  ExecutionException failure = assertThrows(ExecutionException.class, pendingWrite::get);
  assertThat(failure).hasCauseThat().isInstanceOf(TransferError.class);
  assertThat(failure).hasCauseThat().hasCauseThat().isSameInstanceAs(sendFailure);
}
/** A parameters chunk that sets no pending bytes fails the write with INVALID_ARGUMENT. */
@Test
public void write_serviceRequestsNoData_aborts() {
  ListenableFuture<Void> pendingWrite = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());

  Chunk.Builder emptyRequest = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0);
  receiveWriteChunks(emptyRequest);

  ExecutionException failure = assertThrows(ExecutionException.class, pendingWrite::get);
  TransferError error = (TransferError) failure.getCause();
  assertThat(error.status()).isEqualTo(Status.INVALID_ARGUMENT);
}
/**
 * A request for offset 101, one past the end of the 100-byte data, makes the client send an
 * OUT_OF_RANGE completion chunk and fails the write with the same status.
 */
@Test
public void write_invalidOffset_aborts() {
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(101)
.setPendingBytes(40)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
.containsExactly(
initialWriteChunk(ID, ID, TEST_DATA_100B.size()), finalChunk(ID, Status.OUT_OF_RANGE));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE);
}
/**
 * A channel output failure that is installed after the write has started fails the write with a
 * TransferError whose cause is the injected exception.
 */
@Test
public void write_sendErrorOnLaterPacket_aborts() {
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
// From here on, the test RPC client is configured with an output exception.
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(50)
.setMaxChunkSizeBytes(30));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception);
}
/** Cancelling the write future results in a CANCELLED completion chunk being sent. */
@Test
public void write_cancelFuture_abortsTransfer() {
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
assertThat(future.cancel(true)).isTrue();
// Parameters that arrive after cancellation.
receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(50)
.setMaxChunkSizeBytes(50));
assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED));
}
/** A completion chunk carrying an error status fails the write with that same status. */
@Test
public void write_protocolError_aborts() {
  ListenableFuture<Void> pendingWrite = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());

  receiveWriteChunks(finalChunk(ID, Status.NOT_FOUND));

  ExecutionException failure = assertThrows(ExecutionException.class, pendingWrite::get);
  assertThat(failure).hasCauseThat().isInstanceOf(TransferError.class);
  TransferError error = (TransferError) failure.getCause();
  assertThat(error.status()).isEqualTo(Status.NOT_FOUND);
}
/** An RPC-level NOT_FOUND error on the Write stream fails the write with INTERNAL. */
@Test
public void write_rpcError_aborts() {
  ListenableFuture<Void> pendingWrite = transferClient.write(2, TEST_DATA_SHORT.toByteArray());

  receiveWriteServerError(Status.NOT_FOUND);

  ExecutionException failure = assertThrows(ExecutionException.class, pendingWrite::get);
  TransferError error = (TransferError) failure.getCause();
  assertThat(error.status()).isEqualTo(Status.INTERNAL);
}
/**
 * A write that never receives a server packet fails with DEADLINE_EXCEEDED after the START chunk
 * has been sent three times (the original plus MAX_RETRIES resends).
 */
@Test
public void write_timeoutAfterInitialChunk() throws Exception {
  // Close the client created in setup() before replacing it; tearDown() only closes whichever
  // client is current, so the original would otherwise never be closed.
  transferClient.close();
  transferClient = createTransferClient(1, 1); // Create a client with a very short timeout.
  ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
  // Call future.get() without sending any server-side packets.
  ExecutionException exception = assertThrows(ExecutionException.class, future::get);
  assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
  // Client should have resent the last chunk (the initial chunk in this case) for each timeout.
  assertThat(lastChunks())
      .containsExactly(initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // initial
          initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // retry 1
          initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // retry 2
          finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort
}
/**
 * After the server's parameters trigger the single data chunk, further silence makes the client
 * resend that data chunk MAX_RETRIES times and then fail with DEADLINE_EXCEEDED.
 */
@Test
public void write_timeoutAfterSingleChunk() throws Exception {
// Close the client from setup() before swapping in one with short timeouts.
transferClient.close();
transferClient = createTransferClient(10, 10); // Create a client with a very short timeout.
// Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters.
enqueueWriteChunks(2,
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(90)
.setMaxChunkSizeBytes(30));
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_SHORT.toByteArray());
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
Chunk data = newChunk(Chunk.Type.DATA, ID)
.setOffset(0)
.setData(TEST_DATA_SHORT)
.setRemainingBytes(0)
.build();
assertThat(lastChunks())
.containsExactly(initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // initial
data, // data chunk
data, // retry 1
data, // retry 2
finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort
}
/**
 * Drives a 100-byte write through several timeout/retry rounds. Server responses are queued up
 * front, each released after a given number of outgoing packets, so the client times out and
 * resends its last chunk between responses yet still completes successfully.
 */
@Test
public void write_multipleTimeoutsAndRecoveries() throws Exception {
// Close the client from setup() before swapping in one with short timeouts.
transferClient.close();
transferClient = createTransferClient(10, 10); // Create a manager with short timeouts.
// Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters.
enqueueWriteChunks(2,
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(0)
.setPendingBytes(40)
.setMaxChunkSizeBytes(20));
// After the second retry, send more transfer parameters
enqueueWriteChunks(4,
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(40)
.setPendingBytes(80)
.setMaxChunkSizeBytes(40));
// After the first retry, send more transfer parameters
enqueueWriteChunks(3,
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID)
.setOffset(80)
.setPendingBytes(80)
.setMaxChunkSizeBytes(10));
// After the second retry, confirm completed
enqueueWriteChunks(
4, newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setStatus(Status.OK.code()));
ListenableFuture<Void> future = transferClient.write(ID, TEST_DATA_100B.toByteArray());
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
assertThat(lastChunks())
.containsExactly(
initialWriteChunk(
ID, ID, TEST_DATA_100B.size()), // initial
// after 2, receive parameters: 40 from 0 by 20
dataChunk(ID, TEST_DATA_100B, 0, 20), // data 0-20
dataChunk(ID, TEST_DATA_100B, 20, 40), // data 20-40
dataChunk(ID, TEST_DATA_100B, 20, 40), // retry 1
dataChunk(ID, TEST_DATA_100B, 20, 40), // retry 2
// after 4, receive parameters: 80 from 40 by 40
dataChunk(ID, TEST_DATA_100B, 40, 80), // data 40-80
dataChunk(ID, TEST_DATA_100B, 80, 100), // data 80-100
dataChunk(ID, TEST_DATA_100B, 80, 100), // retry 1
// after 3, receive parameters: 80 from 80 by 10
dataChunk(ID, TEST_DATA_100B, 80, 90), // data 80-90
dataChunk(ID, TEST_DATA_100B, 90, 100), // data 90-100
dataChunk(ID, TEST_DATA_100B, 90, 100), // retry 1
dataChunk(ID, TEST_DATA_100B, 90, 100)); // retry 2
// after 4, receive final OK
}
/**
 * Builds a ByteString whose bytes count upward from {@code startInclusive} to
 * {@code endExclusive - 1}.
 *
 * <p>Both bounds must be below {@link Byte#MAX_VALUE} so every value fits in a byte.
 *
 * @param startInclusive value of the first byte
 * @param endExclusive one past the value of the last byte
 * @return the bytes {@code startInclusive, startInclusive + 1, ..., endExclusive - 1}
 */
private static ByteString range(int startInclusive, int endExclusive) {
  assertThat(startInclusive).isLessThan((int) Byte.MAX_VALUE);
  assertThat(endExclusive).isLessThan((int) Byte.MAX_VALUE);
  byte[] bytes = new byte[endExclusive - startInclusive];
  // Index with an int: a byte counter would wrap to -128 once the array is longer than 127
  // elements, which the checks above still allow when startInclusive is negative.
  for (int i = 0; i < bytes.length; ++i) {
    bytes[i] = (byte) (i + startInclusive);
  }
  return ByteString.copyFrom(bytes);
}
/** Starts a chunk builder with the given type and with the transfer ID set to {@code resourceId}. */
private static Chunk.Builder newChunk(Chunk.Type type, int resourceId) {
  Chunk.Builder builder = Chunk.newBuilder();
  builder.setType(type);
  builder.setTransferId(resourceId);
  return builder;
}
/**
 * Builds the START chunk the tests expect at the beginning of a write: transfer ID
 * {@code sessionId}, the given resource ID, and {@code size} as remaining bytes.
 */
private static Chunk initialWriteChunk(int sessionId, int resourceId, int size) {
  Chunk.Builder start = newChunk(Chunk.Type.START, sessionId);
  start.setResourceId(resourceId);
  start.setRemainingBytes(size);
  return start.build();
}
/** Builds a COMPLETION chunk for the transfer carrying the numeric code of {@code status}. */
private static Chunk finalChunk(int sessionId, Status status) {
  Chunk.Builder completion = newChunk(Chunk.Type.COMPLETION, sessionId);
  completion.setStatus(status.code());
  return completion.build();
}
/**
 * Builds a DATA chunk holding {@code data[start, end)} at offset {@code start}. When the slice
 * reaches the end of {@code data}, the chunk is also marked with remaining_bytes 0.
 *
 * @throws IndexOutOfBoundsException if {@code start} is negative or {@code end} exceeds the size
 */
private static Chunk dataChunk(int sessionId, ByteString data, int start, int end) {
  final int totalSize = data.size();
  if (!(start >= 0 && end <= totalSize)) {
    throw new IndexOutOfBoundsException("Invalid start or end");
  }

  Chunk.Builder builder = newChunk(Chunk.Type.DATA, sessionId);
  builder.setOffset(start);
  builder.setData(data.substring(start, end));

  boolean reachesEnd = end == totalSize;
  if (reachesEnd) {
    builder.setRemainingBytes(0);
  }
  return builder.build();
}
/**
 * Runs {@code action} between two calls to
 * {@code transferClient.waitUntilEventsAreProcessedForTest()}: once before the action and once
 * after it, so events pending before and events caused by the action have been processed by the
 * time this method returns.
 */
private void syncWithTransferThread(Runnable action) {
transferClient.waitUntilEventsAreProcessedForTest();
action.run();
transferClient.waitUntilEventsAreProcessedForTest();
}
/** Delivers an RPC server error with the given status on the Read method, then waits for it. */
private void receiveReadServerError(Status status) {
  Runnable deliverError = () -> rpcClient.receiveServerError(SERVICE, "Read", status);
  syncWithTransferThread(deliverError);
}
/** Delivers an RPC server error with the given status on the Write method, then waits for it. */
private void receiveWriteServerError(Status status) {
  Runnable deliverError = () -> rpcClient.receiveServerError(SERVICE, "Write", status);
  syncWithTransferThread(deliverError);
}
/**
 * Delivers each chunk, in order, as a server stream packet on the Read method, synchronizing
 * with the transfer thread around every individual chunk.
 */
private void receiveReadChunks(ChunkOrBuilder... chunks) {
  for (int index = 0; index < chunks.length; ++index) {
    ChunkOrBuilder next = chunks[index];
    syncWithTransferThread(() -> rpcClient.receiveServerStream(SERVICE, "Read", next));
  }
}
/**
 * Delivers each chunk, in order, as a server stream packet on the Write method, synchronizing
 * with the transfer thread around every individual chunk.
 */
private void receiveWriteChunks(ChunkOrBuilder... chunks) {
  for (int index = 0; index < chunks.length; ++index) {
    ChunkOrBuilder next = chunks[index];
    syncWithTransferThread(() -> rpcClient.receiveServerStream(SERVICE, "Write", next));
  }
}
/**
 * Queues chunks on the test RPC client for the Write method. The tests use {@code afterPackets}
 * as the number of outgoing packets to wait for before the queued chunks are received.
 */
private void enqueueWriteChunks(int afterPackets, Chunk.Builder... chunks) {
  Runnable enqueue = () -> rpcClient.enqueueServerStream(SERVICE, "Write", afterPackets, chunks);
  syncWithTransferThread(enqueue);
}
/**
 * Waits for the transfer client to finish processing its events, then returns what
 * {@code rpcClient.lastClientStreams(Chunk.class)} reports. The tests rely on each call
 * returning only the chunks sent since the previous call.
 */
private List<Chunk> lastChunks() {
  transferClient.waitUntilEventsAreProcessedForTest();
  List<Chunk> sentChunks = rpcClient.lastClientStreams(Chunk.class);
  return sentChunks;
}
/**
 * Creates a TransferClient bound to the test RPC client's Read and Write methods on
 * CHANNEL_ID, using the given timeouts, MAX_RETRIES, and a supplier that reports the current
 * value of {@code shouldAbortFlag}.
 */
private TransferClient createTransferClient(
    int transferTimeoutMillis, int initialTransferTimeoutMillis) {
  final String readMethodName = SERVICE + "/Read";
  final String writeMethodName = SERVICE + "/Write";
  return new TransferClient(rpcClient.client().method(CHANNEL_ID, readMethodName),
      rpcClient.client().method(CHANNEL_ID, writeMethodName),
      transferTimeoutMillis,
      initialTransferTimeoutMillis,
      MAX_RETRIES,
      () -> shouldAbortFlag);
}
}