| // Copyright 2022 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| package dev.pigweed.pw_transfer; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| import static org.junit.Assert.assertThrows; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.protobuf.ByteString; |
| import dev.pigweed.pw_rpc.ChannelOutputException; |
| import dev.pigweed.pw_rpc.Status; |
| import dev.pigweed.pw_rpc.TestClient; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.ExecutionException; |
| import java.util.function.Consumer; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Captor; |
| import org.mockito.Mock; |
| import org.mockito.junit.MockitoJUnit; |
| import org.mockito.junit.MockitoRule; |
| |
| public final class TransferClientTest { |
| @Rule public final MockitoRule mockito = MockitoJUnit.rule(); |
| |
| private static final int CHANNEL_ID = 1; |
| private static final String SERVICE = "pw.transfer.Transfer"; |
| |
| private static final ByteString TEST_DATA_SHORT = ByteString.copyFromUtf8("O_o"); |
| private static final ByteString TEST_DATA_100B = range(0, 100); |
| |
| private static final TransferParameters TRANSFER_PARAMETERS = |
| TransferParameters.create(50, 30, 0); |
| private static final int MAX_RETRIES = 2; |
| |
| private boolean shouldAbortFlag = false; |
| private TestClient rpcClient; |
| private TransferClient transferClient; |
| |
| @Mock private Consumer<TransferProgress> progressCallback; |
| @Captor private ArgumentCaptor<TransferProgress> progress; |
| |
| @Before |
| public void setup() { |
| rpcClient = new TestClient(ImmutableList.of(TransferService.get())); |
| } |
| |
| @After |
| public void tearDown() { |
| try { |
| transferClient.close(); |
| } catch (InterruptedException e) { |
| throw new AssertionError(e); |
| } |
| } |
| |
| @Test |
| public void legacy_read_singleChunk_successful() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(1); |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| |
| @Test |
| public void legacy_read_failedPreconditionError_retriesInitialPacket() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY)); |
| |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY)); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| |
| @Test |
| public void legacy_read_failedPreconditionError_abortsAfterInitialPacket() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| TransferParameters params = TransferParameters.create(50, 50, 0); |
| ListenableFuture<byte[]> future = transferClient.read(1, params); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY, params)); |
| |
| receiveReadChunks(legacyDataChunk(1, TEST_DATA_100B, 0, 50)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1) |
| .setOffset(50) |
| .setPendingBytes(50) |
| .setWindowEndOffset(100) |
| .setMaxChunkSizeBytes(50) |
| .build()); |
| |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void legacy_read_failedPreconditionErrorMaxRetriesTimes_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); |
| |
| for (int i = 0; i < MAX_RETRIES; ++i) { |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| } |
| |
| Chunk initialChunk = initialReadChunk(1, ProtocolVersion.LEGACY); |
| assertThat(lastChunks()) |
| .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); |
| |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()).isEmpty(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void legacy_read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(1); |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveReadChunks(legacyFinalChunk(2, Status.OK), |
| newLegacyChunk(Chunk.Type.DATA, 0) |
| .setOffset(0) |
| .setData(TEST_DATA_100B) |
| .setRemainingBytes(0), |
| newLegacyChunk(Chunk.Type.DATA, 3) |
| .setOffset(0) |
| .setData(TEST_DATA_100B) |
| .setRemainingBytes(0)); |
| receiveWriteChunks(legacyFinalChunk(1, Status.OK), |
| newLegacyChunk(Chunk.Type.DATA, 1) |
| .setOffset(0) |
| .setData(TEST_DATA_100B) |
| .setRemainingBytes(0), |
| newLegacyChunk(Chunk.Type.DATA, 2) |
| .setOffset(0) |
| .setData(TEST_DATA_100B) |
| .setRemainingBytes(0)); |
| |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| |
| @Test |
| public void legacy_read_empty() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(2); |
| lastChunks(); // Discard initial chunk (tested elsewhere) |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 2).setRemainingBytes(0)); |
| |
| assertThat(lastChunks()).containsExactly(legacyFinalChunk(2, Status.OK)); |
| |
| assertThat(future.get()).isEqualTo(new byte[] {}); |
| } |
| |
| @Test |
| public void legacy_read_sendsTransferParametersFirst() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| TransferParameters params = TransferParameters.create(3, 2, 1); |
| ListenableFuture<byte[]> future = transferClient.read(99, params); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(99, ProtocolVersion.LEGACY, params)); |
| assertThat(future.cancel(true)).isTrue(); |
| } |
| |
| @Test |
| public void legacy_read_severalChunks() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY)); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(0) |
| .setData(range(0, 20)) |
| .setRemainingBytes(70), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(40) |
| .setPendingBytes(50) |
| .setMaxChunkSizeBytes(30) |
| .setWindowEndOffset(90) |
| .build()); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(70) |
| .setPendingBytes(50) |
| .setMaxChunkSizeBytes(30) |
| .setWindowEndOffset(120) |
| .build()); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(70) |
| .setData(range(70, 100)) |
| .setRemainingBytes(0)); |
| |
| assertThat(lastChunks()).containsExactly(legacyFinalChunk(123, Status.OK)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); |
| } |
| |
| @Test |
| public void legacy_read_progressCallbackIsCalled() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = |
| transferClient.read(123, TRANSFER_PARAMETERS, progressCallback); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)), |
| newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(50) |
| .setData(range(50, 60)) |
| .setRemainingBytes(5), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)), |
| newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(70) |
| .setData(range(70, 80)) |
| .setRemainingBytes(20), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)), |
| newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(80) |
| .setData(range(80, 100)) |
| .setRemainingBytes(0)); |
| |
| verify(progressCallback, times(6)).accept(progress.capture()); |
| assertThat(progress.getAllValues()) |
| .containsExactly(TransferProgress.create(30, 30, TransferProgress.UNKNOWN_TRANSFER_SIZE), |
| TransferProgress.create(50, 50, TransferProgress.UNKNOWN_TRANSFER_SIZE), |
| TransferProgress.create(60, 60, 65), |
| TransferProgress.create(70, 70, TransferProgress.UNKNOWN_TRANSFER_SIZE), |
| TransferProgress.create(80, 80, 100), |
| TransferProgress.create(100, 100, 100)); |
| assertThat(future.isDone()).isTrue(); |
| } |
| |
| @Test |
| public void legacy_read_rewindWhenPacketsSkipped() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY)); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setPendingBytes(50) |
| .setWindowEndOffset(50) |
| .setMaxChunkSizeBytes(30) |
| .setOffset(0) |
| .build()); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(30) |
| .setPendingBytes(50) |
| .setWindowEndOffset(80) |
| .setMaxChunkSizeBytes(30) |
| .build()); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(80) |
| .setData(range(80, 100)) |
| .setRemainingBytes(0)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(50) |
| .setPendingBytes(50) |
| .setWindowEndOffset(100) |
| .setMaxChunkSizeBytes(30) |
| .build()); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)), |
| newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(80) |
| .setData(range(80, 100)) |
| .setRemainingBytes(0)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(80) |
| .setPendingBytes(50) |
| .setWindowEndOffset(130) |
| .setMaxChunkSizeBytes(30) |
| .build(), |
| legacyFinalChunk(123, Status.OK)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); |
| } |
| |
| @Test |
| public void legacy_read_multipleWithSameId_sequentially_successful() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| for (int i = 0; i < 3; ++i) { |
| ListenableFuture<byte[]> future = transferClient.read(1); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| } |
| |
| @Test |
| public void legacy_read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> first = transferClient.read(123); |
| ListenableFuture<byte[]> second = transferClient.read(123); |
| |
| assertThat(first.isDone()).isFalse(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, second::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); |
| } |
| |
| @Test |
| public void legacy_read_sendErrorOnFirstPacket_fails() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| ListenableFuture<byte[]> future = transferClient.read(123); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void legacy_read_sendErrorOnLaterPacket_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20))); |
| |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50))); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void legacy_read_cancelFuture_abortsTransfer() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); |
| |
| assertThat(future.cancel(true)).isTrue(); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50))); |
| assertThat(lastChunks()).contains(legacyFinalChunk(123, Status.CANCELLED)); |
| } |
| |
| @Test |
| public void legacy_read_transferProtocolError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(123); |
| |
| receiveReadChunks(legacyFinalChunk(123, Status.ALREADY_EXISTS)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); |
| } |
| |
| @Test |
| public void legacy_read_rpcError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(2); |
| |
| receiveReadServerError(Status.NOT_FOUND); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void legacy_read_timeout() { |
| createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); |
| |
| // Call future.get() without sending any server-side packets. |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); |
| |
| // read should have retried sending the transfer parameters 2 times, for a total of 3 |
| assertThat(lastChunks()) |
| .containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY), |
| initialReadChunk(123, ProtocolVersion.LEGACY), |
| initialReadChunk(123, ProtocolVersion.LEGACY)); |
| } |
| |
| @Test |
| public void legacy_write_singleChunk() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) |
| .setOffset(0) |
| .setPendingBytes(1024) |
| .setMaxChunkSizeBytes(128), |
| legacyFinalChunk(2, Status.OK)); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void legacy_write_platformTransferDisabled_aborted() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| assertThat(future.isDone()).isFalse(); |
| |
| shouldAbortFlag = true; |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) |
| .setOffset(0) |
| .setPendingBytes(1024) |
| .setMaxChunkSizeBytes(128), |
| legacyFinalChunk(2, Status.OK)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ABORTED); |
| } |
| |
| @Test |
| public void legacy_write_failedPreconditionError_retriesInitialPacket() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); |
| |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) |
| .setOffset(0) |
| .setPendingBytes(1024) |
| .setMaxChunkSizeBytes(128), |
| legacyFinalChunk(2, Status.OK)); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void legacy_write_failedPreconditionError_abortsAfterInitialPacket() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) |
| .setOffset(0) |
| .setPendingBytes(50) |
| .setMaxChunkSizeBytes(50)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), |
| legacyDataChunk(2, TEST_DATA_100B, 0, 50)); |
| |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void legacy_write_failedPreconditionErrorMaxRetriesTimes_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| for (int i = 0; i < MAX_RETRIES; ++i) { |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| } |
| |
| Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()); |
| assertThat(lastChunks()) |
| .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); |
| |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()).isEmpty(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void legacy_write_empty() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(2, new byte[] {}); |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveWriteChunks(legacyFinalChunk(2, Status.OK)); |
| |
| assertThat(lastChunks()).containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, 0)); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void legacy_write_severalChunks() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setPendingBytes(50) |
| .setMaxChunkSizeBytes(30) |
| .setMinDelayMicroseconds(1)); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(50) |
| .setPendingBytes(40) |
| .setMaxChunkSizeBytes(25)); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build()); |
| |
| receiveWriteChunks( |
| newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setPendingBytes(50)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(90) |
| .setData(range(90, 100)) |
| .setRemainingBytes(0) |
| .build()); |
| |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveWriteChunks(legacyFinalChunk(123, Status.OK)); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void legacy_write_parametersContinue() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setPendingBytes(50) |
| .setWindowEndOffset(50) |
| .setMaxChunkSizeBytes(30) |
| .setMinDelayMicroseconds(1)); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(30) |
| .setPendingBytes(50) |
| .setWindowEndOffset(80)); |
| |
| // Transfer doesn't roll back to offset 30 but instead continues sending up to 80. |
| assertThat(lastChunks()) |
| .containsExactly( |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build()); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(80) |
| .setPendingBytes(50) |
| .setWindowEndOffset(130)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(80) |
| .setData(range(80, 100)) |
| .setRemainingBytes(0) |
| .build()); |
| |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveWriteChunks(legacyFinalChunk(123, Status.OK)); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void legacy_write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setPendingBytes(90) |
| .setWindowEndOffset(90) |
| .setMaxChunkSizeBytes(90) |
| .setMinDelayMicroseconds(1)); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 90)).build()); |
| |
| receiveWriteChunks( |
| // This stale packet with a window end before the offset should be ignored. |
| newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(25) |
| .setPendingBytes(25) |
| .setWindowEndOffset(50), |
| // Start from an arbitrary offset before the current, but extend the window to the end. |
| newLegacyChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(100)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(90) |
| .setData(range(90, 100)) |
| .setRemainingBytes(0) |
| .build()); |
| |
| receiveWriteChunks(legacyFinalChunk(123, Status.OK)); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void legacy_write_progressCallbackIsCalled() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = |
| transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setPendingBytes(90) |
| .setMaxChunkSizeBytes(30), |
| newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setPendingBytes(50), |
| legacyFinalChunk(123, Status.OK)); |
| |
| verify(progressCallback, times(6)).accept(progress.capture()); |
| assertThat(progress.getAllValues()) |
| .containsExactly(TransferProgress.create(30, 0, 100), |
| TransferProgress.create(60, 0, 100), |
| TransferProgress.create(90, 0, 100), |
| TransferProgress.create(80, 50, 100), |
| TransferProgress.create(100, 50, 100), |
| TransferProgress.create(100, 100, 100)); |
| assertThat(future.isDone()).isTrue(); |
| } |
| |
| @Test |
| public void legacy_write_asksForFinalOffset_sendsFinalPacket() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(100) |
| .setPendingBytes(40) |
| .setMaxChunkSizeBytes(25)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), |
| newLegacyChunk(Chunk.Type.DATA, 123).setOffset(100).setRemainingBytes(0).build()); |
| assertThat(future.isDone()).isFalse(); |
| } |
| |
| @Test |
| public void legacy_write_multipleWithSameId_sequentially_successful() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| for (int i = 0; i < 3; ++i) { |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| receiveWriteChunks( |
| newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setPendingBytes(50), |
| legacyFinalChunk(123, Status.OK)); |
| |
| future.get(); |
| } |
| } |
| |
| @Test |
| public void legacy_write_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> first = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| ListenableFuture<Void> second = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| assertThat(first.isDone()).isFalse(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, second::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); |
| } |
| |
| @Test |
| public void legacy_write_sendErrorOnFirstPacket_failsImmediately() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void legacy_write_serviceRequestsNoData_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); |
| } |
| |
| @Test |
| public void legacy_write_invalidOffset_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(101) |
| .setPendingBytes(40) |
| .setMaxChunkSizeBytes(25)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), |
| legacyFinalChunk(123, Status.OUT_OF_RANGE)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE); |
| } |
| |
| @Test |
| public void legacy_write_sendErrorOnLaterPacket_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setPendingBytes(50) |
| .setMaxChunkSizeBytes(30)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void legacy_write_cancelFuture_abortsTransfer() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(future.cancel(true)).isTrue(); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setPendingBytes(50) |
| .setMaxChunkSizeBytes(50)); |
| assertThat(lastChunks()).contains(legacyFinalChunk(123, Status.CANCELLED)); |
| } |
| |
| @Test |
| public void legacy_write_transferProtocolError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| receiveWriteChunks(legacyFinalChunk(123, Status.NOT_FOUND)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.NOT_FOUND); |
| } |
| |
| @Test |
| public void legacy_write_rpcError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| receiveWriteServerError(Status.NOT_FOUND); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void legacy_write_timeoutAfterInitialChunk() { |
| createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| // Call future.get() without sending any server-side packets. |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); |
| |
| // Client should have resent the last chunk (the initial chunk in this case) for each timeout. |
| assertThat(lastChunks()) |
| .containsExactly( |
| initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial |
| initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // retry 1 |
| initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); // retry 2 |
| } |
| |
| @Test |
| public void legacy_write_timeoutAfterSingleChunk() { |
| createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); |
| |
| // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters. |
| enqueueWriteChunks(2, |
| newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setPendingBytes(90) |
| .setMaxChunkSizeBytes(30)); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); |
| |
| Chunk data = newLegacyChunk(Chunk.Type.DATA, 123) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0) |
| .build(); |
| assertThat(lastChunks()) |
| .containsExactly( |
| initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial |
| data, // data chunk |
| data, // retry 1 |
| data); // retry 2 |
| } |
| |
| @Test |
| public void legacy_write_multipleTimeoutsAndRecoveries() throws Exception { |
| createTransferClientThatMayTimeOut(ProtocolVersion.LEGACY); |
| |
| // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters. |
| enqueueWriteChunks(2, |
| newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(40) |
| .setMaxChunkSizeBytes(20)); |
| |
| // After the second retry, send more transfer parameters |
| enqueueWriteChunks(4, |
| newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(40) |
| .setWindowEndOffset(120) |
| .setMaxChunkSizeBytes(40)); |
| |
| // After the first retry, send more transfer parameters |
| enqueueWriteChunks(3, |
| newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(80) |
| .setWindowEndOffset(160) |
| .setMaxChunkSizeBytes(10)); |
| |
| // After the second retry, confirm completed |
| enqueueWriteChunks( |
| 4, newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setStatus(Status.OK.code())); |
| |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| // initial |
| initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), |
| // after 2, receive parameters: 40 from 0 by 20 |
| legacyDataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 |
| legacyDataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 |
| legacyDataChunk(123, TEST_DATA_100B, 20, 40), // retry 1 |
| legacyDataChunk(123, TEST_DATA_100B, 20, 40), // retry 2 |
| // after 4, receive parameters: 80 from 40 by 40 |
| legacyDataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80 |
| legacyDataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 |
| legacyDataChunk(123, TEST_DATA_100B, 80, 100), // retry 1 |
| // after 3, receive parameters: 80 from 80 by 10 |
| legacyDataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90 |
| legacyDataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100 |
| legacyDataChunk(123, TEST_DATA_100B, 90, 100), // retry 1 |
| legacyDataChunk(123, TEST_DATA_100B, 90, 100)); // retry 2 |
| // after 4, receive final OK |
| } |
| |
| @Test |
| public void read_singleChunk_successful() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(3, TRANSFER_PARAMETERS); |
| assertThat(future.isDone()).isFalse(); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(3, ProtocolVersion.VERSION_TWO)); |
| |
| receiveReadChunks(newChunk(Chunk.Type.START_ACK, 321) |
| .setResourceId(3) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| assertThat(lastChunks()).containsExactly(readStartAckConfirmation(321, TRANSFER_PARAMETERS)); |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.DATA, 321).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(Chunk.newBuilder() |
| .setType(Chunk.Type.COMPLETION) |
| .setSessionId(321) |
| .setStatus(Status.OK.ordinal()) |
| .build()); |
| |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, 321)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| |
| @Test |
| public void read_requestV2ReceiveLegacy() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); |
| assertThat(future.isDone()).isFalse(); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); |
| |
| receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0)); |
| |
| // No handshake packets since the server responded as legacy. |
| assertThat(lastChunks()).containsExactly(legacyFinalChunk(1, Status.OK)); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| |
| @Test |
| public void read_failedPreconditionError_retriesInitialPacket() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); |
| for (int i = 0; i < MAX_RETRIES; ++i) { |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); |
| } |
| |
| receiveReadChunks(newChunk(Chunk.Type.START_ACK, 54321) |
| .setResourceId(1) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| assertThat(lastChunks()).containsExactly(readStartAckConfirmation(54321, TRANSFER_PARAMETERS)); |
| } |
| |
| @Test |
| public void read_failedPreconditionError_abortsAfterInitial() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| TransferParameters params = TransferParameters.create(50, 50, 0); |
| ListenableFuture<byte[]> future = transferClient.read(1, params); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params)); |
| |
| receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555) |
| .setResourceId(1) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void read_failedPreconditionError_abortsAfterHandshake() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| TransferParameters params = TransferParameters.create(50, 50, 0); |
| ListenableFuture<byte[]> future = transferClient.read(1, params); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params)); |
| |
| receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555) |
| .setResourceId(1) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| assertThat(lastChunks()).containsExactly(readStartAckConfirmation(555, params)); |
| |
| receiveReadChunks(dataChunk(555, TEST_DATA_100B, 0, 50)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555) |
| .setOffset(50) |
| .setWindowEndOffset(100) |
| .setMaxChunkSizeBytes(50) |
| .build()); |
| |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void read_failedPreconditionErrorMaxRetriesTimes_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); |
| |
| for (int i = 0; i < MAX_RETRIES; ++i) { |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| } |
| |
| Chunk initialChunk = initialReadChunk(1, ProtocolVersion.VERSION_TWO); |
| assertThat(lastChunks()) |
| .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); |
| |
| receiveReadServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()).isEmpty(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(1); |
| assertThat(future.isDone()).isFalse(); |
| |
| performReadStartHandshake(1, 99); |
| |
| receiveReadChunks(finalChunk(2, Status.OK), |
| newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), |
| newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); |
| receiveWriteChunks(finalChunk(99, Status.INVALID_ARGUMENT), |
| newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), |
| newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); |
| |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); |
| |
| performReadCompletionHandshake(99, Status.OK); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| |
| @Test |
| public void read_empty() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(2); |
| performReadStartHandshake(2, 5678); |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 5678).setRemainingBytes(0)); |
| |
| performReadCompletionHandshake(5678, Status.OK); |
| |
| assertThat(future.get()).isEqualTo(new byte[] {}); |
| } |
| |
| @Test |
| public void read_sendsTransferParametersFirst() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| TransferParameters params = TransferParameters.create(3, 2, 1); |
| ListenableFuture<byte[]> future = transferClient.read(99, params); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialReadChunk(99, ProtocolVersion.VERSION_TWO, params)); |
| assertThat(future.cancel(true)).isTrue(); |
| } |
| |
| @Test |
| public void read_severalChunks() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(7, TRANSFER_PARAMETERS); |
| |
| performReadStartHandshake(7, 123, TRANSFER_PARAMETERS); |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)).setRemainingBytes(70), |
| newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(40) |
| .setMaxChunkSizeBytes(30) |
| .setWindowEndOffset(90) |
| .build()); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(70) |
| .setMaxChunkSizeBytes(30) |
| .setWindowEndOffset(120) |
| .build()); |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 100)).setRemainingBytes(0)); |
| |
| performReadCompletionHandshake(123, Status.OK); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); |
| } |
| |
| @Test |
| public void read_onlySendsOneUpdateAfterDrops() throws Exception { |
| createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); |
| TransferParameters params = TransferParameters.create(50, 10, 0); |
| |
| // Handshake |
| enqueueReadChunks(2, // Wait for read RPC open & START packet |
| newChunk(Chunk.Type.START_ACK, 99) |
| .setResourceId(7) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| enqueueReadChunks(1, // Ignore the first START_ACK_CONFIRMATION |
| newChunk(Chunk.Type.START_ACK, 99) |
| .setResourceId(7) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| // Window 1: server waits for START_ACK_CONFIRMATION, drops 2nd packet |
| enqueueReadChunks(1, |
| newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(range(0, 10)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50))); |
| |
| // Window 2: server waits for retransmit, drops 1st packet |
| enqueueReadChunks(1, |
| newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60))); |
| |
| // Window 3: server waits for retransmit, drops last packet |
| enqueueReadChunks(1, |
| newChunk(Chunk.Type.DATA, 99).setOffset(10).setData(range(10, 20)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50))); |
| |
| // Window 4: server waits for continue and retransmit, normal window. |
| enqueueReadChunks(2, |
| newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(60).setData(range(60, 70)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(70).setData(range(70, 80)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(80).setData(range(80, 90)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100))); |
| enqueueReadChunks(2, // Ignore continue and retransmit chunks, retry last packet in window |
| newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)), |
| newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100))); |
| |
| // Window 5: Final packet |
| enqueueReadChunks(2, // Receive two retries, then send final packet |
| newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); |
| enqueueReadChunks(1, // Ignore first COMPLETION packet |
| newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); |
| enqueueReadChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 99)); |
| |
| ListenableFuture<byte[]> future = transferClient.read(7, params); |
| // assertThat(future.get()).isEqualTo(range(0, 110).toByteArray()); |
| while (!future.isDone()) { |
| } |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| // Handshake |
| initialReadChunk(7, ProtocolVersion.VERSION_TWO, params), |
| readStartAckConfirmation(99, params), |
| readStartAckConfirmation(99, params), |
| // Window 1: send one transfer parameters update after the drop |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) |
| .setOffset(10) |
| .setWindowEndOffset(60) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| // Window 2: send one transfer parameters update after the drop |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) |
| .setOffset(10) |
| .setWindowEndOffset(60) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| // Window 3: send one transfer parameters update after the drop, then continue packet |
| newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Not seen by server |
| .setOffset(40) |
| .setWindowEndOffset(90) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after timeout |
| .setOffset(50) |
| .setWindowEndOffset(100) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| // Window 4: send one transfer parameters update after the drop, then continue packet |
| newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Ignored by server |
| .setOffset(80) |
| .setWindowEndOffset(130) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after last packet |
| .setOffset(100) |
| .setWindowEndOffset(150) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet |
| .setOffset(100) |
| .setWindowEndOffset(150) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet |
| .setOffset(100) |
| .setWindowEndOffset(150) |
| .setMaxChunkSizeBytes(10) |
| .build(), |
| // Window 5: final packet and closing handshake |
| newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build(), |
| newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build()); |
| } |
| |
| @Test |
| public void read_progressCallbackIsCalled() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = |
| transferClient.read(123, TRANSFER_PARAMETERS, progressCallback); |
| |
| performReadStartHandshake(123, 123, TRANSFER_PARAMETERS); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), |
| newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)), |
| newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 60)).setRemainingBytes(5), |
| newChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)), |
| newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 80)).setRemainingBytes(20), |
| newChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)), |
| newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30))); |
| lastChunks(); // Discard chunks; no need to inspect for this test |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); |
| performReadCompletionHandshake(123, Status.OK); |
| |
| verify(progressCallback, times(6)).accept(progress.capture()); |
| assertThat(progress.getAllValues()) |
| .containsExactly(TransferProgress.create(30, 30, TransferProgress.UNKNOWN_TRANSFER_SIZE), |
| TransferProgress.create(50, 50, TransferProgress.UNKNOWN_TRANSFER_SIZE), |
| TransferProgress.create(60, 60, 65), |
| TransferProgress.create(70, 70, TransferProgress.UNKNOWN_TRANSFER_SIZE), |
| TransferProgress.create(80, 80, 100), |
| TransferProgress.create(100, 100, 100)); |
| |
| assertThat(future.isDone()).isTrue(); |
| } |
| |
| @Test |
| public void read_rewindWhenPacketsSkipped() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); |
| |
| performReadStartHandshake(123, 123, TRANSFER_PARAMETERS); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setWindowEndOffset(50) |
| .setMaxChunkSizeBytes(30) |
| .setOffset(0) |
| .build()); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), |
| newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(30) |
| .setWindowEndOffset(80) |
| .setMaxChunkSizeBytes(30) |
| .build()); |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(50) |
| .setWindowEndOffset(100) |
| .setMaxChunkSizeBytes(30) |
| .build()); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80))); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) |
| .setOffset(80) |
| .setWindowEndOffset(130) |
| .setMaxChunkSizeBytes(30) |
| .build()); |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); |
| |
| performReadCompletionHandshake(123, Status.OK); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); |
| } |
| |
| @Test |
| public void read_multipleWithSameId_sequentially_successful() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| for (int i = 0; i < 3; ++i) { |
| ListenableFuture<byte[]> future = transferClient.read(1); |
| |
| performReadStartHandshake(1, 100 + i); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 100 + i) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0)); |
| |
| performReadCompletionHandshake(100 + i, Status.OK); |
| |
| assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); |
| } |
| } |
| |
| @Test |
| public void read_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> first = transferClient.read(123); |
| ListenableFuture<byte[]> second = transferClient.read(123); |
| |
| assertThat(first.isDone()).isFalse(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, second::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); |
| } |
| |
| @Test |
| public void read_sendErrorOnFirstPacket_fails() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| ListenableFuture<byte[]> future = transferClient.read(123); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void read_sendErrorOnLaterPacket_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(1024, TRANSFER_PARAMETERS); |
| |
| performReadStartHandshake(1024, 123, TRANSFER_PARAMETERS); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20))); |
| |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| |
| receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50))); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void read_cancelFuture_abortsTransfer() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); |
| |
| performReadStartHandshake(1, 123, TRANSFER_PARAMETERS); |
| |
| assertThat(future.cancel(true)).isTrue(); |
| |
| assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED)); |
| } |
| |
| @Test |
| public void read_immediateTransferProtocolError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(123); |
| |
| // Resource ID will be set since session ID hasn't been assigned yet. |
| receiveReadChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID) |
| .setResourceId(123) |
| .setStatus(Status.ALREADY_EXISTS.ordinal())); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); |
| } |
| |
| @Test |
| public void read_laterTransferProtocolError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(123); |
| |
| performReadStartHandshake(123, 514); |
| |
| receiveReadChunks(finalChunk(514, Status.ALREADY_EXISTS)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); |
| } |
| |
| @Test |
| public void read_rpcError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(2); |
| |
| receiveReadServerError(Status.NOT_FOUND); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void read_serverRespondsWithUnknownVersion_invalidArgument() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(2, TRANSFER_PARAMETERS); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialReadChunk(2, ProtocolVersion.VERSION_TWO, TRANSFER_PARAMETERS)); |
| |
| receiveReadChunks( |
| newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613)); |
| |
| assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT)); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); |
| } |
| |
| @Test |
| public void read_timeout() { |
| createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); |
| |
| // Call future.get() without sending any server-side packets. |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); |
| |
| // read should have retried sending the transfer parameters 2 times, for a total of 3 |
| assertThat(lastChunks()) |
| .containsExactly(initialReadChunk(123, ProtocolVersion.VERSION_TWO), |
| initialReadChunk(123, ProtocolVersion.VERSION_TWO), |
| initialReadChunk(123, ProtocolVersion.VERSION_TWO)); |
| } |
| |
| @Test |
| public void write_singleChunk() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| // Do the start handshake (equivalent to performWriteStartHandshake()). |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 123) |
| .setResourceId(2) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) |
| .setRemainingBytes(TEST_DATA_SHORT.size()) |
| .build()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(1024) |
| .setMaxChunkSizeBytes(128)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.DATA, 123) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0) |
| .build()); |
| |
| receiveWriteChunks(finalChunk(123, Status.OK)); |
| |
| assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void write_requestV2ReceiveLegacy() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); |
| |
| receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) |
| .setOffset(0) |
| .setWindowEndOffset(1024) |
| .setMaxChunkSizeBytes(128), |
| legacyFinalChunk(2, Status.OK)); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void write_platformTransferDisabled_aborted() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| assertThat(future.isDone()).isFalse(); |
| |
| shouldAbortFlag = true; |
| receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ABORTED); |
| } |
| |
| @Test |
| public void write_failedPreconditionError_retriesInitialPacket() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); |
| for (int i = 0; i < MAX_RETRIES; ++i) { |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); |
| } |
| |
| receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 54321) |
| .setResourceId(2) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 54321) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) |
| .setRemainingBytes(TEST_DATA_SHORT.size()) |
| .build()); |
| } |
| |
| @Test |
| public void write_failedPreconditionError_abortsAfterInitialPacket() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size())); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 4) |
| .setResourceId(2) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void write_failedPreconditionErrorMaxRetriesTimes_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| for (int i = 0; i < MAX_RETRIES; ++i) { |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| } |
| |
| Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()); |
| assertThat(lastChunks()) |
| .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); |
| |
| receiveWriteServerError(Status.FAILED_PRECONDITION); |
| |
| assertThat(lastChunks()).isEmpty(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void write_empty() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, new byte[] {}); |
| |
| performWriteStartHandshake(2, 123, 0); |
| |
| receiveWriteChunks(finalChunk(123, Status.OK)); |
| |
| assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void write_severalChunks() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(500, TEST_DATA_100B.toByteArray()); |
| |
| performWriteStartHandshake(500, 123, TEST_DATA_100B.size()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(50) |
| .setMaxChunkSizeBytes(30) |
| .setMinDelayMicroseconds(1)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), |
| newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(50) |
| .setWindowEndOffset(90) |
| .setMaxChunkSizeBytes(25)); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(), |
| newChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build()); |
| |
| receiveWriteChunks( |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setWindowEndOffset(140)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.DATA, 123) |
| .setOffset(90) |
| .setData(range(90, 100)) |
| .setRemainingBytes(0) |
| .build()); |
| |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveWriteChunks(finalChunk(123, Status.OK)); |
| |
| assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void write_parametersContinue() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(321, TEST_DATA_100B.toByteArray()); |
| |
| performWriteStartHandshake(321, 123, TEST_DATA_100B.size()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(50) |
| .setMaxChunkSizeBytes(30) |
| .setMinDelayMicroseconds(1)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), |
| newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); |
| |
| receiveWriteChunks( |
| newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(30).setWindowEndOffset(80)); |
| |
| // Transfer doesn't roll back to offset 30 but instead continues sending up to 80. |
| assertThat(lastChunks()) |
| .containsExactly( |
| newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build()); |
| |
| receiveWriteChunks( |
| newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(130)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.DATA, 123) |
| .setOffset(80) |
| .setData(range(80, 100)) |
| .setRemainingBytes(0) |
| .build()); |
| |
| assertThat(future.isDone()).isFalse(); |
| |
| receiveWriteChunks(finalChunk(123, Status.OK)); |
| |
| assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| performWriteStartHandshake(123, 555, TEST_DATA_100B.size()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555) |
| .setOffset(0) |
| .setWindowEndOffset(90) |
| .setMaxChunkSizeBytes(90) |
| .setMinDelayMicroseconds(1)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.DATA, 555).setOffset(0).setData(range(0, 90)).build()); |
| |
| receiveWriteChunks( |
| // This stale packet with a window end before the offset should be ignored. |
| newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(25).setWindowEndOffset(50), |
| // Start from an arbitrary offset before the current, but extend the window to the end. |
| newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(80).setWindowEndOffset(100)); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.DATA, 555) |
| .setOffset(90) |
| .setData(range(90, 100)) |
| .setRemainingBytes(0) |
| .build()); |
| |
| receiveWriteChunks(finalChunk(555, Status.OK)); |
| assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 555).build()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| } |
| |
| @Test |
| public void write_progressCallbackIsCalled() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = |
| transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback); |
| |
| performWriteStartHandshake(123, 123, TEST_DATA_100B.size()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(90) |
| .setMaxChunkSizeBytes(30), |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setWindowEndOffset(100), |
| finalChunk(123, Status.OK)); |
| |
| verify(progressCallback, times(6)).accept(progress.capture()); |
| assertThat(progress.getAllValues()) |
| .containsExactly(TransferProgress.create(30, 0, 100), |
| TransferProgress.create(60, 0, 100), |
| TransferProgress.create(90, 0, 100), |
| TransferProgress.create(80, 50, 100), |
| TransferProgress.create(100, 50, 100), |
| TransferProgress.create(100, 100, 100)); |
| assertThat(future.isDone()).isTrue(); |
| } |
| |
| @Test |
| public void write_asksForFinalOffset_sendsFinalPacket() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); |
| |
| performWriteStartHandshake(123, 456, TEST_DATA_100B.size()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 456) |
| .setOffset(100) |
| .setWindowEndOffset(140) |
| .setMaxChunkSizeBytes(25)); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| newChunk(Chunk.Type.DATA, 456).setOffset(100).setRemainingBytes(0).build()); |
| } |
| |
| @Test |
| public void write_multipleWithSameId_sequentially_successful() throws Exception { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| for (int i = 0; i < 3; ++i) { |
| ListenableFuture<Void> future = transferClient.write(6, TEST_DATA_SHORT.toByteArray()); |
| |
| performWriteStartHandshake(6, 123, TEST_DATA_SHORT.size()); |
| |
| receiveWriteChunks( |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setWindowEndOffset(50), |
| finalChunk(123, Status.OK)); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| newChunk(Chunk.Type.DATA, 123).setData(TEST_DATA_SHORT).setRemainingBytes(0).build(), |
| newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); |
| |
| future.get(); |
| } |
| } |
| |
| @Test |
| public void write_multipleWithSameId_atSameTime_failsWithAlreadyExistsError() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> first = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| ListenableFuture<Void> second = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| assertThat(first.isDone()).isFalse(); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, second::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.ALREADY_EXISTS); |
| } |
| |
| @Test |
| public void write_sendErrorOnFirstPacket_failsImmediately() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void write_serviceRequestsNoData_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray()); |
| |
| performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0)); |
| |
| assertThat(lastChunks()).containsExactly(finalChunk(123, Status.INVALID_ARGUMENT)); |
| receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); |
| } |
| |
| @Test |
| public void write_invalidOffset_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray()); |
| |
| performWriteStartHandshake(7, 123, TEST_DATA_100B.size()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(101) |
| .setWindowEndOffset(141) |
| .setMaxChunkSizeBytes(25)); |
| |
| assertThat(lastChunks()).containsExactly(finalChunk(123, Status.OUT_OF_RANGE)); |
| receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE); |
| } |
| |
| @Test |
| public void write_sendErrorOnLaterPacket_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray()); |
| |
| performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size()); |
| |
| ChannelOutputException exception = new ChannelOutputException("blah"); |
| rpcClient.setChannelOutputException(exception); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(50) |
| .setMaxChunkSizeBytes(30)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| assertThat(thrown).hasCauseThat().hasCauseThat().isSameInstanceAs(exception); |
| } |
| |
| @Test |
| public void write_cancelFuture_abortsTransfer() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray()); |
| |
| performWriteStartHandshake(7, 123, TEST_DATA_100B.size()); |
| |
| assertThat(future.cancel(true)).isTrue(); |
| assertThat(future.isCancelled()).isTrue(); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(50) |
| .setMaxChunkSizeBytes(50)); |
| |
| assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED)); |
| receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); |
| } |
| |
| @Test |
| public void write_immediateTransferProtocolError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID) |
| .setResourceId(123) |
| .setStatus(Status.NOT_FOUND.ordinal())); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.NOT_FOUND); |
| } |
| |
| @Test |
| public void write_laterTransferProtocolError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| performWriteStartHandshake(123, 123, TEST_DATA_SHORT.size()); |
| |
| receiveWriteChunks(finalChunk(123, Status.NOT_FOUND)); |
| |
| ExecutionException thrown = assertThrows(ExecutionException.class, future::get); |
| assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); |
| |
| assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.NOT_FOUND); |
| } |
| |
| @Test |
| public void write_rpcError_aborts() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| receiveWriteServerError(Status.NOT_FOUND); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INTERNAL); |
| } |
| |
| @Test |
| public void write_unknownVersion_invalidArgument() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2).setProtocolVersion(9)); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), |
| finalChunk(3, Status.INVALID_ARGUMENT)); |
| } |
| |
| @Test |
| public void write_serverRespondsWithUnknownVersion_invalidArgument() { |
| createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, 100)); |
| |
| receiveWriteChunks( |
| newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613)); |
| |
| assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT)); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); |
| } |
| |
| @Test |
| public void write_timeoutAfterInitialChunk() { |
| createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); |
| ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); |
| |
| // Call future.get() without sending any server-side packets. |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); |
| |
| // Client should have resent the last chunk (the initial chunk in this case) for each timeout. |
| assertThat(lastChunks()) |
| .containsExactly( |
| initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial |
| initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // retry 1 |
| initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); // retry 2 |
| } |
| |
| @Test |
| public void write_timeoutAfterSingleChunk() { |
| createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); |
| |
| // Wait for two outgoing packets (Write RPC request and first chunk), then do the handshake. |
| enqueueWriteChunks(2, |
| newChunk(Chunk.Type.START_ACK, 123).setResourceId(9), |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(90) |
| .setMaxChunkSizeBytes(30)); |
| ListenableFuture<Void> future = transferClient.write(9, TEST_DATA_SHORT.toByteArray()); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); |
| |
| Chunk data = newChunk(Chunk.Type.DATA, 123) |
| .setOffset(0) |
| .setData(TEST_DATA_SHORT) |
| .setRemainingBytes(0) |
| .build(); |
| assertThat(lastChunks()) |
| .containsExactly( |
| initialWriteChunk(9, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial |
| newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) |
| .setRemainingBytes(TEST_DATA_SHORT.size()) |
| .build(), |
| data, // data chunk |
| data, // retry 1 |
| data); // retry 2 |
| } |
| |
| @Test |
| public void write_timeoutAndRecoverDuringHandshakes() throws Exception { |
| createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); |
| assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries |
| |
| // Wait for four outgoing packets (Write RPC request and START chunk + retry), then handshake. |
| enqueueWriteChunks(3, |
| newChunk(Chunk.Type.START_ACK, 123) |
| .setResourceId(5) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| // Wait for start ack confirmation + 2 retries, then request three packets. |
| enqueueWriteChunks(3, |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(60) |
| .setMaxChunkSizeBytes(20)); |
| |
| // After two packets, request the remainder of the packets. |
| enqueueWriteChunks( |
| 2, newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(20).setWindowEndOffset(200)); |
| |
| // Wait for last 3 data packets, then 2 final packet retries. |
| enqueueWriteChunks(5, |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(80) |
| .setWindowEndOffset(200) |
| .setMaxChunkSizeBytes(20), |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(80) |
| .setWindowEndOffset(200) |
| .setMaxChunkSizeBytes(20)); |
| |
| // After the retry, confirm completed multiple times; additional packets should be dropped |
| enqueueWriteChunks(1, |
| newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), |
| newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), |
| newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), |
| newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code())); |
| |
| ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| |
| final Chunk startAckConfirmation = |
| newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) |
| .setRemainingBytes(TEST_DATA_100B.size()) |
| .build(); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| // initial handshake with retries |
| initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), |
| initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), |
| startAckConfirmation, |
| startAckConfirmation, |
| startAckConfirmation, |
| // send all data |
| dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 |
| dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 |
| dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60 |
| dataChunk(123, TEST_DATA_100B, 60, 80), // data 60-80 |
| dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) |
| // retry last packet two times |
| dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) |
| dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) |
| // respond to two PARAMETERS_RETRANSMIT packets |
| dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) |
| dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) |
| // respond to OK packet |
| newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); |
| } |
| |
| @Test |
| public void write_multipleTimeoutsAndRecoveries() throws Exception { |
| createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); |
| assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries |
| |
| // Wait for two outgoing packets (Write RPC request and START chunk), then do the handshake. |
| enqueueWriteChunks(2, |
| newChunk(Chunk.Type.START_ACK, 123) |
| .setResourceId(5) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| // Request two packets. |
| enqueueWriteChunks(1, |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(40) |
| .setMaxChunkSizeBytes(20)); |
| |
| // After the second retry, send more transfer parameters |
| enqueueWriteChunks(4, |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(40) |
| .setWindowEndOffset(120) |
| .setMaxChunkSizeBytes(40)); |
| |
| // After the first retry, send more transfer parameters |
| enqueueWriteChunks(3, |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(80) |
| .setWindowEndOffset(160) |
| .setMaxChunkSizeBytes(10)); |
| |
| // After the second retry, confirm completed |
| enqueueWriteChunks(4, newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code())); |
| enqueueWriteChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 123)); |
| |
| ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); |
| |
| assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| // initial handshake |
| initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), |
| newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) |
| .setRemainingBytes(TEST_DATA_100B.size()) |
| .build(), |
| // after 2, receive parameters: 40 from 0 by 20 |
| dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 |
| dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 |
| dataChunk(123, TEST_DATA_100B, 20, 40), // retry 1 |
| dataChunk(123, TEST_DATA_100B, 20, 40), // retry 2 |
| // after 4, receive parameters: 80 from 40 by 40 |
| dataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80 |
| dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 |
| dataChunk(123, TEST_DATA_100B, 80, 100), // retry 1 |
| // after 3, receive parameters: 80 from 80 by 10 |
| dataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90 |
| dataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100 |
| dataChunk(123, TEST_DATA_100B, 90, 100), // retry 1 |
| dataChunk(123, TEST_DATA_100B, 90, 100), // retry 2 |
| // after 4, receive final OK |
| newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); |
| } |
| @Test |
| public void write_maxLifetimeRetries() throws Exception { |
| createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO, 5); |
| assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries |
| |
| // Wait for four outgoing packets (Write RPC request and START chunk + 2 retries) |
| enqueueWriteChunks(4, // 2 retries |
| newChunk(Chunk.Type.START_ACK, 123) |
| .setResourceId(5) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| // Wait for start ack confirmation + 2 retries, then request three packets. |
| enqueueWriteChunks(3, // 2 retries |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) |
| .setOffset(0) |
| .setWindowEndOffset(60) |
| .setMaxChunkSizeBytes(20)); |
| |
| // After 3 data packets, wait for two more retries, which should put this over the retry limit. |
| enqueueWriteChunks(5, // 2 retries |
| newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) // This packet should be ignored |
| .setOffset(80) |
| .setWindowEndOffset(200) |
| .setMaxChunkSizeBytes(20)); |
| |
| ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); |
| |
| ExecutionException exception = assertThrows(ExecutionException.class, future::get); |
| assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); |
| |
| final Chunk startAckConfirmation = |
| newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) |
| .setRemainingBytes(TEST_DATA_100B.size()) |
| .build(); |
| |
| assertThat(lastChunks()) |
| .containsExactly( |
| // initial chunk and 2 retries |
| initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), |
| initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), |
| initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), |
| // START_ACK_CONFIRMATION and 2 retries |
| startAckConfirmation, |
| startAckConfirmation, |
| startAckConfirmation, |
| // send all data |
| dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 |
| dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 |
| dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60 |
| // last packet retry, then hit the lifetime retry limit and abort |
| dataChunk(123, TEST_DATA_100B, 40, 60)); // data 40-60 |
| } |
| |
| private static ByteString range(int startInclusive, int endExclusive) { |
| assertThat(startInclusive).isLessThan((int) Byte.MAX_VALUE); |
| assertThat(endExclusive).isLessThan((int) Byte.MAX_VALUE); |
| |
| byte[] bytes = new byte[endExclusive - startInclusive]; |
| for (byte i = 0; i < bytes.length; ++i) { |
| bytes[i] = (byte) (i + startInclusive); |
| } |
| return ByteString.copyFrom(bytes); |
| } |
| |
| private static Chunk.Builder newLegacyChunk(Chunk.Type type, int transferId) { |
| return Chunk.newBuilder().setType(type).setTransferId(transferId); |
| } |
| |
| private static Chunk.Builder newChunk(Chunk.Type type, int sessionId) { |
| return Chunk.newBuilder().setType(type).setSessionId(sessionId); |
| } |
| |
| private static Chunk initialReadChunk(int resourceId, ProtocolVersion version) { |
| return initialReadChunk(resourceId, version, TRANSFER_PARAMETERS); |
| } |
| |
| private static Chunk initialReadChunk( |
| int resourceId, ProtocolVersion version, TransferParameters params) { |
| Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId) |
| .setResourceId(resourceId) |
| .setPendingBytes(params.maxPendingBytes()) |
| .setWindowEndOffset(params.maxPendingBytes()) |
| .setMaxChunkSizeBytes(params.maxChunkSizeBytes()) |
| .setOffset(0); |
| if (version != ProtocolVersion.LEGACY) { |
| chunk.setProtocolVersion(version.ordinal()); |
| } |
| if (params.chunkDelayMicroseconds() > 0) { |
| chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds()); |
| } |
| return chunk.build(); |
| } |
| |
| private static Chunk readStartAckConfirmation(int sessionId, TransferParameters params) { |
| Chunk.Builder chunk = newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId) |
| .setWindowEndOffset(params.maxPendingBytes()) |
| .setMaxChunkSizeBytes(params.maxChunkSizeBytes()) |
| .setOffset(0) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()); |
| if (params.chunkDelayMicroseconds() > 0) { |
| chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds()); |
| } |
| return chunk.build(); |
| } |
| |
| private static Chunk initialWriteChunk(int resourceId, ProtocolVersion version, int size) { |
| Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId) |
| .setResourceId(resourceId) |
| .setRemainingBytes(size); |
| if (version != ProtocolVersion.LEGACY) { |
| chunk.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()); |
| } |
| return chunk.build(); |
| } |
| |
| private static Chunk legacyFinalChunk(int sessionId, Status status) { |
| return newLegacyChunk(Chunk.Type.COMPLETION, sessionId).setStatus(status.code()).build(); |
| } |
| |
| private static Chunk finalChunk(int sessionId, Status status) { |
| return newChunk(Chunk.Type.COMPLETION, sessionId).setStatus(status.code()).build(); |
| } |
| |
| private static Chunk legacyDataChunk(int sessionId, ByteString data, int start, int end) { |
| if (start < 0 || end > data.size()) { |
| throw new IndexOutOfBoundsException("Invalid start or end"); |
| } |
| |
| Chunk.Builder chunk = newLegacyChunk(Chunk.Type.DATA, sessionId) |
| .setOffset(start) |
| .setData(data.substring(start, end)); |
| if (end == data.size()) { |
| chunk.setRemainingBytes(0); |
| } |
| return chunk.build(); |
| } |
| |
| private static Chunk dataChunk(int sessionId, ByteString data, int start, int end) { |
| if (start < 0 || end > data.size()) { |
| throw new IndexOutOfBoundsException("Invalid start or end"); |
| } |
| |
| Chunk.Builder chunk = |
| newChunk(Chunk.Type.DATA, sessionId).setOffset(start).setData(data.substring(start, end)); |
| if (end == data.size()) { |
| chunk.setRemainingBytes(0); |
| } |
| return chunk.build(); |
| } |
| |
| /** Runs an action */ |
| private void syncWithTransferThread(Runnable action) { |
| transferClient.waitUntilEventsAreProcessedForTest(); |
| action.run(); |
| transferClient.waitUntilEventsAreProcessedForTest(); |
| } |
| |
| private void receiveReadServerError(Status status) { |
| syncWithTransferThread(() -> rpcClient.receiveServerError(SERVICE, "Read", status)); |
| } |
| |
| private void receiveWriteServerError(Status status) { |
| syncWithTransferThread(() -> rpcClient.receiveServerError(SERVICE, "Write", status)); |
| } |
| |
| private void receiveReadChunks(ChunkOrBuilder... chunks) { |
| for (ChunkOrBuilder chunk : chunks) { |
| syncWithTransferThread(() -> rpcClient.receiveServerStream(SERVICE, "Read", chunk)); |
| } |
| } |
| |
| private void receiveWriteChunks(ChunkOrBuilder... chunks) { |
| for (ChunkOrBuilder chunk : chunks) { |
| syncWithTransferThread(() -> rpcClient.receiveServerStream(SERVICE, "Write", chunk)); |
| } |
| } |
| |
| private void performReadStartHandshake(int resourceId, int sessionId) { |
| performReadStartHandshake( |
| resourceId, sessionId, TransferClient.DEFAULT_READ_TRANSFER_PARAMETERS); |
| } |
| |
| private void performReadStartHandshake(int resourceId, int sessionId, TransferParameters params) { |
| assertThat(lastChunks()) |
| .containsExactly(initialReadChunk(resourceId, ProtocolVersion.VERSION_TWO, params)); |
| |
| receiveReadChunks(newChunk(Chunk.Type.START_ACK, sessionId) |
| .setResourceId(resourceId) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| assertThat(lastChunks()).containsExactly(readStartAckConfirmation(sessionId, params)); |
| } |
| |
| private void performReadCompletionHandshake(int sessionId, Status status) { |
| assertThat(lastChunks()) |
| .containsExactly(Chunk.newBuilder() |
| .setType(Chunk.Type.COMPLETION) |
| .setSessionId(sessionId) |
| .setStatus(status.ordinal()) |
| .build()); |
| |
| receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, sessionId)); |
| } |
| |
| private void performWriteStartHandshake(int resourceId, int sessionId, int dataSize) { |
| assertThat(lastChunks()) |
| .containsExactly(initialWriteChunk(resourceId, ProtocolVersion.VERSION_TWO, dataSize)); |
| |
| receiveWriteChunks(newChunk(Chunk.Type.START_ACK, sessionId) |
| .setResourceId(resourceId) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); |
| |
| assertThat(lastChunks()) |
| .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId) |
| .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) |
| .setRemainingBytes(dataSize) |
| .build()); |
| } |
| |
| /** Receive these read chunks after a chunk is sent. */ |
| private void enqueueReadChunks(int afterPackets, Chunk.Builder... chunks) { |
| syncWithTransferThread( |
| () -> rpcClient.enqueueServerStream(SERVICE, "Read", afterPackets, chunks)); |
| } |
| |
| /** Receive these write chunks after a chunk is sent. */ |
| private void enqueueWriteChunks(int afterPackets, Chunk.Builder... chunks) { |
| syncWithTransferThread( |
| () -> rpcClient.enqueueServerStream(SERVICE, "Write", afterPackets, chunks)); |
| } |
| |
| private List<Chunk> lastChunks() { |
| transferClient.waitUntilEventsAreProcessedForTest(); |
| return rpcClient.lastClientStreams(Chunk.class); |
| } |
| |
| private void createTransferClientThatMayTimeOut(ProtocolVersion version) { |
| createTransferClientThatMayTimeOut(version, Integer.MAX_VALUE); |
| } |
| |
| private void createTransferClientThatMayTimeOut(ProtocolVersion version, int maxLifetimeRetries) { |
| createTransferClient( |
| version, 1, 1, maxLifetimeRetries, TransferEventHandler::runForTestsThatMustTimeOut); |
| } |
| |
| private void createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion version) { |
| createTransferClient(version, 60000, 60000, Integer.MAX_VALUE, TransferEventHandler::run); |
| } |
| |
| private void createTransferClient(ProtocolVersion version, |
| int transferTimeoutMillis, |
| int initialTransferTimeoutMillis, |
| int maxLifetimeRetries, |
| Consumer<TransferEventHandler> eventHandlerFunction) { |
| if (transferClient != null) { |
| throw new AssertionError("createTransferClient must only be called once!"); |
| } |
| transferClient = new TransferClient(rpcClient.client().method(CHANNEL_ID, SERVICE + "/Read"), |
| rpcClient.client().method(CHANNEL_ID, SERVICE + "/Write"), |
| TransferTimeoutSettings.builder() |
| .setTimeoutMillis(transferTimeoutMillis) |
| .setInitialTimeoutMillis(initialTransferTimeoutMillis) |
| .setMaxRetries(MAX_RETRIES) |
| .setMaxLifetimeRetries(maxLifetimeRetries) |
| .build(), |
| () |
| -> this.shouldAbortFlag, |
| eventHandlerFunction); |
| transferClient.setProtocolVersion(version); |
| } |
| } |