// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include "pw_transfer/transfer.h"

#include "gtest/gtest.h"
#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_containers/algorithm.h"
#include "pw_rpc/raw/test_method_context.h"
#include "pw_rpc/test_helpers.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer/transfer.pwpb.h"
#include "pw_transfer_private/chunk_testing.h"

namespace pw::transfer::test {
namespace {

using namespace std::chrono_literals;

// TODO(frolv): Have a generic way to obtain a thread for testing on any system.
thread::Options& TransferThreadOptions() {
  static thread::stl::Options options;
  return options;
}

using internal::Chunk;
using internal::ProtocolVersion;

class TestMemoryReader : public stream::SeekableReader {
 public:
  constexpr TestMemoryReader(std::span<const std::byte> data)
      : memory_reader_(data) {}

  Status DoSeek(ptrdiff_t offset, Whence origin) override {
    if (seek_status.ok()) {
      return memory_reader_.Seek(offset, origin);
    }
    return seek_status;
  }

  StatusWithSize DoRead(ByteSpan dest) final {
    if (!read_status.ok()) {
      return StatusWithSize(read_status, 0);
    }

    auto result = memory_reader_.Read(dest);
    return result.ok() ? StatusWithSize(result->size())
                       : StatusWithSize(result.status(), 0);
  }

  Status seek_status;
  Status read_status;

 private:
  stream::MemoryReader memory_reader_;
};

class SimpleReadTransfer final : public ReadOnlyHandler {
 public:
  SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
      : ReadOnlyHandler(session_id),
        prepare_read_called(false),
        finalize_read_called(false),
        finalize_read_status(Status::Unknown()),
        reader_(data) {}

  Status PrepareRead() final {
    prepare_read_called = true;

    if (!prepare_read_return_status.ok()) {
      return prepare_read_return_status;
    }

    EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
    set_reader(reader_);
    return OkStatus();
  }

  void FinalizeRead(Status status) final {
    finalize_read_called = true;
    finalize_read_status = status;
  }

  void set_seek_status(Status status) { reader_.seek_status = status; }
  void set_read_status(Status status) { reader_.read_status = status; }

  bool prepare_read_called;
  bool finalize_read_called;
  Status prepare_read_return_status;
  Status finalize_read_status;

 private:
  TestMemoryReader reader_;
};

constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });

class ReadTransfer : public ::testing::Test {
 protected:
  ReadTransfer(size_t max_chunk_size_bytes = 64)
      : handler_(3, kData),
        transfer_thread_(std::span(data_buffer_).first(max_chunk_size_bytes),
                         encode_buffer_),
        ctx_(transfer_thread_, 64),
        system_thread_(TransferThreadOptions(), transfer_thread_) {
    ctx_.service().RegisterHandler(handler_);

    PW_CHECK(!handler_.prepare_read_called);
    PW_CHECK(!handler_.finalize_read_called);

    ctx_.call();  // Open the read stream
    transfer_thread_.WaitUntilEventIsProcessed();
  }

  ~ReadTransfer() {
    transfer_thread_.Terminate();
    system_thread_.join();
  }

  SimpleReadTransfer handler_;
  Thread<1, 1> transfer_thread_;
  PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
  thread::Thread system_thread_;
  std::array<std::byte, 64> data_buffer_;
  std::array<std::byte, 64> encode_buffer_;
};

TEST_F(ReadTransfer, SingleChunk) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    ctx_.SendClientStream(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                        .set_session_id(3)
                        .set_window_end_offset(64)
                        .set_offset(0)));

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  // First chunk should have all the read data.
  EXPECT_EQ(c0.session_id(), 3u);
  EXPECT_EQ(c0.offset(), 0u);
  ASSERT_EQ(c0.payload().size(), kData.size());
  EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
            0);

  // Second chunk should be empty and set remaining_bytes = 0.
  EXPECT_EQ(c1.session_id(), 3u);
  EXPECT_FALSE(c1.has_payload());
  ASSERT_TRUE(c1.remaining_bytes().has_value());
  EXPECT_EQ(c1.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}

TEST_F(ReadTransfer, MultiChunk) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0)));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk c0 = DecodeChunk(ctx_.responses().back());

  EXPECT_EQ(c0.session_id(), 3u);
  EXPECT_EQ(c0.offset(), 0u);
  ASSERT_EQ(c0.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
            0);

  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
          .set_session_id(3)
          .set_window_end_offset(32)
          .set_offset(16)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c1 = DecodeChunk(ctx_.responses().back());

  EXPECT_EQ(c1.session_id(), 3u);
  EXPECT_EQ(c1.offset(), 16u);
  ASSERT_EQ(c1.payload().size(), 16u);
  EXPECT_EQ(
      std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
      0);

  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
          .set_session_id(3)
          .set_window_end_offset(48)
          .set_offset(32)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk c2 = DecodeChunk(ctx_.responses()[2]);

  EXPECT_EQ(c2.session_id(), 3u);
  EXPECT_FALSE(c2.has_payload());
  ASSERT_TRUE(c2.remaining_bytes().has_value());
  EXPECT_EQ(c2.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}

TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0)));

  transfer_thread_.WaitUntilEventIsProcessed();

  const auto continue_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
          .set_session_id(3)
          .set_window_end_offset(24)
          .set_offset(16));
  ctx_.SendClientStream(continue_chunk);

  transfer_thread_.WaitUntilEventIsProcessed();

  // Resend the CONTINUE packets that don't actually advance the window.
  for (int i = 0; i < 3; ++i) {
    ctx_.SendClientStream(continue_chunk);
    transfer_thread_.WaitUntilEventIsProcessed();
  }

  ASSERT_EQ(ctx_.total_responses(), 2u);  // Only sent one packet
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(c1.session_id(), 3u);
  EXPECT_EQ(c1.offset(), 16u);
  ASSERT_EQ(c1.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
      0);
}

TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
  rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
    ctx_.SendClientStream(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                        .set_session_id(3)
                        .set_window_end_offset(16)
                        .set_offset(0)));

    transfer_thread_.WaitUntilEventIsProcessed();

    Chunk chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(
        pw::containers::Equal(std::span(kData).first(16), chunk.payload()));

    ctx_.SendClientStream(EncodeChunk(
        Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
            .set_session_id(3)
            .set_window_end_offset(10)
            .set_offset(2)));

    transfer_thread_.WaitUntilEventIsProcessed();

    chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(
        pw::containers::Equal(std::span(kData).subspan(2, 8), chunk.payload()));

    ctx_.SendClientStream(EncodeChunk(
        Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
            .set_session_id(3)
            .set_window_end_offset(64)
            .set_offset(17)));
  });

  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk chunk = DecodeChunk(ctx_.responses()[2]);
  EXPECT_TRUE(pw::containers::Equal(std::span(&kData[17], kData.end()),
                                    chunk.payload()));
}

TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
  handler_.set_seek_status(Status::Unimplemented());

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0)));
  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(10)
          .set_offset(2)));

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::Unimplemented());
}

TEST_F(ReadTransfer, MaxChunkSize_Client) {
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    ctx_.SendClientStream(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                        .set_session_id(3)
                        .set_window_end_offset(64)
                        .set_max_chunk_size_bytes(8)
                        .set_offset(0)));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
  Chunk c3 = DecodeChunk(ctx_.responses()[3]);
  Chunk c4 = DecodeChunk(ctx_.responses()[4]);

  EXPECT_EQ(c0.session_id(), 3u);
  EXPECT_EQ(c0.offset(), 0u);
  ASSERT_EQ(c0.payload().size(), 8u);
  EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
            0);

  EXPECT_EQ(c1.session_id(), 3u);
  EXPECT_EQ(c1.offset(), 8u);
  ASSERT_EQ(c1.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
      0);

  EXPECT_EQ(c2.session_id(), 3u);
  EXPECT_EQ(c2.offset(), 16u);
  ASSERT_EQ(c2.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
      0);

  EXPECT_EQ(c3.session_id(), 3u);
  EXPECT_EQ(c3.offset(), 24u);
  ASSERT_EQ(c3.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
      0);

  EXPECT_EQ(c4.session_id(), 3u);
  EXPECT_EQ(c4.payload().size(), 0u);
  ASSERT_TRUE(c4.remaining_bytes().has_value());
  EXPECT_EQ(c4.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}

TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(64)
                      .set_offset(0)));
  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_TRUE(handler_.finalize_read_called);
  ASSERT_EQ(OkStatus(), handler_.finalize_read_status);

  // Now, clear state and start a second transfer
  handler_.prepare_read_return_status = Status::FailedPrecondition();
  handler_.prepare_read_called = false;
  handler_.finalize_read_called = false;

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(64)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Prepare failed, so the handler should not have been stored in the context,
  // and finalize should not have been called.
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_FALSE(handler_.finalize_read_called);
}

class ReadTransferMaxChunkSize8 : public ReadTransfer {
 protected:
  ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
};

TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
  // Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
  // TODO(frolv): Fix
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    ctx_.SendClientStream(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                        .set_session_id(3)
                        .set_window_end_offset(64)
                        // .set_max_chunk_size_bytes(16)
                        .set_offset(0)));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
  Chunk c3 = DecodeChunk(ctx_.responses()[3]);
  Chunk c4 = DecodeChunk(ctx_.responses()[4]);

  EXPECT_EQ(c0.session_id(), 3u);
  EXPECT_EQ(c0.offset(), 0u);
  ASSERT_EQ(c0.payload().size(), 8u);
  EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
            0);

  EXPECT_EQ(c1.session_id(), 3u);
  EXPECT_EQ(c1.offset(), 8u);
  ASSERT_EQ(c1.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
      0);

  EXPECT_EQ(c2.session_id(), 3u);
  EXPECT_EQ(c2.offset(), 16u);
  ASSERT_EQ(c2.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
      0);

  EXPECT_EQ(c3.session_id(), 3u);
  EXPECT_EQ(c3.offset(), 24u);
  ASSERT_EQ(c3.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
      0);

  EXPECT_EQ(c4.session_id(), 3u);
  EXPECT_EQ(c4.payload().size(), 0u);
  ASSERT_TRUE(c4.remaining_bytes().has_value());
  EXPECT_EQ(c4.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}

TEST_F(ReadTransfer, ClientError) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0)));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Send client error.
  ctx_.SendClientStream(EncodeChunk(
      Chunk::Final(ProtocolVersion::kLegacy, 3, Status::OutOfRange())));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
}

TEST_F(ReadTransfer, UnregisteredHandler) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(11)
                      .set_window_end_offset(32)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 11u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::NotFound());
}

TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(32)
          .set_offset(3)));
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(3)
                      .set_payload(std::span(kData).first(10))
                      .set_offset(3)));
  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only start transfer for an initial packet.
  EXPECT_FALSE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
}

TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  handler_.prepare_read_called = false;  // Reset so can check if called again.

  ctx_.SendClientStream(  // Resend starting chunk
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
  handler_.finalize_read_called = false;  // Reset so can check later

  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(32)
          .set_offset(16)));
  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}

TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(0)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.status(), Status::ResourceExhausted());
}

TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
  // Make the next read appear to be the end of the stream.
  handler_.set_read_status(Status::OutOfRange());

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(3)
                      .set_window_end_offset(0)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 3u);
  EXPECT_EQ(chunk.remaining_bytes(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}

TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    ctx_.SendClientStream(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                        .set_session_id(3)
                        .set_window_end_offset(64)
                        .set_offset(0)));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  // First chunk should have all the read data.
  EXPECT_EQ(c0.session_id(), 3u);
  EXPECT_EQ(c0.offset(), 0u);
  ASSERT_EQ(c0.payload().size(), kData.size());
  EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
            0);

  // Second chunk should be empty and set remaining_bytes = 0.
  EXPECT_EQ(c1.session_id(), 3u);
  EXPECT_FALSE(c1.has_payload());
  ASSERT_TRUE(c1.remaining_bytes().has_value());
  EXPECT_EQ(c1.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());

  // At this point the transfer should be in a completed state. Send a
  // non-initial chunk as a continuation of the transfer.
  handler_.finalize_read_called = false;

  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(64)
          .set_offset(16)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
  ASSERT_TRUE(c2.status().has_value());
  EXPECT_EQ(c2.status().value(), Status::FailedPrecondition());

  // FinalizeRead should not be called again.
  EXPECT_FALSE(handler_.finalize_read_called);
}

class SimpleWriteTransfer final : public WriteOnlyHandler {
 public:
  SimpleWriteTransfer(uint32_t session_id, ByteSpan data)
      : WriteOnlyHandler(session_id),
        prepare_write_called(false),
        finalize_write_called(false),
        finalize_write_status(Status::Unknown()),
        writer_(data) {}

  Status PrepareWrite() final {
    EXPECT_EQ(OkStatus(), writer_.Seek(0));
    set_writer(writer_);
    prepare_write_called = true;
    return OkStatus();
  }

  Status FinalizeWrite(Status status) final {
    finalize_write_called = true;
    finalize_write_status = status;
    return finalize_write_return_status_;
  }

  void set_finalize_write_return(Status status) {
    finalize_write_return_status_ = status;
  }

  bool prepare_write_called;
  bool finalize_write_called;
  Status finalize_write_status;

 private:
  Status finalize_write_return_status_;
  stream::MemoryWriter writer_;
};

