blob: 97fb96999a9ca06b3b1e47b5f6797ae903b5ede7 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_transfer/transfer.h"
#include "gtest/gtest.h"
#include "pw_bytes/array.h"
#include "pw_rpc/raw/test_method_context.h"
#include "pw_rpc/thread_testing.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer/transfer.pwpb.h"
#include "pw_transfer_private/chunk_testing.h"
namespace pw::transfer::test {
namespace {
using namespace std::chrono_literals;
// TODO(frolv): Have a generic way to obtain a thread for testing on any system.
thread::Options& TransferThreadOptions() {
static thread::stl::Options options;
return options;
}
using internal::Chunk;
using internal::ProtocolVersion;
class TestMemoryReader : public stream::SeekableReader {
public:
constexpr TestMemoryReader(std::span<const std::byte> data)
: memory_reader_(data) {}
Status DoSeek(ptrdiff_t offset, Whence origin) override {
if (seek_status.ok()) {
return memory_reader_.Seek(offset, origin);
}
return seek_status;
}
StatusWithSize DoRead(ByteSpan dest) final {
if (!read_status.ok()) {
return StatusWithSize(read_status, 0);
}
auto result = memory_reader_.Read(dest);
return result.ok() ? StatusWithSize(result->size())
: StatusWithSize(result.status(), 0);
}
Status seek_status;
Status read_status;
private:
stream::MemoryReader memory_reader_;
};
class SimpleReadTransfer final : public ReadOnlyHandler {
public:
SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
: ReadOnlyHandler(session_id),
prepare_read_called(false),
finalize_read_called(false),
finalize_read_status(Status::Unknown()),
reader_(data) {}
Status PrepareRead() final {
prepare_read_called = true;
if (!prepare_read_return_status.ok()) {
return prepare_read_return_status;
}
EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
set_reader(reader_);
return OkStatus();
}
void FinalizeRead(Status status) final {
finalize_read_called = true;
finalize_read_status = status;
}
void set_seek_status(Status status) { reader_.seek_status = status; }
void set_read_status(Status status) { reader_.read_status = status; }
bool prepare_read_called;
bool finalize_read_called;
Status prepare_read_return_status;
Status finalize_read_status;
private:
TestMemoryReader reader_;
};
constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
class ReadTransfer : public ::testing::Test {
protected:
ReadTransfer(size_t max_chunk_size_bytes = 64)
: handler_(3, kData),
transfer_thread_(std::span(data_buffer_).first(max_chunk_size_bytes),
encode_buffer_),
ctx_(transfer_thread_, 64),
system_thread_(TransferThreadOptions(), transfer_thread_) {
ctx_.service().RegisterHandler(handler_);
ASSERT_FALSE(handler_.prepare_read_called);
ASSERT_FALSE(handler_.finalize_read_called);
ctx_.call(); // Open the read stream
transfer_thread_.WaitUntilEventIsProcessed();
}
~ReadTransfer() {
transfer_thread_.Terminate();
system_thread_.join();
}
SimpleReadTransfer handler_;
Thread<1, 1> transfer_thread_;
PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
thread::Thread system_thread_;
std::array<std::byte, 64> data_buffer_;
std::array<std::byte, 64> encode_buffer_;
};
TEST_F(ReadTransfer, SingleChunk) {
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
});
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
// First chunk should have all the read data.
EXPECT_EQ(c0.session_id(), 3u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.payload().size(), kData.size());
EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
0);
// Second chunk should be empty and set remaining_bytes = 0.
EXPECT_EQ(c1.session_id(), 3u);
EXPECT_FALSE(c1.has_payload());
ASSERT_TRUE(c1.remaining_bytes().has_value());
EXPECT_EQ(c1.remaining_bytes().value(), 0u);
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, MultiChunk) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk c0 = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(c0.session_id(), 3u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.payload().size(), 16u);
EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
0);
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
.set_session_id(3)
.set_window_end_offset(32)
.set_offset(16)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c1 = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(c1.session_id(), 3u);
EXPECT_EQ(c1.offset(), 16u);
ASSERT_EQ(c1.payload().size(), 16u);
EXPECT_EQ(
std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
0);
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
.set_session_id(3)
.set_window_end_offset(48)
.set_offset(32)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
EXPECT_EQ(c2.session_id(), 3u);
EXPECT_FALSE(c2.has_payload());
ASSERT_TRUE(c2.remaining_bytes().has_value());
EXPECT_EQ(c2.remaining_bytes().value(), 0u);
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
const auto continue_chunk = EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
.set_session_id(3)
.set_window_end_offset(24)
.set_offset(16));
ctx_.SendClientStream(continue_chunk);
transfer_thread_.WaitUntilEventIsProcessed();
// Resend the CONTINUE packets that don't actually advance the window.
for (int i = 0; i < 3; ++i) {
ctx_.SendClientStream(continue_chunk);
transfer_thread_.WaitUntilEventIsProcessed();
}
ASSERT_EQ(ctx_.total_responses(), 2u); // Only sent one packet
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(c1.session_id(), 3u);
EXPECT_EQ(c1.offset(), 16u);
ASSERT_EQ(c1.payload().size(), 8u);
EXPECT_EQ(
std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
0);
}
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_TRUE(std::equal(
&kData[0], &kData[16], chunk.payload().begin(), chunk.payload().end()));
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(3)
.set_window_end_offset(10)
.set_offset(2)));
transfer_thread_.WaitUntilEventIsProcessed();
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_TRUE(std::equal(
&kData[2], &kData[10], chunk.payload().begin(), chunk.payload().end()));
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(17)));
});
ASSERT_EQ(ctx_.total_responses(), 4u);
Chunk chunk = DecodeChunk(ctx_.responses()[2]);
EXPECT_TRUE(std::equal(
&kData[17], kData.end(), chunk.payload().begin(), chunk.payload().end()));
}
TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
handler_.set_seek_status(Status::Unimplemented());
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(3)
.set_window_end_offset(10)
.set_offset(2)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::Unimplemented());
}
TEST_F(ReadTransfer, MaxChunkSize_Client) {
rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_max_chunk_size_bytes(8)
.set_offset(0)));
});
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 5u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
Chunk c3 = DecodeChunk(ctx_.responses()[3]);
Chunk c4 = DecodeChunk(ctx_.responses()[4]);
EXPECT_EQ(c0.session_id(), 3u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.payload().size(), 8u);
EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
0);
EXPECT_EQ(c1.session_id(), 3u);
EXPECT_EQ(c1.offset(), 8u);
ASSERT_EQ(c1.payload().size(), 8u);
EXPECT_EQ(
std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
0);
EXPECT_EQ(c2.session_id(), 3u);
EXPECT_EQ(c2.offset(), 16u);
ASSERT_EQ(c2.payload().size(), 8u);
EXPECT_EQ(
std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
0);
EXPECT_EQ(c3.session_id(), 3u);
EXPECT_EQ(c3.offset(), 24u);
ASSERT_EQ(c3.payload().size(), 8u);
EXPECT_EQ(
std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
0);
EXPECT_EQ(c4.session_id(), 3u);
EXPECT_EQ(c4.payload().size(), 0u);
ASSERT_TRUE(c4.remaining_bytes().has_value());
EXPECT_EQ(c4.remaining_bytes().value(), 0u);
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ASSERT_TRUE(handler_.prepare_read_called);
ASSERT_TRUE(handler_.finalize_read_called);
ASSERT_EQ(OkStatus(), handler_.finalize_read_status);
// Now, clear state and start a second transfer
handler_.prepare_read_return_status = Status::FailedPrecondition();
handler_.prepare_read_called = false;
handler_.finalize_read_called = false;
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
// Prepare failed, so the handler should not have been stored in the context,
// and finalize should not have been called.
ASSERT_TRUE(handler_.prepare_read_called);
ASSERT_FALSE(handler_.finalize_read_called);
}
class ReadTransferMaxChunkSize8 : public ReadTransfer {
protected:
ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
};
TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
// Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
// TODO(frolv): Fix
rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(64)
// .set_max_chunk_size_bytes(16)
.set_offset(0)));
});
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 5u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
Chunk c3 = DecodeChunk(ctx_.responses()[3]);
Chunk c4 = DecodeChunk(ctx_.responses()[4]);
EXPECT_EQ(c0.session_id(), 3u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.payload().size(), 8u);
EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
0);
EXPECT_EQ(c1.session_id(), 3u);
EXPECT_EQ(c1.offset(), 8u);
ASSERT_EQ(c1.payload().size(), 8u);
EXPECT_EQ(
std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
0);
EXPECT_EQ(c2.session_id(), 3u);
EXPECT_EQ(c2.offset(), 16u);
ASSERT_EQ(c2.payload().size(), 8u);
EXPECT_EQ(
std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
0);
EXPECT_EQ(c3.session_id(), 3u);
EXPECT_EQ(c3.offset(), 24u);
ASSERT_EQ(c3.payload().size(), 8u);
EXPECT_EQ(
std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
0);
EXPECT_EQ(c4.session_id(), 3u);
EXPECT_EQ(c4.payload().size(), 0u);
ASSERT_TRUE(c4.remaining_bytes().has_value());
EXPECT_EQ(c4.remaining_bytes().value(), 0u);
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, ClientError) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
// Send client error.
ctx_.SendClientStream(EncodeChunk(
Chunk::Final(ProtocolVersion::kLegacy, 3, Status::OutOfRange())));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
}
TEST_F(ReadTransfer, UnregisteredHandler) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(11)
.set_window_end_offset(32)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 11u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::NotFound());
}
TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(3)
.set_window_end_offset(32)
.set_offset(3)));
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(3)
.set_payload(std::span(kData).first(10))
.set_offset(3)));
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
// Only start transfer for an initial packet.
EXPECT_FALSE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
}
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
handler_.prepare_read_called = false; // Reset so can check if called again.
ctx_.SendClientStream( // Resend starting chunk
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
handler_.finalize_read_called = false; // Reset so can check later
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(3)
.set_window_end_offset(32)
.set_offset(16)));
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(0)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ASSERT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.status(), Status::ResourceExhausted());
}
TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
// Make the next read appear to be the end of the stream.
handler_.set_read_status(Status::OutOfRange());
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(0)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 3u);
EXPECT_EQ(chunk.remaining_bytes(), 0u);
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ASSERT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
});
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
// First chunk should have all the read data.
EXPECT_EQ(c0.session_id(), 3u);
EXPECT_EQ(c0.offset(), 0u);
ASSERT_EQ(c0.payload().size(), kData.size());
EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
0);
// Second chunk should be empty and set remaining_bytes = 0.
EXPECT_EQ(c1.session_id(), 3u);
EXPECT_FALSE(c1.has_payload());
ASSERT_TRUE(c1.remaining_bytes().has_value());
EXPECT_EQ(c1.remaining_bytes().value(), 0u);
ctx_.SendClientStream(
EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
// At this point the transfer should be in a completed state. Send a
// non-initial chunk as a continuation of the transfer.
handler_.finalize_read_called = false;
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(16)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
ASSERT_TRUE(c2.status().has_value());
EXPECT_EQ(c2.status().value(), Status::FailedPrecondition());
// FinalizeRead should not be called again.
EXPECT_FALSE(handler_.finalize_read_called);
}
class SimpleWriteTransfer final : public WriteOnlyHandler {
public:
SimpleWriteTransfer(uint32_t session_id, ByteSpan data)
: WriteOnlyHandler(session_id),
prepare_write_called(false),
finalize_write_called(false),
finalize_write_status(Status::Unknown()),
writer_(data) {}
Status PrepareWrite() final {
EXPECT_EQ(OkStatus(), writer_.Seek(0));
set_writer(writer_);
prepare_write_called = true;
return OkStatus();
}
Status FinalizeWrite(Status status) final {
finalize_write_called = true;
finalize_write_status = status;
return finalize_write_return_status_;
}
void set_finalize_write_return(Status status) {
finalize_write_return_status_ = status;
}
bool prepare_write_called;
bool finalize_write_called;
Status finalize_write_status;
private:
Status finalize_write_return_status_;
stream::MemoryWriter writer_;
};
class WriteTransfer : public ::testing::Test {
protected:
WriteTransfer(size_t max_bytes_to_receive = 64)
: buffer{},
handler_(7, buffer),
transfer_thread_(data_buffer_, encode_buffer_),
system_thread_(TransferThreadOptions(), transfer_thread_),
ctx_(transfer_thread_,
max_bytes_to_receive,
// Use a long timeout to avoid accidentally triggering timeouts.
std::chrono::minutes(1)) {
ctx_.service().RegisterHandler(handler_);
ASSERT_FALSE(handler_.prepare_write_called);
ASSERT_FALSE(handler_.finalize_write_called);
ctx_.call(); // Open the write stream
transfer_thread_.WaitUntilEventIsProcessed();
}
~WriteTransfer() {
transfer_thread_.Terminate();
system_thread_.join();
}
std::array<std::byte, kData.size()> buffer;
SimpleWriteTransfer handler_;
Thread<1, 1> transfer_thread_;
thread::Thread system_thread_;
std::array<std::byte, 64> data_buffer_;
std::array<std::byte, 64> encode_buffer_;
PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
};
TEST_F(WriteTransfer, SingleChunk) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransfer, FinalizeFails) {
// Return an error when FinalizeWrite is called.
handler_.set_finalize_write_return(Status::FailedPrecondition());
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::DataLoss());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
TEST_F(WriteTransfer, SendingFinalPacketFails) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ctx_.output().set_send_status(Status::Unknown());
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
// Should only have sent the transfer parameters.
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
// When FinalizeWrite() was called, the transfer was considered successful.
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
TEST_F(WriteTransfer, MultiChunk) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(8))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(8)
.set_payload(std::span(kData).subspan(8))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransfer, WriteFailsOnRetry) {
// Skip one packet to fail on a retry.
ctx_.output().set_send_status(Status::FailedPrecondition(), 1);
// Wait for 3 packets: initial params, retry attempt, final error
rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
// Send only one client packet so the service times out.
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.SimulateServerTimeout(7); // Time out to trigger retry
});
// Attempted to send 3 packets, but the 2nd packet was dropped.
// Check that the last packet is an INTERNAL error from the RPC write failure.
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::Internal());
}
TEST_F(WriteTransfer, TimeoutInRecoveryState) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
constexpr std::span data(kData);
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(data.first(8))));
// Skip offset 8 to enter a recovery state.
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(12)
.set_payload(data.subspan(12, 4))));
transfer_thread_.WaitUntilEventIsProcessed();
// Recovery parameters should be sent for offset 8.
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 8u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
// Timeout while in the recovery state.
transfer_thread_.SimulateServerTimeout(7);
transfer_thread_.WaitUntilEventIsProcessed();
// Same recovery parameters should be re-sent.
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 8u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
}
TEST_F(WriteTransfer, ExtendWindow) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
// Window starts at 32 bytes and should extend when half of that is sent.
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(4))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(4)
.set_payload(std::span(kData).subspan(4, 4))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(8)
.set_payload(std::span(kData).subspan(8, 4))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(12)
.set_payload(std::span(kData).subspan(12, 4))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
// Extend parameters chunk.
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
EXPECT_EQ(chunk.legacy_type(), Chunk::Type::kParametersContinue);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(16)
.set_payload(std::span(kData).subspan(16))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses()[2]);
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
class WriteTransferMaxBytes16 : public WriteTransfer {
protected:
WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
};
TEST_F(WriteTransfer, TransmitterReducesWindow) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
// Send only 12 bytes and set that as the new end offset.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_window_end_offset(12)
.set_payload(std::span(kData).first(12))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
// Receiver should respond immediately with a retransmit chunk as the end of
// the window has been reached.
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 12u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
EXPECT_EQ(chunk.legacy_type(), Chunk::Type::kParametersRetransmit);
}
TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
// Larger window end offset than the receiver's.
.set_window_end_offset(48)
.set_payload(std::span(kData).first(16))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::Internal());
}
TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 16u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(8))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 8u);
EXPECT_EQ(chunk.window_end_offset(), 24u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(8)
.set_payload(std::span(kData).subspan(8, 8))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 16u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(16)
.set_payload(std::span(kData).subspan(16, 8))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 4u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 24u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(24)
.set_payload(std::span(kData).subspan(24))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 5u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
// Default max bytes is smaller than buffer.
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 16u);
}
TEST_F(WriteTransfer, SetsWriterWindowEndOffset) {
// Buffer is smaller than constructor's default max bytes.
std::array<std::byte, 8> small_buffer = {};
SimpleWriteTransfer handler_(987, small_buffer);
ctx_.service().RegisterHandler(handler_);
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(987)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 987u);
EXPECT_EQ(chunk.window_end_offset(), 8u);
}
TEST_F(WriteTransfer, UnexpectedOffset) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(8))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(4) // incorrect
.set_payload(std::span(kData).subspan(8))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 8u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(8) // correct
.set_payload(std::span(kData).subspan(8))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 16u);
// window_end_offset = 16, but send 24 bytes of data.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(24))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::Internal());
}
TEST_F(WriteTransfer, UnregisteredHandler) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(999)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 999u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::NotFound());
}
TEST_F(WriteTransfer, ClientError) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(EncodeChunk(
Chunk::Final(ProtocolVersion::kLegacy, 7, Status::DataLoss())));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_EQ(ctx_.total_responses(), 1u);
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
}
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
constexpr std::span data(kData);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(data.first(1))));
// Drop offset 1, then send the rest of the data.
for (uint32_t i = 2; i < kData.size(); ++i) {
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(i)
.set_payload(data.subspan(i, 1))));
}
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 1u);
// Send the remaining data.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(1)
.set_payload(data.subspan(1, 31))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
constexpr std::span data(kData);
// Skip offset 0, then send the rest of the data.
for (uint32_t i = 1; i < kData.size(); ++i) {
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(i)
.set_payload(data.subspan(i, 1))));
}
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u); // Resent transfer parameters once.
const auto last_chunk =
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(kData.size() - 1)
.set_payload(data.last(1)));
ctx_.SendClientStream<64>(last_chunk);
transfer_thread_.WaitUntilEventIsProcessed();
// Resent transfer parameters since the packet is repeated
ASSERT_EQ(ctx_.total_responses(), 3u);
ctx_.SendClientStream<64>(last_chunk);
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 4u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
// Resumes normal operation when correct offset is sent.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses().back());
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());
}
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(3)));
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(10))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
// Only start transfer for initial packet.
EXPECT_FALSE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
}
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(8))));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ASSERT_TRUE(handler_.prepare_write_called);
ASSERT_FALSE(handler_.finalize_write_called);
handler_.prepare_write_called = false; // Reset to check it's called again.
// Simulate client disappearing then restarting the transfer.
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());
handler_.finalize_write_called = false; // Reset to check it's called again.
ASSERT_EQ(ctx_.total_responses(), 2u);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 3u);
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
public:
SometimesUnavailableReadHandler(uint32_t session_id, ConstByteSpan data)
: ReadOnlyHandler(session_id), reader_(data), call_count_(0) {}
Status PrepareRead() final {
if ((call_count_++ % 2) == 0) {
return Status::Unavailable();
}
set_reader(reader_);
return OkStatus();
}
private:
stream::MemoryReader reader_;
int call_count_;
};
TEST_F(ReadTransfer, PrepareError) {
SometimesUnavailableReadHandler unavailable_handler(88, kData);
ctx_.service().RegisterHandler(unavailable_handler);
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(88)
.set_window_end_offset(128)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 88u);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), Status::DataLoss());
// Try starting the transfer again. It should work this time.
// TODO(frolv): This won't work until completion ACKs are supported.
if (false) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(88)
.set_window_end_offset(128)
.set_offset(0)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 88u);
ASSERT_EQ(chunk.payload().size(), kData.size());
EXPECT_EQ(std::memcmp(
chunk.payload().data(), kData.data(), chunk.payload().size()),
0);
}
}
TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
.set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
// First parameters chunk has the default window end offset of 16.
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.window_end_offset(), 16u);
// Update the pending bytes value.
ctx_.service().set_max_pending_bytes(12);
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
.set_session_id(7)
.set_offset(0)
.set_payload(std::span(kData).first(8))));
transfer_thread_.WaitUntilEventIsProcessed();
// Second parameters chunk should use the new max pending bytes.
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), 7u);
EXPECT_EQ(chunk.offset(), 8u);
EXPECT_EQ(chunk.window_end_offset(), 8u + 12u);
}
} // namespace
} // namespace pw::transfer::test