blob: 69bed5bd051c0f0f18177d2f382eacd072ebeb18 [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_transfer/transfer.h"
#include "gtest/gtest.h"
#include "pw_bytes/array.h"
#include "pw_rpc/raw/test_method_context.h"
#include "pw_transfer/transfer.pwpb.h"
#include "pw_transfer_private/chunk_testing.h"
namespace pw::transfer::test {
namespace {
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
using internal::Chunk;
class TestMemoryReader : public stream::SeekableReader {
public:
constexpr TestMemoryReader(std::span<const std::byte> data)
: memory_reader_(data) {}
Status DoSeek(ptrdiff_t offset, Whence origin) override {
if (seek_status.ok()) {
return memory_reader_.Seek(offset, origin);
}
return seek_status;
}
StatusWithSize DoRead(ByteSpan dest) final {
if (!read_status.ok()) {
return StatusWithSize(read_status, 0);
}
auto result = memory_reader_.Read(dest);
return result.ok() ? StatusWithSize(result->size())
: StatusWithSize(result.status(), 0);
}
Status seek_status;
Status read_status;
private:
stream::MemoryReader memory_reader_;
};
class SimpleReadTransfer final : public ReadOnlyHandler {
public:
SimpleReadTransfer(uint32_t transfer_id, ConstByteSpan data)
: ReadOnlyHandler(transfer_id),
prepare_read_called(false),
finalize_read_called(false),
finalize_read_status(Status::Unknown()),
reader_(data) {}
Status PrepareRead() final {
reader_.Seek(0);
set_reader(reader_);
prepare_read_called = true;
return OkStatus();
}
void FinalizeRead(Status status) final {
finalize_read_called = true;
finalize_read_status = status;
}
void set_seek_status(Status status) { reader_.seek_status = status; }
void set_read_status(Status status) { reader_.read_status = status; }
bool prepare_read_called;
bool finalize_read_called;
Status finalize_read_status;
private:
TestMemoryReader reader_;
};
constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
class ReadTransfer : public ::testing::Test {
protected:
ReadTransfer(size_t max_chunk_size_bytes = 64)
: handler_(3, kData),
ctx_(work_queue_,
std::span(data_buffer_).first(max_chunk_size_bytes),
64) {
ctx_.service().RegisterHandler(handler_);
ASSERT_FALSE(handler_.prepare_read_called);
ASSERT_FALSE(handler_.finalize_read_called);
ctx_.call(); // Open the read stream
}
SimpleReadTransfer handler_;
PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
std::array<std::byte, 64> data_buffer_;
// Not currently used in the tests, so left uninitialized.
work_queue::WorkQueueWithBuffer<1> work_queue_;
};
TEST_F(ReadTransfer, SingleChunk) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3,
.window_end_offset = 64,
.pending_bytes = 64,
.offset = 0,
.type = Chunk::Type::kParametersRetransmit}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
// First chunk should have all the read data.
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.data.size(), kData.size());
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
// Second chunk should be empty and set remaining_bytes = 0.
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.data.size(), 0u);
ASSERT_TRUE(c1.remaining_bytes.has_value());
EXPECT_EQ(c1.remaining_bytes.value(), 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, PendingBytes_SingleChunk) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 0}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
// First chunk should have all the read data.
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.data.size(), kData.size());
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
// Second chunk should be empty and set remaining_bytes = 0.
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.data.size(), 0u);
ASSERT_TRUE(c1.remaining_bytes.has_value());
EXPECT_EQ(c1.remaining_bytes.value(), 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, MultiChunk) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3,
.window_end_offset = 16,
.pending_bytes = 16,
.offset = 0,
.type = Chunk::Type::kParametersRetransmit}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.data.size(), 16u);
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3,
.window_end_offset = 32,
.pending_bytes = 16,
.offset = 16,
.type = Chunk::Type::kParametersContinue}));
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.offset, 16u);
ASSERT_EQ(c1.data.size(), 16u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3,
.window_end_offset = 48,
.pending_bytes = 16,
.offset = 32,
.type = Chunk::Type::kParametersContinue}));
ASSERT_EQ(ctx_.total_responses(), 3u);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
EXPECT_EQ(c2.transfer_id, 3u);
EXPECT_EQ(c2.data.size(), 0u);
ASSERT_TRUE(c2.remaining_bytes.has_value());
EXPECT_EQ(c2.remaining_bytes.value(), 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, PendingBytes_MultiChunk) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.data.size(), 16u);
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16}));
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.offset, 16u);
ASSERT_EQ(c1.data.size(), 16u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 32}));
ASSERT_EQ(ctx_.total_responses(), 3u);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
EXPECT_EQ(c2.transfer_id, 3u);
EXPECT_EQ(c2.data.size(), 0u);
ASSERT_TRUE(c2.remaining_bytes.has_value());
EXPECT_EQ(c2.remaining_bytes.value(), 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_TRUE(
std::equal(&kData[0], &kData[16], chunk.data.begin(), chunk.data.end()));
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2}));
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_TRUE(
std::equal(&kData[2], &kData[10], chunk.data.begin(), chunk.data.end()));
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 17}));
ASSERT_EQ(ctx_.total_responses(), 4u);
chunk = DecodeChunk(ctx_.responses()[2]);
EXPECT_TRUE(std::equal(
&kData[17], kData.end(), chunk.data.begin(), chunk.data.end()));
}
TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
handler_.set_seek_status(Status::Unimplemented());
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2}));
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.status, Status::Unimplemented());
}
TEST_F(ReadTransfer, MaxChunkSize_Client) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
.pending_bytes = 64,
.max_chunk_size_bytes = 8,
.offset = 0}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 5u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
Chunk c3 = DecodeChunk(ctx_.responses()[3]);
Chunk c4 = DecodeChunk(ctx_.responses()[4]);
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.data.size(), 8u);
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.offset, 8u);
ASSERT_EQ(c1.data.size(), 8u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 8, c1.data.size()), 0);
EXPECT_EQ(c2.transfer_id, 3u);
EXPECT_EQ(c2.offset, 16u);
ASSERT_EQ(c2.data.size(), 8u);
EXPECT_EQ(std::memcmp(c2.data.data(), kData.data() + 16, c2.data.size()), 0);
EXPECT_EQ(c3.transfer_id, 3u);
EXPECT_EQ(c3.offset, 24u);
ASSERT_EQ(c3.data.size(), 8u);
EXPECT_EQ(std::memcmp(c3.data.data(), kData.data() + 24, c3.data.size()), 0);
EXPECT_EQ(c4.transfer_id, 3u);
EXPECT_EQ(c4.data.size(), 0u);
ASSERT_TRUE(c4.remaining_bytes.has_value());
EXPECT_EQ(c4.remaining_bytes.value(), 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
class ReadTransferMaxChunkSize8 : public ReadTransfer {
protected:
ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunK_size_bytes=*/8) {}
};
TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
// Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
.pending_bytes = 64,
.max_chunk_size_bytes = 16,
.offset = 0}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 5u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
Chunk c3 = DecodeChunk(ctx_.responses()[3]);
Chunk c4 = DecodeChunk(ctx_.responses()[4]);
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.data.size(), 8u);
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.offset, 8u);
ASSERT_EQ(c1.data.size(), 8u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 8, c1.data.size()), 0);
EXPECT_EQ(c2.transfer_id, 3u);
EXPECT_EQ(c2.offset, 16u);
ASSERT_EQ(c2.data.size(), 8u);
EXPECT_EQ(std::memcmp(c2.data.data(), kData.data() + 16, c2.data.size()), 0);
EXPECT_EQ(c3.transfer_id, 3u);
EXPECT_EQ(c3.offset, 24u);
ASSERT_EQ(c3.data.size(), 8u);
EXPECT_EQ(std::memcmp(c3.data.data(), kData.data() + 24, c3.data.size()), 0);
EXPECT_EQ(c4.transfer_id, 3u);
EXPECT_EQ(c4.data.size(), 0u);
ASSERT_TRUE(c4.remaining_bytes.has_value());
EXPECT_EQ(c4.remaining_bytes.value(), 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, ClientError) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
// Send client error.
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .status = Status::OutOfRange()}));
ASSERT_EQ(ctx_.total_responses(), 1u);
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
}
TEST_F(ReadTransfer, MalformedParametersChunk) {
// pending_bytes is required in a parameters chunk.
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, Status::InvalidArgument());
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 3u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), Status::InvalidArgument());
}
TEST_F(ReadTransfer, UnregisteredHandler) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 11, .pending_bytes = 32, .offset = 0}));
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 11u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), Status::NotFound());
}
TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .offset = 3}));
ctx_.SendClientStream(EncodeChunk(
{.transfer_id = 3, .offset = 0, .data = std::span(kData).first(10)}));
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
// Only start transfer for initial packet.
EXPECT_FALSE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
}
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
ASSERT_EQ(ctx_.total_responses(), 1u);
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
handler_.prepare_read_called = false; // Reset so can check if called again.
ctx_.SendClientStream( // Resend starting chunk
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
ASSERT_EQ(ctx_.total_responses(), 2u);
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
handler_.finalize_read_called = false; // Reset so can check later
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16}));
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
ASSERT_EQ(ctx_.total_responses(), 3u);
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ASSERT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.status, Status::ResourceExhausted());
}
TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
// Make the next read appear to be the end of the stream.
handler_.set_read_status(Status::OutOfRange());
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.transfer_id, 3u);
EXPECT_EQ(chunk.remaining_bytes, 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ASSERT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 0}));
EXPECT_TRUE(handler_.prepare_read_called);
EXPECT_FALSE(handler_.finalize_read_called);
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
// First chunk should have all the read data.
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.data.size(), kData.size());
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
// Second chunk should be empty and set remaining_bytes = 0.
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.data.size(), 0u);
ASSERT_TRUE(c1.remaining_bytes.has_value());
EXPECT_EQ(c1.remaining_bytes.value(), 0u);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_read_called);
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
// At this point the transfer should be in a completed state. Send a
// non-initial chunk as a continuation of the transfer.
handler_.finalize_read_called = false;
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 3, .pending_bytes = 48, .offset = 16}));
ASSERT_EQ(ctx_.total_responses(), 3u);
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
ASSERT_TRUE(c2.status.has_value());
EXPECT_EQ(c2.status.value(), Status::FailedPrecondition());
// FinalizeRead should not be called again.
EXPECT_FALSE(handler_.finalize_read_called);
}
class SimpleWriteTransfer final : public WriteOnlyHandler {
public:
SimpleWriteTransfer(uint32_t transfer_id, ByteSpan data)
: WriteOnlyHandler(transfer_id),
prepare_write_called(false),
finalize_write_called(false),
finalize_write_status(Status::Unknown()),
writer_(data) {}
Status PrepareWrite() final {
writer_.Seek(0);
set_writer(writer_);
prepare_write_called = true;
return OkStatus();
}
Status FinalizeWrite(Status status) final {
finalize_write_called = true;
finalize_write_status = status;
return finalize_write_return_status_;
}
void set_finalize_write_return(Status status) {
finalize_write_return_status_ = status;
}
bool prepare_write_called;
bool finalize_write_called;
Status finalize_write_status;
private:
Status finalize_write_return_status_;
stream::MemoryWriter writer_;
};
class WriteTransfer : public ::testing::Test {
protected:
WriteTransfer(size_t max_bytes_to_receive = 64)
: buffer{},
handler_(7, buffer),
ctx_(work_queue_, data_buffer_, max_bytes_to_receive) {
ctx_.service().RegisterHandler(handler_);
ASSERT_FALSE(handler_.prepare_write_called);
ASSERT_FALSE(handler_.finalize_write_called);
ctx_.call(); // Open the write stream
}
std::array<std::byte, kData.size()> buffer;
SimpleWriteTransfer handler_;
PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
std::array<std::byte, 64> data_buffer_;
// Not currently used in the tests, so left uninitialized.
work_queue::WorkQueueWithBuffer<1> work_queue_;
};
TEST_F(WriteTransfer, SingleChunk) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
ASSERT_TRUE(chunk.max_chunk_size_bytes.has_value());
EXPECT_EQ(chunk.max_chunk_size_bytes.value(), 37u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 0,
.data = std::span(kData),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransfer, FinalizeFails) {
// Return an error when FinalizeWrite is called.
handler_.set_finalize_write_return(Status::FailedPrecondition());
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 0,
.data = std::span(kData),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), Status::DataLoss());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
TEST_F(WriteTransfer, MultiChunk) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 8,
.data = std::span(kData).subspan(8),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransfer, ExtendWindow) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.window_end_offset, 32u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
// Window starts at 32 bytes and should extend when half of that is sent.
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(4)}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 4, .data = std::span(kData).subspan(4, 4)}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 8, .data = std::span(kData).subspan(8, 4)}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
EncodeChunk({.transfer_id = 7,
.offset = 12,
.data = std::span(kData).subspan(12, 4)}));
ASSERT_EQ(ctx_.total_responses(), 2u);
// Extend parameters chunk.
chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.window_end_offset, 32u);
EXPECT_EQ(chunk.type, Chunk::Type::kParametersContinue);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 16,
.data = std::span(kData).subspan(16),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses()[2]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
class WriteTransferMaxBytes16 : public WriteTransfer {
protected:
WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
};
TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.offset, 8u);
EXPECT_EQ(chunk.window_end_offset, 24u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 8, .data = std::span(kData).subspan(8, 8)}));
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses()[2]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.offset, 16u);
EXPECT_EQ(chunk.window_end_offset, 32u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
ctx_.SendClientStream<64>(
EncodeChunk({.transfer_id = 7,
.offset = 16,
.data = std::span(kData).subspan(16, 8)}));
ASSERT_EQ(ctx_.total_responses(), 4u);
chunk = DecodeChunk(ctx_.responses()[3]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.offset, 24u);
EXPECT_EQ(chunk.window_end_offset, 32u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 8u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 24,
.data = std::span(kData).subspan(24),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 5u);
chunk = DecodeChunk(ctx_.responses()[4]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransferMaxBytes16, SetsDefaultPendingBytes) {
// Default max bytes is smaller than buffer.
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
}
TEST_F(WriteTransfer, SetsWriterPendingBytes) {
// Buffer is smaller than constructor's default max bytes.
std::array<std::byte, 8> small_buffer = {};
SimpleWriteTransfer handler_(987, small_buffer);
ctx_.service().RegisterHandler(handler_);
ctx_.SendClientStream(EncodeChunk({.transfer_id = 987}));
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 987u);
EXPECT_EQ(chunk.pending_bytes.value(), 8u);
}
TEST_F(WriteTransfer, UnexpectedOffset) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.offset, 0u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 4, // incorrect
.data = std::span(kData).subspan(16),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.offset, 8u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 24u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 8, // correct
.data = std::span(kData).subspan(8),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses()[2]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
// pending_bytes = 16 but send 24
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(24)}));
ASSERT_EQ(ctx_.total_responses(), 2u);
chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), Status::Internal());
}
TEST_F(WriteTransfer, UnregisteredHandler) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 999}));
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 999u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), Status::NotFound());
}
TEST_F(WriteTransfer, ClientError) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 7u);
ASSERT_TRUE(chunk.pending_bytes.has_value());
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
ctx_.SendClientStream<64>(
EncodeChunk({.transfer_id = 7, .status = Status::DataLoss()}));
EXPECT_EQ(ctx_.total_responses(), 1u);
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
}
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
ASSERT_EQ(ctx_.total_responses(), 1u);
constexpr std::span data(kData);
ctx_.SendClientStream<64>(
EncodeChunk({.transfer_id = 7, .offset = 0, .data = data.first(1)}));
// Drop offset 1, then send the rest of the data.
for (uint32_t i = 2; i < kData.size(); ++i) {
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = i, .data = data.subspan(i, 1)}));
}
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.offset, 1u);
// Send the remaining data and the final status.
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 1,
.data = data.subspan(1, 31),
.status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
ASSERT_EQ(ctx_.total_responses(), 1u);
constexpr std::span data(kData);
// Skip offset 0, then send the rest of the data.
for (uint32_t i = 1; i < kData.size(); ++i) {
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = i, .data = data.subspan(i, 1)}));
}
ASSERT_EQ(ctx_.total_responses(), 2u); // Resent transfer parameters once.
const auto last_chunk = EncodeChunk(
{.transfer_id = 7, .offset = kData.size() - 1, .data = data.last(1)});
ctx_.SendClientStream<64>(last_chunk);
// Resent transfer parameters since the packet is repeated
ASSERT_EQ(ctx_.total_responses(), 3u);
ctx_.SendClientStream<64>(last_chunk);
ASSERT_EQ(ctx_.total_responses(), 4u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.transfer_id, 7u);
EXPECT_EQ(chunk.offset, 0u);
EXPECT_TRUE(chunk.pending_bytes.has_value());
// Resumes normal operation when correct offset is sent.
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = kData, .status = OkStatus()}));
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 0,
.data = std::span(kData),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 0,
.data = std::span(kData),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses().back());
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
}
TEST_F(WriteTransfer, RejectsNonFinalChunksAfterCompleted) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 0,
.data = std::span(kData),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 2u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), OkStatus());
ctx_.SendClientStream<64>( // Don't set remaining_bytes=0
EncodeChunk({.transfer_id = 7, .offset = 0, .data = std::span(kData)}));
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses().back());
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), Status::FailedPrecondition());
}
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7, .offset = 3}));
ctx_.SendClientStream(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(10)}));
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7, .status = OkStatus()}));
// Only start transfer for initial packet.
EXPECT_FALSE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);
}
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(EncodeChunk(
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
ASSERT_EQ(ctx_.total_responses(), 1u);
ASSERT_TRUE(handler_.prepare_write_called);
ASSERT_FALSE(handler_.finalize_write_called);
handler_.prepare_write_called = false; // Reset to check it's called again.
// Simulate client disappearing then restarting the transfer.
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());
handler_.finalize_write_called = false; // Reset to check it's called again.
ASSERT_EQ(ctx_.total_responses(), 2u);
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
.offset = 0,
.data = std::span(kData),
.remaining_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 3u);
EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
public:
SometimesUnavailableReadHandler(uint32_t transfer_id, ConstByteSpan data)
: ReadOnlyHandler(transfer_id), reader_(data), call_count_(0) {}
Status PrepareRead() final {
if ((call_count_++ % 2) == 0) {
return Status::Unavailable();
}
set_reader(reader_);
return OkStatus();
}
private:
stream::MemoryReader reader_;
int call_count_;
};
TEST_F(ReadTransfer, PrepareError) {
SometimesUnavailableReadHandler unavailable_handler(88, kData);
ctx_.service().RegisterHandler(unavailable_handler);
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 88, .pending_bytes = 128, .offset = 0}));
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
EXPECT_EQ(chunk.transfer_id, 88u);
ASSERT_TRUE(chunk.status.has_value());
EXPECT_EQ(chunk.status.value(), Status::DataLoss());
// Try starting the transfer again. It should work this time.
ctx_.SendClientStream(
EncodeChunk({.transfer_id = 88, .pending_bytes = 128, .offset = 0}));
ASSERT_EQ(ctx_.total_responses(), 3u);
chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.transfer_id, 88u);
ASSERT_EQ(chunk.data.size(), kData.size());
EXPECT_EQ(std::memcmp(chunk.data.data(), kData.data(), chunk.data.size()), 0);
}
PW_MODIFY_DIAGNOSTICS_POP();
} // namespace
} // namespace pw::transfer::test