class WriteTransfer : public ::testing::Test {
 protected:
  WriteTransfer(size_t max_bytes_to_receive = 64)
      : buffer{},
        handler_(7, buffer),
        transfer_thread_(data_buffer_, encode_buffer_),
        system_thread_(TransferThreadOptions(), transfer_thread_),
        ctx_(transfer_thread_,
             max_bytes_to_receive,
             // Use a long timeout to avoid accidentally triggering timeouts.
             std::chrono::minutes(1)) {
    ctx_.service().RegisterHandler(handler_);

    PW_CHECK(!handler_.prepare_write_called);
    PW_CHECK(!handler_.finalize_write_called);

    ctx_.call();  // Open the write stream
    transfer_thread_.WaitUntilEventIsProcessed();
  }

  ~WriteTransfer() {
    transfer_thread_.Terminate();
    system_thread_.join();
  }

  std::array<std::byte, kData.size()> buffer;
  SimpleWriteTransfer handler_;

  Thread<1, 1> transfer_thread_;
  thread::Thread system_thread_;
  std::array<std::byte, 64> data_buffer_;
  std::array<std::byte, 64> encode_buffer_;
  PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
};

TEST_F(WriteTransfer, SingleChunk) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);
  ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
  EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}

TEST_F(WriteTransfer, FinalizeFails) {
  // Return an error when FinalizeWrite is called.
  handler_.set_finalize_write_return(Status::FailedPrecondition());

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::DataLoss());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}

TEST_F(WriteTransfer, SendingFinalPacketFails) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ctx_.output().set_send_status(Status::Unknown());

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Should only have sent the transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);
  ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
  EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);

  // When FinalizeWrite() was called, the transfer was considered successful.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}

TEST_F(WriteTransfer, MultiChunk) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(8))));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(8)
                      .set_payload(std::span(kData).subspan(8))
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}

TEST_F(WriteTransfer, WriteFailsOnRetry) {
  // Skip one packet to fail on a retry.
  ctx_.output().set_send_status(Status::FailedPrecondition(), 1);

  // Wait for 3 packets: initial params, retry attempt, final error
  rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
    // Send only one client packet so the service times out.
    ctx_.SendClientStream(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                        .set_session_id(7)));
    transfer_thread_.SimulateServerTimeout(7);  // Time out to trigger retry
  });

  // Attempted to send 3 packets, but the 2nd packet was dropped.
  // Check that the last packet is an INTERNAL error from the RPC write failure.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::Internal());
}

TEST_F(WriteTransfer, TimeoutInRecoveryState) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 0u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  constexpr std::span data(kData);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(8))));

  // Skip offset 8 to enter a recovery state.
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(12)
                      .set_payload(data.subspan(12, 4))));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Recovery parameters should be sent for offset 8.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 8u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  // Timeout while in the recovery state.
  transfer_thread_.SimulateServerTimeout(7);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Same recovery parameters should be re-sent.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 8u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);
}

TEST_F(WriteTransfer, ExtendWindow) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  // Window starts at 32 bytes and should extend when half of that is sent.
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(4))));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(4)
                      .set_payload(std::span(kData).subspan(4, 4))));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(8)
                      .set_payload(std::span(kData).subspan(8, 4))));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(12)
                      .set_payload(std::span(kData).subspan(12, 4))));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Extend parameters chunk.
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);
  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(16)
                      .set_payload(std::span(kData).subspan(16))
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  chunk = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}

class WriteTransferMaxBytes16 : public WriteTransfer {
 protected:
  WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
};

TEST_F(WriteTransfer, TransmitterReducesWindow) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  // Send only 12 bytes and set that as the new end offset.
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_window_end_offset(12)
                      .set_payload(std::span(kData).first(12))));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Receiver should respond immediately with a retransmit chunk as the end of
  // the window has been reached.
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 12u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);
  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
}

TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      // Larger window end offset than the receiver's.
                      .set_window_end_offset(48)
                      .set_payload(std::span(kData).first(16))));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::Internal());
}

TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 16u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(8))));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 8u);
  EXPECT_EQ(chunk.window_end_offset(), 24u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(8)
                      .set_payload(std::span(kData).subspan(8, 8))));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 16u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(16)
                      .set_payload(std::span(kData).subspan(16, 8))));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 24u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(24)
                      .set_payload(std::span(kData).subspan(24))
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 5u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}

TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
  // Default max bytes is smaller than buffer.
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 16u);
}

TEST_F(WriteTransfer, SetsWriterWindowEndOffset) {
  // Buffer is smaller than constructor's default max bytes.
  std::array<std::byte, 8> small_buffer = {};

  SimpleWriteTransfer handler_(987, small_buffer);
  ctx_.service().RegisterHandler(handler_);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(987)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 987u);
  EXPECT_EQ(chunk.window_end_offset(), 8u);

  ctx_.service().UnregisterHandler(handler_);
}

