blob: 2303bb826735703af381f587438ec44b020621f7 [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_transfer/client.h"
#include <cstring>
#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_rpc/raw/client_testing.h"
#include "pw_thread/sleep.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer_private/chunk_testing.h"
namespace pw::transfer::test {
namespace {
using internal::Chunk;
using pw_rpc::raw::Transfer;
thread::Options& WorkQueueThreadOptions() {
static thread::stl::Options options;
return options;
}
class ReadTransfer : public ::testing::Test {
protected:
ReadTransfer(size_t max_bytes_to_receive = 0)
: client_(context_.client(),
context_.channel().id(),
work_queue_,
data_buffer_,
max_bytes_to_receive),
work_queue_thread_(WorkQueueThreadOptions(), work_queue_) {}
~ReadTransfer() {
work_queue_.RequestStop();
work_queue_thread_.join();
}
rpc::RawClientTestContext<> context_;
Client client_;
std::array<std::byte, 64> data_buffer_;
work_queue::WorkQueueWithBuffer<4> work_queue_;
thread::Thread work_queue_thread_;
};
constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
TEST_F(ReadTransfer, SingleChunk) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(3, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 3u);
EXPECT_EQ(c0.offset, 0u);
EXPECT_EQ(c0.pending_bytes.value(), 64u);
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
{.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 3u);
ASSERT_TRUE(c1.status.has_value());
EXPECT_EQ(c1.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);
}
TEST_F(ReadTransfer, MultiChunk) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(4, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 4u);
EXPECT_EQ(c0.offset, 0u);
EXPECT_EQ(c0.pending_bytes.value(), 64u);
constexpr ConstByteSpan data(kData32);
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 4u, .offset = 0, .data = data.first(16)}));
ASSERT_EQ(payloads.size(), 1u);
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 4u,
.offset = 16,
.data = data.subspan(16),
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 4u);
ASSERT_TRUE(c1.status.has_value());
EXPECT_EQ(c1.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);
}
TEST_F(ReadTransfer, MultipleTransfers) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
client_.Read(3, writer, [&transfer_status](Status status) {
transfer_status = status;
}));
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
{.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
ASSERT_EQ(transfer_status, OkStatus());
transfer_status = Status::Unknown();
ASSERT_EQ(OkStatus(),
client_.Read(3, writer, [&transfer_status](Status status) {
transfer_status = status;
}));
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
{.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
EXPECT_EQ(transfer_status, OkStatus());
}
TEST_F(ReadTransfer, BusyTransferReturnsAlreadyExists) {
stream::MemoryWriterBuffer<64> writer;
ASSERT_EQ(OkStatus(), client_.Read(3, writer, [](Status) {}));
EXPECT_EQ(Status::AlreadyExists(), client_.Read(3, writer, [](Status) {}));
}
class ReadTransferMaxBytes32 : public ReadTransfer {
protected:
ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
};
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
stream::MemoryWriterBuffer<64> writer;
client_.Read(5, writer, [](Status) {});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 5u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.pending_bytes.value(), 32u);
}
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
stream::MemoryWriterBuffer<16> small_writer;
client_.Read(5, small_writer, [](Status) {});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 5u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.pending_bytes.value(), 16u);
}
TEST_F(ReadTransferMaxBytes32, MultiParameters) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(6, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 6u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.pending_bytes.value(), 32u);
constexpr ConstByteSpan data(kData64);
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 6u, .offset = 0, .data = data.first(32)}));
ASSERT_EQ(payloads.size(), 2u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Second parameters chunk.
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 6u);
EXPECT_EQ(c1.offset, 32u);
ASSERT_EQ(c1.pending_bytes.value(), 32u);
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 6u,
.offset = 32,
.data = data.subspan(32),
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 6u);
ASSERT_TRUE(c2.status.has_value());
EXPECT_EQ(c2.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
}
TEST_F(ReadTransfer, UnexpectedOffset) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(7, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 7u);
EXPECT_EQ(c0.offset, 0u);
EXPECT_EQ(c0.pending_bytes.value(), 64u);
constexpr ConstByteSpan data(kData32);
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 7u, .offset = 0, .data = data.first(16)}));
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Send a chunk with an incorrect offset. The client should resend parameters.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 7u,
.offset = 8, // wrong!
.data = data.subspan(16),
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 2u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 7u);
EXPECT_EQ(c1.offset, 16u);
EXPECT_EQ(c1.pending_bytes.value(), 48u);
// Send the correct chunk, completing the transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 7u,
.offset = 16,
.data = data.subspan(16),
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 7u);
ASSERT_TRUE(c2.status.has_value());
EXPECT_EQ(c2.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);
}
TEST_F(ReadTransferMaxBytes32, TooMuchData) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(8, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 8u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.pending_bytes.value(), 32u);
constexpr ConstByteSpan data(kData64);
// pending_bytes == 32
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 8u, .offset = 0, .data = data.first(16)}));
// pending_bytes == 16
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
{.transfer_id = 8u, .offset = 16, .data = data.subspan(16, 8)}));
// pending_bytes == 8, send 16 instead.
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
{.transfer_id = 8u, .offset = 24, .data = data.subspan(24, 16)}));
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 8u);
ASSERT_TRUE(c1.status.has_value());
EXPECT_EQ(c1.status.value(), Status::Internal());
EXPECT_EQ(transfer_status, Status::Internal());
}
TEST_F(ReadTransfer, ServerError) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(9, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 9u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.pending_bytes.value(), 64u);
// Server sends an error. Client should not respond and terminate the
// transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 9u, .status = Status::NotFound()}));
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::NotFound());
}
TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(10, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 10u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.pending_bytes.value(), 64u);
constexpr ConstByteSpan data(kData64);
// Send the first 8 bytes of the transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 10u, .offset = 0, .data = data.first(8)}));
// Skip offset 8, send the rest starting from 16.
for (uint32_t offset = 16; offset < data.size(); offset += 8) {
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 10u,
.offset = offset,
.data = data.subspan(offset, 8)}));
}
// Only one parameters update should be sent, with the offset of the initial
// dropped packet.
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 10u);
EXPECT_EQ(c1.offset, 8u);
ASSERT_EQ(c1.pending_bytes.value(), 56u);
// Send the remaining data to complete the transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 10u,
.offset = 8,
.data = data.subspan(8, 56),
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 10u);
ASSERT_TRUE(c2.status.has_value());
EXPECT_EQ(c2.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(11, writer, [&transfer_status](Status status) {
transfer_status = status;
});
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 11u);
EXPECT_EQ(c0.offset, 0u);
ASSERT_EQ(c0.pending_bytes.value(), 64u);
constexpr ConstByteSpan data(kData64);
// Send the first 8 bytes of the transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 11u, .offset = 0, .data = data.first(8)}));
// Skip offset 8, send the rest starting from 16.
for (uint32_t offset = 16; offset < data.size(); offset += 8) {
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 11u,
.offset = offset,
.data = data.subspan(offset, 8)}));
}
// Only one parameters update should be sent, with the offset of the initial
// dropped packet.
ASSERT_EQ(payloads.size(), 2u);
const Chunk last_chunk = {
.transfer_id = 11u, .offset = 56, .data = data.subspan(56)};
// Re-send the final chunk of the block.
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
// The original drop parameters should be re-sent.
ASSERT_EQ(payloads.size(), 3u);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 11u);
EXPECT_EQ(c2.offset, 8u);
ASSERT_EQ(c2.pending_bytes.value(), 56u);
// Do it again.
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
EXPECT_EQ(c3.transfer_id, 11u);
EXPECT_EQ(c3.offset, 8u);
ASSERT_EQ(c3.pending_bytes.value(), 56u);
// Finish the transfer normally.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 11u,
.offset = 8,
.data = data.subspan(8, 56),
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 5u);
Chunk c4 = DecodeChunk(payloads[4]);
EXPECT_EQ(c4.transfer_id, 11u);
ASSERT_TRUE(c4.status.has_value());
EXPECT_EQ(c4.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
constexpr chrono::SystemClock::duration kTestTimeout =
std::chrono::milliseconds(50);
constexpr chrono::SystemClock::duration kWaitForTimeout =
kTestTimeout + std::chrono::milliseconds(10);
constexpr uint8_t kTestRetries = 3;
TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(
12,
writer,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 12u);
EXPECT_EQ(c0.offset, 0u);
EXPECT_EQ(c0.pending_bytes.value(), 64u);
// Wait for the timeout to expire without doing anything. The client should
// resend its parameters chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 2u);
Chunk c = DecodeChunk(payloads.back());
EXPECT_EQ(c.transfer_id, 12u);
EXPECT_EQ(c.offset, 0u);
EXPECT_EQ(c.pending_bytes.value(), 64u);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
// Finish the transfer following the timeout.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 12u,
.offset = 0,
.data = kData32,
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 3u);
Chunk c4 = DecodeChunk(payloads.back());
EXPECT_EQ(c4.transfer_id, 12u);
ASSERT_TRUE(c4.status.has_value());
EXPECT_EQ(c4.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(
13,
writer,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 13u);
EXPECT_EQ(c0.offset, 0u);
EXPECT_EQ(c0.pending_bytes.value(), 64u);
constexpr ConstByteSpan data(kData32);
// Send some data, but not everything.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 13u, .offset = 0, .data = data.first(16)}));
ASSERT_EQ(payloads.size(), 1u);
// Wait for the timeout to expire without sending more data. The client should
// send an updated parameters chunk, accounting for the data already received.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 2u);
Chunk c = DecodeChunk(payloads.back());
EXPECT_EQ(c.transfer_id, 13u);
EXPECT_EQ(c.offset, 16u);
EXPECT_EQ(c.pending_bytes.value(), 48u);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the rest of the data, finishing the transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk({.transfer_id = 13u,
.offset = 16,
.data = data.subspan(16),
.remaining_bytes = 0}));
ASSERT_EQ(payloads.size(), 3u);
Chunk c4 = DecodeChunk(payloads.back());
EXPECT_EQ(c4.transfer_id, 13u);
ASSERT_TRUE(c4.status.has_value());
EXPECT_EQ(c4.status.value(), OkStatus());
EXPECT_EQ(transfer_status, OkStatus());
}
TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();
client_.Read(
14,
writer,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// First transfer parameters chunk is sent.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 14u);
EXPECT_EQ(c0.offset, 0u);
EXPECT_EQ(c0.pending_bytes.value(), 64u);
for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
// Wait for the timeout to expire without doing anything. The client should
// resend its parameters chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), retry + 1);
Chunk c = DecodeChunk(payloads.back());
EXPECT_EQ(c.transfer_id, 14u);
EXPECT_EQ(c.offset, 0u);
EXPECT_EQ(c.pending_bytes.value(), 64u);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
}
// Sleep one more time after the final retry. The client should cancel the
// transfer at this point and send a DEADLINE_EXCEEDED chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 5u);
Chunk c4 = DecodeChunk(payloads.back());
EXPECT_EQ(c4.transfer_id, 14u);
ASSERT_TRUE(c4.status.has_value());
EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
// After finishing the transfer, nothing else should be sent. Verify this by
// waiting for a bit.
this_thread::sleep_for(kTestTimeout * 4);
ASSERT_EQ(payloads.size(), 5u);
}
class WriteTransfer : public ::testing::Test {
protected:
WriteTransfer()
: client_(context_.client(),
context_.channel().id(),
work_queue_,
data_buffer_),
work_queue_thread_(WorkQueueThreadOptions(), work_queue_) {}
~WriteTransfer() {
work_queue_.RequestStop();
work_queue_thread_.join();
}
rpc::RawClientTestContext<> context_;
Client client_;
std::array<std::byte, 64> data_buffer_;
work_queue::WorkQueueWithBuffer<4> work_queue_;
thread::Thread work_queue_thread_;
};
TEST_F(WriteTransfer, SingleChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(3, reader, [&transfer_status](Status status) {
transfer_status = status;
});
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 3u);
// Send transfer parameters.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 3,
.pending_bytes = 64,
.max_chunk_size_bytes = 32,
.offset = 0}));
// Client should send a data chunk and the final chunk.
ASSERT_EQ(payloads.size(), 3u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 3u);
EXPECT_EQ(c1.offset, 0u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 3u);
ASSERT_TRUE(c2.remaining_bytes.has_value());
EXPECT_EQ(c2.remaining_bytes.value(), 0u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the final status chunk to complete the transfer.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
EXPECT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, OkStatus());
}
TEST_F(WriteTransfer, MultiChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(4, reader, [&transfer_status](Status status) {
transfer_status = status;
});
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 4u);
// Send transfer parameters with a chunk size smaller than the data.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 4,
.pending_bytes = 64,
.max_chunk_size_bytes = 16,
.offset = 0}));
// Client should send two data chunks and the final chunk.
ASSERT_EQ(payloads.size(), 4u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 4u);
EXPECT_EQ(c1.offset, 0u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 4u);
EXPECT_EQ(c2.offset, 16u);
EXPECT_EQ(
std::memcmp(c2.data.data(), kData32.data() + c2.offset, c2.data.size()),
0);
Chunk c3 = DecodeChunk(payloads[3]);
EXPECT_EQ(c3.transfer_id, 4u);
ASSERT_TRUE(c3.remaining_bytes.has_value());
EXPECT_EQ(c3.remaining_bytes.value(), 0u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the final status chunk to complete the transfer.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 4, .status = OkStatus()}));
EXPECT_EQ(payloads.size(), 4u);
EXPECT_EQ(transfer_status, OkStatus());
}
TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(5, reader, [&transfer_status](Status status) {
transfer_status = status;
});
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 5u);
// Send transfer parameters with a nonzero offset, requesting a seek.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 5,
.pending_bytes = 64,
.max_chunk_size_bytes = 32,
.offset = 16}));
// Client should send a data chunk and the final chunk.
ASSERT_EQ(payloads.size(), 3u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 5u);
EXPECT_EQ(c1.offset, 16u);
EXPECT_EQ(
std::memcmp(c1.data.data(), kData32.data() + c1.offset, c1.data.size()),
0);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 5u);
ASSERT_TRUE(c2.remaining_bytes.has_value());
EXPECT_EQ(c2.remaining_bytes.value(), 0u);
EXPECT_EQ(transfer_status, Status::Unknown());
// Send the final status chunk to complete the transfer.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 5, .status = OkStatus()}));
EXPECT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, OkStatus());
}
class FakeNonSeekableReader final : public stream::NonSeekableReader {
public:
FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
private:
StatusWithSize DoRead(ByteSpan out) final {
if (position_ == data_.size()) {
return StatusWithSize::OutOfRange();
}
size_t to_copy = std::min(out.size(), data_.size() - position_);
std::memcpy(out.data(), data_.data() + position_, to_copy);
position_ += to_copy;
return StatusWithSize(to_copy);
}
ConstByteSpan data_;
size_t position_;
};
TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
FakeNonSeekableReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(6, reader, [&transfer_status](Status status) {
transfer_status = status;
});
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 6u);
// Send transfer parameters with a nonzero offset, requesting a seek.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 6,
.pending_bytes = 64,
.max_chunk_size_bytes = 32,
.offset = 16}));
// Client should send a status chunk and end the transfer.
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 6u);
ASSERT_TRUE(c1.status.has_value());
EXPECT_EQ(c1.status.value(), Status::Unimplemented());
EXPECT_EQ(transfer_status, Status::Unimplemented());
}
TEST_F(WriteTransfer, ServerError) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(7, reader, [&transfer_status](Status status) {
transfer_status = status;
});
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 7u);
// Send an error from the server.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 7, .status = Status::NotFound()}));
// Client should not respond and terminate the transfer.
EXPECT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::NotFound());
}
TEST_F(WriteTransfer, MalformedParametersChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(8, reader, [&transfer_status](Status status) {
transfer_status = status;
});
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 8u);
// Send an invalid transfer parameters chunk without pending_bytes.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 8, .max_chunk_size_bytes = 32}));
// Client should send a status chunk and end the transfer.
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 8u);
ASSERT_TRUE(c1.status.has_value());
EXPECT_EQ(c1.status.value(), Status::InvalidArgument());
EXPECT_EQ(transfer_status, Status::InvalidArgument());
}
TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(9, reader, [&transfer_status](Status status) {
transfer_status = status;
});
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.transfer_id, 9u);
// Send an invalid transfer parameters chunk with 0 pending_bytes.
context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
{.transfer_id = 9, .pending_bytes = 0, .max_chunk_size_bytes = 32}));
// Client should send a status chunk and end the transfer.
ASSERT_EQ(payloads.size(), 2u);
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 9u);
ASSERT_TRUE(c1.status.has_value());
EXPECT_EQ(c1.status.value(), Status::Internal());
EXPECT_EQ(transfer_status, Status::Internal());
}
TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(
10,
reader,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 10u);
// Wait for the timeout to expire without doing anything. The client should
// resend the initial transmit chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 2u);
Chunk c = DecodeChunk(payloads.back());
EXPECT_EQ(c.transfer_id, 10u);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
}
TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(
11,
reader,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 11u);
// Send the first parameters chunk.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 11,
.pending_bytes = 16,
.max_chunk_size_bytes = 8,
.offset = 0}));
ASSERT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 11u);
EXPECT_EQ(c1.offset, 0u);
EXPECT_EQ(c1.data.size(), 8u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 11u);
EXPECT_EQ(c2.offset, 8u);
EXPECT_EQ(c2.data.size(), 8u);
EXPECT_EQ(
std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
0);
// Wait for the timeout to expire without doing anything. The client should
// resend the most recently sent chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
EXPECT_EQ(c3.transfer_id, c2.transfer_id);
EXPECT_EQ(c3.offset, c2.offset);
EXPECT_EQ(c3.data.size(), c2.data.size());
EXPECT_EQ(std::memcmp(c3.data.data(), c2.data.data(), c3.data.size()), 0);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
}
TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(
12,
reader,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 12u);
// Send the first parameters chunk, requesting all the data. The client should
// respond with one data chunk and a remaining_bytes = 0 chunk.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 12,
.pending_bytes = 64,
.max_chunk_size_bytes = 64,
.offset = 0}));
ASSERT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 12u);
EXPECT_EQ(c1.offset, 0u);
EXPECT_EQ(c1.data.size(), 32u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 12u);
ASSERT_TRUE(c2.remaining_bytes.has_value());
EXPECT_EQ(c2.remaining_bytes.value(), 0u);
// Wait for the timeout to expire without doing anything. The client should
// resend the data chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
EXPECT_EQ(c3.transfer_id, c1.transfer_id);
EXPECT_EQ(c3.offset, c1.offset);
EXPECT_EQ(c3.data.size(), c1.data.size());
EXPECT_EQ(std::memcmp(c3.data.data(), c1.data.data(), c3.data.size()), 0);
// The remaining_bytes = 0 chunk should be resent on the next parameters.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 12,
.pending_bytes = 64,
.max_chunk_size_bytes = 64,
.offset = 32}));
ASSERT_EQ(payloads.size(), 5u);
Chunk c4 = DecodeChunk(payloads[4]);
EXPECT_EQ(c4.transfer_id, 12u);
ASSERT_TRUE(c4.remaining_bytes.has_value());
EXPECT_EQ(c4.remaining_bytes.value(), 0u);
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 12, .status = OkStatus()}));
EXPECT_EQ(transfer_status, OkStatus());
}
TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(
13,
reader,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 13u);
for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
// Wait for the timeout to expire without doing anything. The client should
// resend the initial transmit chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), retry + 1);
Chunk c = DecodeChunk(payloads.back());
EXPECT_EQ(c.transfer_id, 13u);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
}
// Sleep one more time after the final retry. The client should cancel the
// transfer at this point and send a DEADLINE_EXCEEDED chunk.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 5u);
Chunk c4 = DecodeChunk(payloads.back());
EXPECT_EQ(c4.transfer_id, 13u);
ASSERT_TRUE(c4.status.has_value());
EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
// After finishing the transfer, nothing else should be sent. Verify this by
// waiting for a bit.
this_thread::sleep_for(kTestTimeout * 4);
ASSERT_EQ(payloads.size(), 5u);
}
TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
FakeNonSeekableReader reader(kData32);
Status transfer_status = Status::Unknown();
client_.Write(
14,
reader,
[&transfer_status](Status status) { transfer_status = status; },
kTestTimeout);
// The client begins by just sending the transfer ID.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.transfer_id, 14u);
// Send the first parameters chunk.
context_.server().SendServerStream<Transfer::Write>(
EncodeChunk({.transfer_id = 14,
.pending_bytes = 16,
.max_chunk_size_bytes = 8,
.offset = 0}));
ASSERT_EQ(payloads.size(), 3u);
EXPECT_EQ(transfer_status, Status::Unknown());
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.transfer_id, 14u);
EXPECT_EQ(c1.offset, 0u);
EXPECT_EQ(c1.data.size(), 8u);
EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
Chunk c2 = DecodeChunk(payloads[2]);
EXPECT_EQ(c2.transfer_id, 14u);
EXPECT_EQ(c2.offset, 8u);
EXPECT_EQ(c2.data.size(), 8u);
EXPECT_EQ(
std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
0);
// Wait for the timeout to expire without doing anything. The client should
// fail to seek back and end the transfer.
this_thread::sleep_for(kWaitForTimeout);
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
EXPECT_EQ(c3.transfer_id, 14u);
ASSERT_TRUE(c3.status.has_value());
EXPECT_EQ(c3.status.value(), Status::DeadlineExceeded());
EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
}
} // namespace
} // namespace pw::transfer::test