TEST_F(WriteTransfer, UnexpectedOffset) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 0u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(8))));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(4)  // incorrect
                      .set_payload(std::span(kData).subspan(8))
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 8u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(8)  // correct
                      .set_payload(std::span(kData).subspan(8))
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}

TEST_F(WriteTransferMaxBytes16, TooMuchData) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 16u);

  // window_end_offset = 16, but send 24 bytes of data.
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(24))));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::Internal());
}

TEST_F(WriteTransfer, UnregisteredHandler) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(999)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 999u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::NotFound());
}

TEST_F(WriteTransfer, ClientError) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  ctx_.SendClientStream<64>(EncodeChunk(
      Chunk::Final(ProtocolVersion::kLegacy, 7, Status::DataLoss())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
}

TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  constexpr std::span data(kData);
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(1))));

  // Drop offset 1, then send the rest of the data.
  for (uint32_t i = 2; i < kData.size(); ++i) {
    ctx_.SendClientStream<64>(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                        .set_session_id(7)
                        .set_offset(i)
                        .set_payload(data.subspan(i, 1))));
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 1u);

  // Send the remaining data.
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(1)
                      .set_payload(data.subspan(1, 31))
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}

TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  constexpr std::span data(kData);

  // Skip offset 0, then send the rest of the data.
  for (uint32_t i = 1; i < kData.size(); ++i) {
    ctx_.SendClientStream<64>(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                        .set_session_id(7)
                        .set_offset(i)
                        .set_payload(data.subspan(i, 1))));
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);  // Resent transfer parameters once.

  const auto last_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(kData.size() - 1)
                      .set_payload(data.last(1)));
  ctx_.SendClientStream<64>(last_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Resent transfer parameters since the packet is repeated
  ASSERT_EQ(ctx_.total_responses(), 3u);

  ctx_.SendClientStream<64>(last_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 0u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);

  // Resumes normal operation when correct offset is sent.
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}

TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), OkStatus());

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  chunk = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), OkStatus());
}

TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(3)));
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(10))
                      .set_remaining_bytes(0)));

  transfer_thread_.WaitUntilEventIsProcessed();

  // Only start transfer for initial packet.
  EXPECT_FALSE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);
}

TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(8))));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ASSERT_TRUE(handler_.prepare_write_called);
  ASSERT_FALSE(handler_.finalize_write_called);
  handler_.prepare_write_called = false;  // Reset to check it's called again.

  // Simulate client disappearing then restarting the transfer.
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());

  handler_.finalize_write_called = false;  // Reset to check it's called again.

  ASSERT_EQ(ctx_.total_responses(), 2u);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}

class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
 public:
  SometimesUnavailableReadHandler(uint32_t session_id, ConstByteSpan data)
      : ReadOnlyHandler(session_id), reader_(data), call_count_(0) {}

  Status PrepareRead() final {
    if ((call_count_++ % 2) == 0) {
      return Status::Unavailable();
    }

    set_reader(reader_);
    return OkStatus();
  }

 private:
  stream::MemoryReader reader_;
  int call_count_;
};

TEST_F(ReadTransfer, PrepareError) {
  SometimesUnavailableReadHandler unavailable_handler(88, kData);
  ctx_.service().RegisterHandler(unavailable_handler);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(88)
                      .set_window_end_offset(128)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 88u);
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::DataLoss());

  // Try starting the transfer again. It should work this time.
  // TODO(frolv): This won't work until completion ACKs are supported.
  if (false) {
    ctx_.SendClientStream(
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                        .set_session_id(88)
                        .set_window_end_offset(128)
                        .set_offset(0)));
    transfer_thread_.WaitUntilEventIsProcessed();

    ASSERT_EQ(ctx_.total_responses(), 2u);
    chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_EQ(chunk.session_id(), 88u);
    ASSERT_EQ(chunk.payload().size(), kData.size());
    EXPECT_EQ(std::memcmp(
                  chunk.payload().data(), kData.data(), chunk.payload().size()),
              0);
  }
}

TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
                      .set_session_id(7)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // First parameters chunk has the default window end offset of 16.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.window_end_offset(), 16u);

  // Update the pending bytes value.
  ctx_.service().set_max_pending_bytes(12);

  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(std::span(kData).first(8))));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Second parameters chunk should use the new max pending bytes.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 7u);
  EXPECT_EQ(chunk.offset(), 8u);
  EXPECT_EQ(chunk.window_end_offset(), 8u + 12u);
}

}  // namespace
}  // namespace pw::transfer::test
