pw_transfer: Version 2 opening handshake in C++
This implements the opening handshake of pw_transfer's version 2
protocol within the C++ transfer client and server. The handshake
consists of protocol version negotiation and ephemeral session ID
assignment.
The protocol version used for a transfer session is controlled by the
client when it starts a new transfer. By default, this still remains the
legacy protocol for now, though the client API is extended to allow
specifying the desired version.
If the START chunk a transfer service receives is configured for version
2, the service will assign a transfer session ID and proceed with the
handshake. Otherwise, it will fall back to the legacy protocol.
The version 2 START chunk sent by a client retains all of the chunk
proto fields set by the legacy protocol, allowing it to be understood
by a server which is not version 2 aware. In such a case, the server
will process the chunk per the legacy protocol and send a non-handshake
response. The client will recognize the legacy response and revert to
running the legacy protocol.
As a result of this, version 2 capable transfer clients and servers
remain fully backwards-compatible with older code that only runs the
legacy protocol.
Change-Id: Ie0a295509e754b963d3a78593ba1c43bbe13c977
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/99500
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
diff --git a/pw_thread/docs.rst b/pw_thread/docs.rst
index 75df4b2..7e38489 100644
--- a/pw_thread/docs.rst
+++ b/pw_thread/docs.rst
@@ -145,6 +145,9 @@
const pw::thread::Id my_id = pw::this_thread::get_id();
}
+
+.. _module-pw_thread-thread-creation:
+
---------------
Thread Creation
---------------
diff --git a/pw_transfer/chunk.cc b/pw_transfer/chunk.cc
index cf1b84c..ca51aca 100644
--- a/pw_transfer/chunk.cc
+++ b/pw_transfer/chunk.cc
@@ -24,30 +24,45 @@
namespace ProtoChunk = transfer::Chunk;
-Result<uint32_t> Chunk::ExtractSessionId(ConstByteSpan message) {
+Result<uint32_t> Chunk::ExtractIdentifier(ConstByteSpan message) {
protobuf::Decoder decoder(message);
uint32_t session_id = 0;
+ uint32_t resource_id = 0;
+
+ // During the initial handshake, a START_ACK chunk sent from the server
+ // to a client identifies its transfer context using a resource_id, as
+ // the client does not yet know its session_id.
+ bool should_use_resource_id = false;
while (decoder.Next().ok()) {
ProtoChunk::Fields field =
static_cast<ProtoChunk::Fields>(decoder.FieldNumber());
if (field == ProtoChunk::Fields::TRANSFER_ID) {
- // Interpret a legacy transfer_id field as a session ID, but don't
- // return immediately. Instead, check to see if the message also
- // contains a newer session_id field.
- PW_TRY(decoder.ReadUint32(&session_id));
-
+ // Interpret a legacy transfer_id field as a session ID if an explicit
+ // session_id field has not already been seen.
+ if (session_id == 0) {
+ PW_TRY(decoder.ReadUint32(&session_id));
+ }
} else if (field == ProtoChunk::Fields::SESSION_ID) {
- // A session_id field always takes precedence over transfer_id, so
- // return it immediately when encountered.
+ // A session_id field always takes precedence over transfer_id.
PW_TRY(decoder.ReadUint32(&session_id));
- return session_id;
+ } else if (field == ProtoChunk::Fields::TYPE) {
+ // Check if the chunk is a START_ACK.
+ uint32_t type;
+ PW_TRY(decoder.ReadUint32(&type));
+ should_use_resource_id = static_cast<Type>(type) == Type::kStartAck;
+ } else if (field == ProtoChunk::Fields::RESOURCE_ID) {
+ PW_TRY(decoder.ReadUint32(&resource_id));
}
}
- if (session_id != 0) {
+ if (should_use_resource_id) {
+ if (resource_id != 0) {
+ return resource_id;
+ }
+ } else if (session_id != 0) {
return session_id;
}
@@ -61,9 +76,9 @@
Chunk chunk;
- // Assume the legacy protocol by default. Field presence in the serialized
- // message may change this.
- chunk.protocol_version_ = ProtocolVersion::kLegacy;
+ // Determine the protocol version of the chunk depending on field presence in
+ // the serialized message.
+ chunk.protocol_version_ = ProtocolVersion::kUnknown;
// Some older versions of the protocol set the deprecated pending_bytes field
// in their chunks. The newer transfer handling code does not process this
@@ -88,8 +103,12 @@
case ProtoChunk::Fields::SESSION_ID:
// The existence of a session_id field indicates that a newer protocol
- // is running.
- chunk.protocol_version_ = ProtocolVersion::kVersionTwo;
+ // is running. Update the deduced protocol unless it was explicitly
+ // specified.
+ if (chunk.protocol_version_ == ProtocolVersion::kUnknown) {
+ chunk.protocol_version_ = ProtocolVersion::kVersionTwo;
+ }
+
PW_TRY(decoder.ReadUint32(&chunk.session_id_));
break;
@@ -141,16 +160,29 @@
case ProtoChunk::Fields::RESOURCE_ID:
PW_TRY(decoder.ReadUint32(&value));
chunk.set_resource_id(value);
+ break;
- // The existence of a resource_id field indicates that a newer protocol
- // is running.
- chunk.protocol_version_ = ProtocolVersion::kVersionTwo;
+ case ProtoChunk::Fields::PROTOCOL_VERSION:
+ // The protocol_version field is added as part of the initial handshake
+ // starting from version 2. If provided, it should override any deduced
+ // protocol version.
+ PW_TRY(decoder.ReadUint32(&value));
+ if (!ValidProtocolVersion(value)) {
+ return Status::DataLoss();
+ }
+ chunk.protocol_version_ = static_cast<ProtocolVersion>(value);
break;
// Silently ignore any unrecognized fields.
}
}
+ if (chunk.protocol_version_ == ProtocolVersion::kUnknown) {
+ // If no fields in the chunk specified its protocol version, assume it is a
+ // legacy chunk.
+ chunk.protocol_version_ = ProtocolVersion::kLegacy;
+ }
+
if (pending_bytes != 0) {
// Compute window_end_offset if it isn't explicitly provided (in older
// protocol versions).
@@ -186,6 +218,13 @@
}
}
+ // During the initial handshake, the chunk's configured protocol version is
+ // explicitly serialized to the wire.
+ if (IsInitialHandshakeChunk()) {
+ encoder.WriteProtocolVersion(static_cast<uint32_t>(protocol_version_))
+ .IgnoreError();
+ }
+
if (type_.has_value()) {
encoder.WriteType(static_cast<ProtoChunk::Type>(type_.value()))
.IgnoreError();
@@ -197,8 +236,13 @@
// Encode additional fields from the legacy protocol.
if (ShouldEncodeLegacyFields()) {
- // The legacy protocol uses the transfer_id field instead of session_id.
- encoder.WriteTransferId(session_id_).IgnoreError();
+ // The legacy protocol uses the transfer_id field instead of session_id or
+ // resource_id.
+ if (resource_id_.has_value()) {
+ encoder.WriteTransferId(resource_id_.value()).IgnoreError();
+ } else {
+ encoder.WriteTransferId(session_id_).IgnoreError();
+ }
// In the legacy protocol, the pending_bytes field must be set alongside
// window_end_offset, as some transfer implementations require it.
@@ -241,11 +285,22 @@
}
if (ShouldEncodeLegacyFields()) {
- size += protobuf::SizeOfVarintField(ProtoChunk::Fields::TRANSFER_ID,
- session_id_);
+ if (resource_id_.has_value()) {
+ size += protobuf::SizeOfVarintField(ProtoChunk::Fields::TRANSFER_ID,
+ resource_id_.value());
+ } else {
+ size += protobuf::SizeOfVarintField(ProtoChunk::Fields::TRANSFER_ID,
+ session_id_);
+ }
}
}
+ if (IsInitialHandshakeChunk()) {
+ size +=
+ protobuf::SizeOfVarintField(ProtoChunk::Fields::PROTOCOL_VERSION,
+ static_cast<uint32_t>(protocol_version_));
+ }
+
if (protocol_version_ >= ProtocolVersion::kVersionTwo) {
if (resource_id_.has_value()) {
size += protobuf::SizeOfVarintField(ProtoChunk::Fields::RESOURCE_ID,
diff --git a/pw_transfer/client.cc b/pw_transfer/client.cc
index c3e2d11..85181f3 100644
--- a/pw_transfer/client.cc
+++ b/pw_transfer/client.cc
@@ -23,8 +23,10 @@
Status Client::Read(uint32_t resource_id,
stream::Writer& output,
CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout) {
- if (on_completion == nullptr) {
+ chrono::SystemClock::duration timeout,
+ ProtocolVersion protocol_version) {
+ if (on_completion == nullptr ||
+ protocol_version == ProtocolVersion::kUnknown) {
return Status::InvalidArgument();
}
@@ -40,10 +42,9 @@
has_read_stream_ = true;
}
- // TODO(frolv): Only send the resource ID. The server should assign a session.
transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
- /*session_id=*/resource_id,
- /*resource_id=*/resource_id,
+ protocol_version,
+ resource_id,
&output,
max_parameters_,
std::move(on_completion),
@@ -55,8 +56,10 @@
Status Client::Write(uint32_t resource_id,
stream::Reader& input,
CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout) {
- if (on_completion == nullptr) {
+ chrono::SystemClock::duration timeout,
+ ProtocolVersion protocol_version) {
+ if (on_completion == nullptr ||
+ protocol_version == ProtocolVersion::kUnknown) {
return Status::InvalidArgument();
}
@@ -72,10 +75,9 @@
has_write_stream_ = true;
}
- // TODO(frolv): Only send the resource ID. The server should assign a session.
transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
- /*session_id=*/resource_id,
- /*resource_id=*/resource_id,
+ protocol_version,
+ resource_id,
&input,
max_parameters_,
std::move(on_completion),
diff --git a/pw_transfer/client_test.cc b/pw_transfer/client_test.cc
index 01db655..074bdda 100644
--- a/pw_transfer/client_test.cc
+++ b/pw_transfer/client_test.cc
@@ -30,7 +30,6 @@
namespace {
using internal::Chunk;
-using internal::ProtocolVersion;
using pw_rpc::raw::Transfer;
using namespace std::chrono_literals;
@@ -90,10 +89,10 @@
EXPECT_EQ(c0.session_id(), 3u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(3)
.set_offset(0)
.set_payload(kData32)
@@ -133,11 +132,11 @@
EXPECT_EQ(c0.session_id(), 4u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
constexpr ConstByteSpan data(kData32);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(4)
.set_offset(0)
.set_payload(data.first(16))));
@@ -146,7 +145,7 @@
ASSERT_EQ(payloads.size(), 1u);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(4)
.set_offset(16)
.set_payload(data.subspan(16))
@@ -176,7 +175,7 @@
transfer_thread_.WaitUntilEventIsProcessed();
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(3)
.set_offset(0)
.set_payload(kData32)
@@ -193,7 +192,7 @@
transfer_thread_.WaitUntilEventIsProcessed();
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(3)
.set_offset(0)
.set_payload(kData32)
@@ -222,7 +221,7 @@
EXPECT_EQ(c0.session_id(), 5u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 32u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
}
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
@@ -239,7 +238,7 @@
EXPECT_EQ(c0.session_id(), 5u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 16u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
}
TEST_F(ReadTransferMaxBytes32, MultiParameters) {
@@ -265,7 +264,7 @@
constexpr ConstByteSpan data(kData64);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(6)
.set_offset(0)
.set_payload(data.first(32))));
@@ -281,7 +280,7 @@
ASSERT_EQ(c1.window_end_offset(), 64u);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(6)
.set_offset(32)
.set_payload(data.subspan(32))
@@ -322,7 +321,7 @@
constexpr ConstByteSpan data(kData32);
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(data.first(16))));
@@ -333,7 +332,7 @@
// Send a chunk with an incorrect offset. The client should resend parameters.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(8) // wrong!
.set_payload(data.subspan(16))
@@ -350,7 +349,7 @@
// Send the correct chunk, completing the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(16)
.set_payload(data.subspan(16))
@@ -394,21 +393,21 @@
// pending_bytes == 32
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(8)
.set_offset(0)
.set_payload(data.first(16))));
// pending_bytes == 16
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(8)
.set_offset(16)
.set_payload(data.subspan(16, 8))));
// pending_bytes == 8, send 16 instead.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(8)
.set_offset(24)
.set_payload(data.subspan(24, 16))));
@@ -480,7 +479,7 @@
// Send the first 8 bytes of the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(10)
.set_offset(0)
.set_payload(data.first(8))));
@@ -488,7 +487,7 @@
// Skip offset 8, send the rest starting from 16.
for (uint32_t offset = 16; offset < data.size(); offset += 8) {
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(10)
.set_offset(offset)
.set_payload(data.subspan(offset, 8))));
@@ -506,7 +505,7 @@
// Send the remaining data to complete the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(10)
.set_offset(8)
.set_payload(data.subspan(8))
@@ -548,7 +547,7 @@
// Send the first 8 bytes of the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(11)
.set_offset(0)
.set_payload(data.first(8))));
@@ -556,7 +555,7 @@
// Skip offset 8, send the rest starting from 16.
for (uint32_t offset = 16; offset < data.size(); offset += 8) {
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(11)
.set_offset(offset)
.set_payload(data.subspan(offset, 8))));
@@ -567,11 +566,10 @@
// dropped packet.
ASSERT_EQ(payloads.size(), 2u);
- const Chunk last_chunk =
- Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
- .set_session_id(11)
- .set_offset(24)
- .set_payload(data.subspan(24));
+ const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(11)
+ .set_offset(24)
+ .set_payload(data.subspan(24));
// Re-send the final chunk of the block.
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
@@ -596,7 +594,7 @@
// Finish the transfer normally.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(11)
.set_offset(8)
.set_payload(data.subspan(8))
@@ -639,7 +637,7 @@
EXPECT_EQ(c0.session_id(), 12u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Wait for the timeout to expire without doing anything. The client should
// resend its initial parameters chunk.
@@ -650,14 +648,14 @@
EXPECT_EQ(c.session_id(), 12u);
EXPECT_EQ(c.offset(), 0u);
EXPECT_EQ(c.window_end_offset(), 64u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
// Finish the transfer following the timeout.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(12)
.set_offset(0)
.set_payload(kData32)
@@ -696,13 +694,13 @@
EXPECT_EQ(c0.session_id(), 13u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
constexpr ConstByteSpan data(kData32);
// Send some data, but not everything.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(13)
.set_offset(0)
.set_payload(data.first(16))));
@@ -726,7 +724,7 @@
// Send the rest of the data, finishing the transfer.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(13)
.set_offset(16)
.set_payload(data.subspan(16))
@@ -765,7 +763,7 @@
EXPECT_EQ(c0.session_id(), 14u);
EXPECT_EQ(c0.offset(), 0u);
EXPECT_EQ(c0.window_end_offset(), 64u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
// Wait for the timeout to expire without doing anything. The client should
@@ -789,7 +787,7 @@
Chunk c4 = DecodeChunk(payloads.back());
EXPECT_EQ(c4.session_id(), 14u);
- EXPECT_EQ(c4.type(), Chunk::Type::kTransferCompletion);
+ EXPECT_EQ(c4.type(), Chunk::Type::kCompletion);
ASSERT_TRUE(c4.status().has_value());
EXPECT_EQ(c4.status().value(), Status::DeadlineExceeded());
@@ -842,7 +840,7 @@
// Send some data.
context_.server().SendServerStream<Transfer::Read>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(14)
.set_offset(0)
.set_payload(data.first(16))));
@@ -938,7 +936,7 @@
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 3u);
EXPECT_EQ(c0.resource_id(), 3u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters. Client should send a data chunk and the final
// chunk.
@@ -995,7 +993,7 @@
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 4u);
EXPECT_EQ(c0.resource_id(), 4u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters with a chunk size smaller than the data.
@@ -1062,7 +1060,7 @@
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 5u);
EXPECT_EQ(c0.resource_id(), 5u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters with a nonzero offset, requesting a seek.
// Client should send a data chunk and the final chunk.
@@ -1142,7 +1140,7 @@
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 6u);
EXPECT_EQ(c0.resource_id(), 6u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send transfer parameters with a nonzero offset, requesting a seek.
context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
@@ -1158,7 +1156,7 @@
Chunk c1 = DecodeChunk(payloads[1]);
EXPECT_EQ(c1.session_id(), 6u);
- EXPECT_EQ(c1.type(), Chunk::Type::kTransferCompletion);
+ EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
ASSERT_TRUE(c1.status().has_value());
EXPECT_EQ(c1.status().value(), Status::Unimplemented());
@@ -1184,7 +1182,7 @@
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 7u);
EXPECT_EQ(c0.resource_id(), 7u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send an error from the server.
context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
@@ -1215,7 +1213,7 @@
Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 9u);
EXPECT_EQ(c0.resource_id(), 9u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send an invalid transfer parameters chunk with 0 pending bytes.
context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
@@ -1258,7 +1256,7 @@
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 10u);
EXPECT_EQ(c0.resource_id(), 10u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Wait for the timeout to expire without doing anything. The client should
// resend the initial transmit chunk.
@@ -1268,7 +1266,7 @@
Chunk c = DecodeChunk(payloads.back());
EXPECT_EQ(c.session_id(), 10u);
EXPECT_EQ(c.resource_id(), 10u);
- EXPECT_EQ(c.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c.type(), Chunk::Type::kStart);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
@@ -1299,7 +1297,7 @@
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 11u);
EXPECT_EQ(c0.resource_id(), 11u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send the first parameters chunk.
rpc::test::WaitForPackets(context_.output(), 2, [this] {
@@ -1372,7 +1370,7 @@
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 12u);
EXPECT_EQ(c0.resource_id(), 12u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send the first parameters chunk, requesting all the data. The client should
// respond with one data chunk and a remaining_bytes = 0 chunk.
@@ -1456,7 +1454,7 @@
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 13u);
EXPECT_EQ(c0.resource_id(), 13u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
// Wait for the timeout to expire without doing anything. The client should
@@ -1467,7 +1465,7 @@
Chunk c = DecodeChunk(payloads.back());
EXPECT_EQ(c.session_id(), 13u);
EXPECT_EQ(c.resource_id(), 13u);
- EXPECT_EQ(c.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c.type(), Chunk::Type::kStart);
// Transfer has not yet completed.
EXPECT_EQ(transfer_status, Status::Unknown());
@@ -1516,7 +1514,7 @@
Chunk c0 = DecodeChunk(payloads.back());
EXPECT_EQ(c0.session_id(), 14u);
EXPECT_EQ(c0.resource_id(), 14u);
- EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(c0.type(), Chunk::Type::kStart);
// Send the first parameters chunk.
rpc::test::WaitForPackets(context_.output(), 2, [this] {
@@ -1555,6 +1553,7 @@
ASSERT_EQ(payloads.size(), 4u);
Chunk c3 = DecodeChunk(payloads[3]);
+ EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy);
EXPECT_EQ(c3.session_id(), 14u);
ASSERT_TRUE(c3.status().has_value());
EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());
@@ -1583,7 +1582,7 @@
Chunk chunk = DecodeChunk(payloads.back());
EXPECT_EQ(chunk.session_id(), 15u);
EXPECT_EQ(chunk.resource_id(), 15u);
- EXPECT_EQ(chunk.type(), Chunk::Type::kTransferStart);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
client_.CancelTransfer(15);
transfer_thread_.WaitUntilEventIsProcessed();
@@ -1592,11 +1591,725 @@
ASSERT_EQ(payloads.size(), 2u);
chunk = DecodeChunk(payloads.back());
EXPECT_EQ(chunk.session_id(), 15u);
- ASSERT_EQ(chunk.type(), Chunk::Type::kTransferCompletion);
+ ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
EXPECT_EQ(chunk.status().value(), Status::Cancelled());
EXPECT_EQ(transfer_status, Status::Cancelled());
}
+TEST_F(ReadTransfer, Version2_SingleChunk) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(29)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION,
+ // additionally containing the initial parameters for the read transfer.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Send all the transfer data. Client should accept it and complete the
+ // transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(29)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+}
+
+TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads[0]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Instead of a START_ACK to continue the handshake, the server responds with
+ // an immediate data chunk, indicating that it is running the legacy protocol
+ // version. Client should revert to legacy, using the resource_id of 3 as the
+ // session_id, and complete the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+}
+
+TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Wait for the timeout to expire without doing anything. The client should
+ // resend the initial chunk.
+ transfer_thread_.SimulateClientTimeout(3);
+ ASSERT_EQ(payloads.size(), 2u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // This time, the server responds, continuing the handshake and transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(31)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(31)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 4u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+}
+
+TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(33)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION,
+ // additionally containing the initial parameters for the read transfer.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // Wait for the timeout to expire without doing anything. The client should
+ // resend the confirmation chunk.
+ transfer_thread_.SimulateClientTimeout(33);
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds and the transfer should continue normally.
+ context_.server().SendServerStream<Transfer::Read>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(33)
+ .set_offset(0)
+ .set_payload(kData32)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 4u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_EQ(transfer_status, OkStatus());
+ EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+ 0);
+}
+
+TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
+ stream::MemoryWriterBuffer<64> writer;
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Read(
+ 3,
+ writer,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Initial chunk of the transfer is sent. This chunk should contain all the
+ // fields from both legacy and version 2 protocols for backwards
+ // compatibility.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Read>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 64u);
+ EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+ // The server responds to the start request with an error.
+ context_.server().SendServerStream<Transfer::Read>(EncodeChunk(Chunk::Final(
+ ProtocolVersion::kVersionTwo, 3, Status::Unauthenticated())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unauthenticated());
+}
+
+TEST_F(WriteTransfer, Version2_SingleChunk) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(29)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // The server can then begin the data transfer by sending its transfer
+ // parameters. Client should respond with a data chunk and the final chunk.
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(29)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 4u);
+
+ chunk = DecodeChunk(payloads[2]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[3]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 29u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Send the final status chunk to complete the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 29, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 4u);
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Instead of continuing the handshake with a START_ACK, the server
+ // immediately sends parameters, indicating that it only supports the legacy
+ // protocol. Client should switch over to legacy and continue the transfer.
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(3)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads[1]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[2]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ // Send the final status chunk to complete the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 3u);
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Time out waiting for a server response. The client should resend the
+ // initial packet.
+ transfer_thread_.SimulateClientTimeout(3);
+ ASSERT_EQ(payloads.size(), 2u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // This time, respond with the correct continuation packet. The transfer
+ // should resume and complete normally.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(31)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(31)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 5u);
+
+ chunk = DecodeChunk(payloads[3]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[4]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 31u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 31, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 5u);
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // The server responds with a START_ACK, continuing the version 2 handshake
+ // and assigning a session_id to the transfer.
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+ .set_session_id(33)
+ .set_resource_id(3)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(payloads.size(), 2u);
+
+ // Client should accept the session_id with a START_ACK_CONFIRMATION.
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // Time out waiting for a server response. The client should resend the
+ // initial packet.
+ transfer_thread_.SimulateClientTimeout(33);
+ ASSERT_EQ(payloads.size(), 3u);
+
+ chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_FALSE(chunk.resource_id().has_value());
+
+ // This time, respond with the first transfer parameters chunk. The transfer
+ // should resume and complete normally.
+ rpc::test::WaitForPackets(context_.output(), 2, [this] {
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+ .set_session_id(33)
+ .set_offset(0)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(32)));
+ });
+
+ ASSERT_EQ(payloads.size(), 5u);
+
+ chunk = DecodeChunk(payloads[3]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_TRUE(chunk.has_payload());
+ EXPECT_EQ(std::memcmp(
+ chunk.payload().data(), kData32.data(), chunk.payload().size()),
+ 0);
+
+ chunk = DecodeChunk(payloads[4]);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 33u);
+ ASSERT_TRUE(chunk.remaining_bytes().has_value());
+ EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ context_.server().SendServerStream<Transfer::Write>(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 33, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 5u);
+ EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
+ stream::MemoryReader reader(kData32);
+ Status transfer_status = Status::Unknown();
+
+ ASSERT_EQ(OkStatus(),
+ client_.Write(
+ 3,
+ reader,
+ [&transfer_status](Status status) { transfer_status = status; },
+ cfg::kDefaultChunkTimeout,
+ ProtocolVersion::kVersionTwo));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // The client begins by sending the ID of the resource to transfer.
+ rpc::PayloadsView payloads =
+ context_.output().payloads<Transfer::Write>(context_.channel().id());
+ ASSERT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::Unknown());
+
+ Chunk chunk = DecodeChunk(payloads.back());
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.session_id(), 3u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // The server responds to the start request with an error.
+ context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+ Chunk::Final(ProtocolVersion::kVersionTwo, 3, Status::NotFound())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_EQ(payloads.size(), 1u);
+ EXPECT_EQ(transfer_status, Status::NotFound());
+}
+
} // namespace
} // namespace pw::transfer::test
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index dae00cc..68cd9f4 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -42,6 +42,11 @@
InitiateTransferAsClient();
} else {
StartTransferAsServer(event.new_transfer);
+
+ // TODO(frolv): This should probably be restructured.
+ HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
+ .data = event.new_transfer.raw_chunk_data,
+ .size = event.new_transfer.raw_chunk_size});
}
return;
}
@@ -83,19 +88,39 @@
SetTimeout(chunk_timeout_);
- PW_LOG_INFO("Starting transfer %u", id_for_log());
+ PW_LOG_INFO("Starting transfer for resource %u",
+ static_cast<unsigned>(resource_id_));
+
+ // Receive transfers should prepare their initial parameters to be send in the
+ // initial chunk.
if (type() == TransferType::kReceive) {
- // A receiver begins a new transfer with a parameters chunk telling the
- // transmitter what to send.
- UpdateAndSendTransferParameters(TransmitAction::kBegin);
- } else {
- SendInitialTransmitChunk();
+ UpdateTransferParameters();
}
- LogTransferConfiguration();
+ if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
+ // Legacy transfers go straight into the data transfer phase without a
+ // handshake.
+ if (type() == TransferType::kReceive) {
+ SendTransferParameters(TransmitAction::kBegin);
+ } else {
+ SendInitialTransmitChunk();
+ }
- // Don't send an error packet. If the transfer failed to start, then there's
- // nothing to tell the server about.
+ LogTransferConfiguration();
+ return;
+ }
+
+ // In newer protocol versions, begin the initial transfer handshake.
+ Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
+ start_chunk.set_resource_id(resource_id_);
+
+ if (type() == TransferType::kReceive) {
+ // Parameters should still be set on the initial chunk for backwards
+ // compatibility if the server only supports the legacy protocol.
+ SetTransferParameters(start_chunk);
+ }
+
+ EncodeAndSendChunk(start_chunk);
}
void Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
@@ -128,22 +153,46 @@
void Context::SendInitialTransmitChunk() {
// A transmitter begins a transfer by sending the ID of the resource to which
// it wishes to write.
- //
- // TODO(frolv): Session ID should not be set here, but assigned by the server
- // in an initial handshake.
- Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart);
+ Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
chunk.set_session_id(session_id_);
- chunk.set_resource_id(session_id_);
EncodeAndSendChunk(chunk);
}
+void Context::UpdateTransferParameters() {
+ size_t pending_bytes =
+ std::min(max_parameters_->pending_bytes(),
+ static_cast<uint32_t>(writer().ConservativeWriteLimit()));
+
+ window_size_ = pending_bytes;
+ window_end_offset_ = offset_ + pending_bytes;
+
+ max_chunk_size_bytes_ = MaxWriteChunkSize(
+ max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
+}
+
+void Context::SetTransferParameters(Chunk& parameters) {
+ parameters.set_window_end_offset(window_end_offset_)
+ .set_max_chunk_size_bytes(max_chunk_size_bytes_)
+ .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
+ .set_offset(offset_);
+}
+
+void Context::UpdateAndSendTransferParameters(TransmitAction action) {
+ UpdateTransferParameters();
+
+ PW_LOG_INFO("Transfer rate: %u B/s",
+ static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
+
+ return SendTransferParameters(action);
+}
+
void Context::SendTransferParameters(TransmitAction action) {
Chunk::Type type = Chunk::Type::kParametersRetransmit;
switch (action) {
case TransmitAction::kBegin:
- type = Chunk::Type::kTransferStart;
+ type = Chunk::Type::kStart;
break;
case TransmitAction::kRetransmit:
type = Chunk::Type::kParametersRetransmit;
@@ -153,12 +202,9 @@
break;
}
- Chunk parameters(ProtocolVersion::kLegacy, type);
- parameters.set_session_id(session_id_)
- .set_window_end_offset(window_end_offset_)
- .set_max_chunk_size_bytes(max_chunk_size_bytes_)
- .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
- .set_offset(offset_);
+ Chunk parameters(configured_protocol_version_, type);
+ parameters.set_session_id(session_id_);
+ SetTransferParameters(parameters);
PW_LOG_DEBUG(
"Transfer %u sending transfer parameters: "
@@ -192,34 +238,37 @@
}
return;
}
-}
-void Context::UpdateAndSendTransferParameters(TransmitAction action) {
- size_t pending_bytes =
- std::min(max_parameters_->pending_bytes(),
- static_cast<uint32_t>(writer().ConservativeWriteLimit()));
-
- window_size_ = pending_bytes;
- window_end_offset_ = offset_ + pending_bytes;
-
- max_chunk_size_bytes_ = MaxWriteChunkSize(
- max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
-
- PW_LOG_INFO("Transfer rate: %u B/s",
- static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
-
- return SendTransferParameters(action);
+ last_chunk_sent_ = chunk.type();
}
void Context::Initialize(const NewTransferEvent& new_transfer) {
PW_DCHECK(!active());
+ PW_DCHECK_INT_NE(new_transfer.protocol_version,
+ ProtocolVersion::kUnknown,
+ "Cannot start a transfer with an unknown protocol");
+
session_id_ = new_transfer.session_id;
+ resource_id_ = new_transfer.resource_id;
+ desired_protocol_version_ = new_transfer.protocol_version;
+ configured_protocol_version_ = ProtocolVersion::kUnknown;
+
flags_ = static_cast<uint8_t>(new_transfer.type);
transfer_state_ = TransferState::kWaiting;
retries_ = 0;
max_retries_ = new_transfer.max_retries;
+ if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
+ // In a legacy transfer, there is no protocol negotiation stage.
+ // Automatically configure the context to run the legacy protocol and
+ // proceed to waiting for a chunk.
+ configured_protocol_version_ = ProtocolVersion::kLegacy;
+ transfer_state_ = TransferState::kWaiting;
+ } else {
+ transfer_state_ = TransferState::kInitiating;
+ }
+
rpc_writer_ = new_transfer.rpc_writer;
stream_ = new_transfer.stream;
@@ -231,6 +280,7 @@
max_parameters_ = new_transfer.max_parameters;
thread_ = new_transfer.transfer_thread;
+ last_chunk_sent_ = Chunk::Type::kStart;
last_chunk_offset_ = 0;
chunk_timeout_ = new_transfer.timeout;
interchunk_delay_ = chrono::SystemClock::for_at_least(
@@ -241,8 +291,6 @@
}
void Context::HandleChunkEvent(const ChunkEvent& event) {
- PW_DCHECK(event.session_id == session_id_);
-
Result<Chunk> maybe_chunk =
Chunk::Parse(ConstByteSpan(event.data, event.size));
if (!maybe_chunk.ok()) {
@@ -254,7 +302,9 @@
// Received some data. Reset the retry counter.
retries_ = 0;
- if (chunk.status().has_value()) {
+ if (chunk.IsTerminatingChunk()) {
+ // TODO(frolv): Handle the transfer completion handshake in version 2.
+
if (active()) {
Finish(chunk.status().value());
} else {
@@ -272,6 +322,114 @@
}
}
+void Context::PerformInitialHandshake(const Chunk& chunk) {
+ switch (chunk.type()) {
+ // Initial packet sent from a client to a server.
+ case Chunk::Type::kStart: {
+ UpdateLocalProtocolConfigurationFromPeer(chunk);
+
+ // This cast is safe as we know we're running in a transfer server.
+ uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
+
+ Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
+ start_ack.set_session_id(session_id_).set_resource_id(resource_id);
+
+ EncodeAndSendChunk(start_ack);
+ break;
+ }
+
+ // Response packet sent from a server to a client. Contains the assigned
+ // session_id of the transfer.
+ case Chunk::Type::kStartAck: {
+ UpdateLocalProtocolConfigurationFromPeer(chunk);
+
+ // Accept the assigned session_id and tell the server that the transfer
+ // can begin.
+ session_id_ = chunk.session_id();
+ PW_LOG_DEBUG("Transfer for resource %u was assigned session ID %u",
+ static_cast<unsigned>(resource_id_),
+ static_cast<unsigned>(session_id_));
+
+ Chunk start_ack_confirmation(configured_protocol_version_,
+ Chunk::Type::kStartAckConfirmation);
+ start_ack_confirmation.set_session_id(session_id_);
+
+ if (type() == TransferType::kReceive) {
+ // In a receive transfer, tag the initial transfer parameters onto the
+ // confirmation chunk so that the server can immediately begin sending
+ // data.
+ UpdateTransferParameters();
+ SetTransferParameters(start_ack_confirmation);
+ }
+
+ set_transfer_state(TransferState::kWaiting);
+ EncodeAndSendChunk(start_ack_confirmation);
+ break;
+ }
+
+ // Confirmation sent by a client to a server of the configured transfer
+ // version and session ID. Completes the handshake and begins the actual
+ // data transfer.
+ case Chunk::Type::kStartAckConfirmation: {
+ set_transfer_state(TransferState::kWaiting);
+
+ if (type() == TransferType::kTransmit) {
+ HandleTransmitChunk(chunk);
+ } else {
+ HandleReceiveChunk(chunk);
+ }
+ break;
+ }
+
+ // If a non-handshake chunk is received during an INITIATING state, the
+ // transfer peer is running a legacy protocol version, which does not
+ // perform a handshake. End the handshake, revert to the legacy protocol,
+ // and process the chunk appropriately.
+ case Chunk::Type::kData:
+ case Chunk::Type::kParametersRetransmit:
+ case Chunk::Type::kParametersContinue:
+ // Update the local context's session ID in case it was expecting one to
+ // be assigned by the server.
+ session_id_ = chunk.session_id();
+
+ configured_protocol_version_ = ProtocolVersion::kLegacy;
+ set_transfer_state(TransferState::kWaiting);
+
+ PW_LOG_DEBUG(
+ "Transfer %u tried to start on protocol version %d, but peer only "
+ "supports legacy",
+ id_for_log(),
+ static_cast<int>(desired_protocol_version_));
+
+ if (type() == TransferType::kTransmit) {
+ HandleTransmitChunk(chunk);
+ } else {
+ HandleReceiveChunk(chunk);
+ }
+ break;
+
+ case Chunk::Type::kCompletion:
+ case Chunk::Type::kCompletionAck:
+ PW_CRASH(
+ "Transfer completion packets should be processed by "
+ "HandleChunkEvent()");
+ break;
+ }
+}
+
+void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
+ PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
+ static_cast<int>(desired_protocol_version_),
+ static_cast<int>(chunk.protocol_version()));
+
+ configured_protocol_version_ =
+ std::min(desired_protocol_version_, chunk.protocol_version());
+
+ PW_LOG_INFO("Transfer %u: using protocol version %d",
+ id_for_log(),
+ static_cast<int>(configured_protocol_version_));
+}
+
void Context::HandleTransmitChunk(const Chunk& chunk) {
switch (transfer_state_) {
case TransferState::kInactive:
@@ -292,9 +450,24 @@
SendFinalStatusChunk();
return;
+ case TransferState::kInitiating:
+ PerformInitialHandshake(chunk);
+ return;
+
case TransferState::kWaiting:
case TransferState::kTransmitting:
- HandleTransferParametersUpdate(chunk);
+ if (chunk.protocol_version() == configured_protocol_version_) {
+ HandleTransferParametersUpdate(chunk);
+ } else {
+ PW_LOG_ERROR(
+ "Transmit transfer %u was configured to use protocol version %d "
+ "but received a chunk with version %d",
+ id_for_log(),
+ static_cast<int>(configured_protocol_version_),
+ static_cast<int>(chunk.protocol_version()));
+ Finish(Status::Internal());
+ }
+
if (transfer_state_ == TransferState::kCompleted) {
SendFinalStatusChunk();
}
@@ -363,7 +536,7 @@
}
void Context::TransmitNextChunk(bool retransmit_requested) {
- Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData);
+ Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
chunk.set_session_id(session_id_);
chunk.set_offset(offset_);
@@ -440,6 +613,7 @@
return;
}
+ last_chunk_sent_ = chunk.type();
flags_ |= kFlagsDataSent;
if (offset_ == window_end_offset_) {
@@ -455,12 +629,29 @@
}
void Context::HandleReceiveChunk(const Chunk& chunk) {
+ if (transfer_state_ == TransferState::kInitiating) {
+ PerformInitialHandshake(chunk);
+ return;
+ }
+
+ if (chunk.protocol_version() != configured_protocol_version_) {
+ PW_LOG_ERROR(
+ "Receive transfer %u was configured to use protocol version %d "
+ "but received a chunk with version %d",
+ id_for_log(),
+ static_cast<int>(configured_protocol_version_),
+ static_cast<int>(chunk.protocol_version()));
+ Finish(Status::Internal());
+ SendFinalStatusChunk();
+ return;
+ }
+
switch (transfer_state_) {
case TransferState::kInactive:
- PW_CRASH("Never should handle chunk while inactive");
-
case TransferState::kTransmitting:
- PW_CRASH("Receive transfer somehow entered TRANSMITTING state");
+ case TransferState::kInitiating:
+ PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
+ static_cast<int>(transfer_state_));
case TransferState::kCompleted:
// If the transfer has already completed and another chunk is received,
@@ -629,7 +820,7 @@
status_.code());
EncodeAndSendChunk(
- Chunk::Final(ProtocolVersion::kLegacy, session_id_, status_));
+ Chunk::Final(configured_protocol_version_, session_id_, status_));
}
void Context::Finish(Status status) {
@@ -667,6 +858,7 @@
TransmitNextChunk(/*retransmit_requested=*/false);
break;
+ case TransferState::kInitiating:
case TransferState::kWaiting:
case TransferState::kRecovery:
// A timeout occurring in a WAITING or RECOVERY state indicates that no
@@ -698,6 +890,12 @@
++retries_;
+ if (transfer_state_ == TransferState::kInitiating ||
+ last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
+ RetryHandshake();
+ return;
+ }
+
if (type() == TransferType::kReceive) {
// Resend the most recent transfer parameters.
PW_LOG_DEBUG(
@@ -721,8 +919,8 @@
// Otherwise, resend the most recent chunk. If the reader doesn't support
// seeking, this isn't possible, so just terminate the transfer immediately.
if (!reader().Seek(last_chunk_offset_).ok()) {
- PW_LOG_ERROR("Transmit transfer %d timed out waiting for new parameters.",
- static_cast<unsigned>(session_id_));
+ PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
+ id_for_log());
PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
Finish(Status::DeadlineExceeded());
return;
@@ -734,6 +932,43 @@
TransmitNextChunk(/*retransmit_requested=*/false);
}
+void Context::RetryHandshake() {
+ Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
+
+ switch (last_chunk_sent_) {
+ case Chunk::Type::kStart:
+ // No protocol version is yet configured at the time of sending the start
+ // chunk, so we use the client's desired version instead.
+ retry_chunk.set_protocol_version(desired_protocol_version_)
+ .set_resource_id(resource_id_);
+ if (type() == TransferType::kReceive) {
+ SetTransferParameters(retry_chunk);
+ }
+ break;
+
+ case Chunk::Type::kStartAck:
+ retry_chunk.set_session_id(session_id_)
+ .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
+ break;
+
+ case Chunk::Type::kStartAckConfirmation:
+ retry_chunk.set_session_id(session_id_);
+ if (type() == TransferType::kReceive) {
+ SetTransferParameters(retry_chunk);
+ }
+ break;
+
+ case Chunk::Type::kData:
+ case Chunk::Type::kParametersRetransmit:
+ case Chunk::Type::kParametersContinue:
+ case Chunk::Type::kCompletion:
+ case Chunk::Type::kCompletionAck:
+ PW_CRASH("Should not RetryHandshake() when not in handshake phase");
+ }
+
+ EncodeAndSendChunk(retry_chunk);
+}
+
uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
uint32_t channel_id) const {
// Start with the user-provided maximum chunk size, which should be the usable
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst
index b8672dc..fff63f0 100644
--- a/pw_transfer/docs.rst
+++ b/pw_transfer/docs.rst
@@ -3,6 +3,8 @@
===========
pw_transfer
===========
+``pw_transfer`` is a reliable data transfer protocol which runs on top of
+Pigweed RPC.
.. attention::
@@ -14,8 +16,83 @@
C++
===
-The transfer service is defined and registered with an RPC server like any other
-RPC service.
+
+Transfer thread
+---------------
+To run transfers as either a client or server (or both), a dedicated thread is
+required. The transfer thread is used to process all transfer-related events
+safely. The same transfer thread can be shared by a transfer client and service
+running on the same system.
+
+.. note::
+
+ All user-defined transfer callbacks (i.e. the virtual interface of a
+ ``TransferHandler`` or completion function in a transfer client) will be
+ invoked from the transfer thread's context.
+
+In order to operate, a transfer thread requires two buffers:
+
+- The first is a *chunk buffer*. This is used to stage transfer packets received
+ by the RPC system to be processed by the transfer thread. It must be large
+ enough to store the largest possible chunk the system supports.
+
+- The second is an *encode buffer*. This is used by the transfer thread to
+ encode outgoing RPC packets. It is necessarily larger than the chunk buffer.
+ Typically, this is sized to the system's maximum transmission unit at the
+ transport layer.
+
+A transfer thread is created by instantiating a ``pw::transfer::Thread``. This
+class derives from ``pw::thread::ThreadCore``, allowing it to directly be used
+when creating a system thread. Refer to :ref:`module-pw_thread-thread-creation`
+for additional information.
+
+**Example thread configuration**
+
+.. code-block:: cpp
+
+ #include "pw_transfer/transfer_thread.h"
+
+ namespace {
+
+ // The maximum number of concurrent transfers the thread should support as
+ // either a client or a server. These can be set to 0 (if only using one or
+ // the other).
+ constexpr size_t kMaxConcurrentClientTransfers = 5;
+ constexpr size_t kMaxConcurrentServerTransfers = 3;
+
+ // The maximum payload size that can be transmitted by the system's
+ // transport stack. This would typically be defined within some transport
+ // header.
+ constexpr size_t kMaxTransmissionUnit = 512;
+
+ // The maximum amount of data that should be sent within a single transfer
+ // packet. By necessity, this should be less than the max transmission unit.
+ //
+ // pw_transfer requires some additional per-packet overhead, so the actual
+ // amount of data it sends may be lower than this.
+ constexpr size_t kMaxTransferChunkSizeBytes = 480;
+
+ // Buffers for storing and encoding chunks (see documentation above).
+ std::array<std::byte, kMaxTransferChunkSizeBytes> chunk_buffer;
+ std::array<std::byte, kMaxTransmissionUnit> encode_buffer;
+
+ pw::transfer::Thread<kMaxConcurrentClientTransfers,
+ kMaxConcurrentServerTransfers>
+ transfer_thread(chunk_buffer, encode_buffer);
+
+ } // namespace
+
+ // pw::transfer::TransferThread is the generic, non-templated version of the
+ // Thread class. A Thread can implicitly convert to a TransferThread.
+ pw::transfer::TransferThread& GetSystemTransferThread() {
+ return transfer_thread;
+ }
+
+
+Transfer server
+---------------
+``pw_transfer`` provides an RPC service for running transfers through an RPC
+server.
To know how to read data from or write data to device, a ``TransferHandler``
interface is defined (``pw_transfer/public/pw_transfer/handler.h``). Transfer
@@ -25,19 +102,22 @@
or ``ReadWriteHandler`` as appropriate and override Prepare and Finalize methods
if necessary.
-A transfer handler should be implemented and instantiated for each unique data
-transfer to or from a device. These handlers are then registered with the
-transfer service using their resource IDs.
+A transfer handler should be implemented and instantiated for each unique
+resource that can be transferred to or from a device. Each instantiated handler
+must have a globally-unique integer ID used to identify the resource.
-**Example**
+Handlers are registered with the transfer service. This may be done during
+system initialization (for static resources), or dynamically at runtime to
+support ephemeral transfer resources.
+
+**Example transfer handler implementation**
.. code-block:: cpp
+ #include "pw_stream/memory_stream.h"
#include "pw_transfer/transfer.h"
- namespace {
-
- // Simple transfer handler which reads data from an in-memory buffer.
+ // A simple transfer handler which reads data from an in-memory buffer.
class SimpleBufferReadHandler : public pw::transfer::ReadOnlyHandler {
public:
SimpleReadTransfer(uint32_t resource_id, pw::ConstByteSpan data)
@@ -49,36 +129,110 @@
pw::stream::MemoryReader reader_;
};
- // The maximum amount of data that can be sent in a single chunk, excluding
- // transport layer overhead.
- constexpr size_t kMaxChunkSizeBytes = 256;
+The transfer service is instantiated with a reference to the system's transfer
+thread and registered with the system's RPC server.
- // In a write transfer, the maximum number of bytes to receive at one time,
+**Example transfer service initialization**
+
+.. code-block:: cpp
+
+ #include "pw_transfer/transfer.h"
+
+ namespace {
+
+ // In a write transfer, the maximum number of bytes to receive at one time
// (potentially across multiple chunks), unless specified otherwise by the
// transfer handler's stream::Writer.
constexpr size_t kDefaultMaxBytesToReceive = 1024;
- // Instantiate a static transfer service.
- // The service requires a work queue, and a buffer to store data from a chunk.
- // The helper class TransferServiceBuffer comes with a builtin buffer.
- pw::transfer::TransferServiceBuffer<kMaxChunkSizeBytes> transfer_service(
- GetSystemWorkQueue(), kDefaultMaxBytesToReceive);
+ pw::transfer::TransferService transfer_service(
+ GetSystemTransferThread(), kDefaultMaxBytesToReceive);
// Instantiate a handler for the data to be transferred. The resource ID will
// be used by the transfer client and server to identify the handler.
- constexpr uint32_t kBufferResourceId = 1;
- char buffer_to_transfer[256] = { /* ... */ };
- SimpleBufferReadHandler buffer_handler(kBufferResourceId, buffer_to_transfer);
+ constexpr uint32_t kMagicBufferResourceId = 1;
+ char magic_buffer_to_transfer[256] = { /* ... */ };
+ SimpleBufferReadHandler magic_buffer_handler(
+ kMagicBufferResourceId, magic_buffer_to_transfer);
} // namespace
- void InitTransfer() {
+ void InitTransferService() {
// Register the handler with the transfer service, then the transfer service
// with an RPC server.
- transfer_service.RegisterHandler(buffer_handler);
+ transfer_service.RegisterHandler(magic_buffer_handler);
GetSystemRpcServer().RegisterService(transfer_service);
}
+Transfer client
+---------------
+``pw_transfer`` provides a transfer client capable of running transfers through
+an RPC client.
+
+.. note::
+
+ Currently, a transfer client is only capable of running transfers on a single
+ RPC channel. This may be expanded in the future.
+
+The transfer client provides the following two APIs for starting data transfers:
+
+.. cpp:function:: pw::Status pw::transfer::Client::Read(uint32_t resource_id, pw::stream::Writer& output, CompletionFunc&& on_completion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion)
+
+ Reads data from a transfer server to the specified ``pw::stream::Writer``.
+ Invokes the provided callback function with the overall status of the
+ transfer.
+
+ Due to the asynchronous nature of transfer operations, this function will only
+ return a non-OK status if it is called with bad arguments. Otherwise, it will
+ return OK and errors will be reported through the completion callback.
+
+.. cpp:function:: pw::Status pw::transfer::Client::Write(uint32_t resource_id, pw::stream::Reader& input, CompletionFunc&& on_completion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion)
+
+ Writes data from a source ``pw::stream::Reader`` to a transfer server.
+ Invokes the provided callback function with the overall status of the
+ transfer.
+
+ Due to the asynchronous nature of transfer operations, this function will only
+ return a non-OK status if it is called with bad arguments. Otherwise, it will
+ return OK and errors will be reported through the completion callback.
+
+**Example client setup**
+
+.. code-block:: cpp
+
+ #include "pw_transfer/client.h"
+
+ namespace {
+
+ // RPC channel on which transfers should be run.
+ constexpr uint32_t kChannelId = 42;
+
+ pw::transfer::Client transfer_client(
+ GetSystemRpcClient(), kChannelId, GetSystemTransferThread());
+
+ } // namespace
+
+ Status ReadMagicBufferSync(pw::ByteSpan sink) {
+ pw::stream::Writer writer(sink);
+
+ struct {
+ pw::sync::ThreadNotification notification;
+ pw::Status status;
+ } transfer_state;
+
+ transfer_client.Read(
+ kMagicBufferResourceId,
+ writer,
+ [&transfer_state](pw::Status status) {
+ transfer_state.status = status;
+ transfer_state.notification.release();
+ });
+
+ // Block until the transfer completes.
+ transfer_state.notification.acquire();
+ return transfer_state.status;
+ }
+
Module Configuration Options
----------------------------
The following configurations can be adjusted via compile-time configuration of
diff --git a/pw_transfer/public/pw_transfer/client.h b/pw_transfer/public/pw_transfer/client.h
index 44cd638..50a6bf9 100644
--- a/pw_transfer/public/pw_transfer/client.h
+++ b/pw_transfer/public/pw_transfer/client.h
@@ -71,11 +71,11 @@
// the server is written to the provided writer. Returns OK if the transfer is
// successfully started. When the transfer finishes (successfully or not), the
// completion callback is invoked with the overall status.
- Status Read(
- uint32_t resource_id,
- stream::Writer& output,
- CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout);
+ Status Read(uint32_t resource_id,
+ stream::Writer& output,
+ CompletionFunc&& on_completion,
+ chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout,
+ ProtocolVersion version = kDefaultProtocolVersion);
// Begins a new write transfer for the given resource ID. Data from the
// provided reader is sent to the server. When the transfer finishes
@@ -85,7 +85,8 @@
uint32_t resource_id,
stream::Reader& input,
CompletionFunc&& on_completion,
- chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout);
+ chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout,
+ ProtocolVersion version = kDefaultProtocolVersion);
// Terminates an ongoing transfer for the specified resource ID.
//
@@ -103,6 +104,11 @@
}
private:
+ // TODO(frolv): This should be switched to default to kLatest once
+ // implementation of protocol v2 is complete.
+ static constexpr ProtocolVersion kDefaultProtocolVersion =
+ ProtocolVersion::kLegacy;
+
using Transfer = pw_rpc::raw::Transfer;
void OnRpcError(Status status, internal::TransferType type);
diff --git a/pw_transfer/public/pw_transfer/internal/chunk.h b/pw_transfer/public/pw_transfer/internal/chunk.h
index 72ce600..82f3cbf 100644
--- a/pw_transfer/public/pw_transfer/internal/chunk.h
+++ b/pw_transfer/public/pw_transfer/internal/chunk.h
@@ -24,12 +24,14 @@
class Chunk {
public:
enum class Type {
- kTransferData = 0,
- kTransferStart = 1,
+ kData = 0,
+ kStart = 1,
kParametersRetransmit = 2,
kParametersContinue = 3,
- kTransferCompletion = 4,
- kTransferCompletionAck = 5, // Currently unused.
+ kCompletion = 4,
+ kCompletionAck = 5, // Currently unused.
+ kStartAck = 6,
+ kStartAckConfirmation = 7,
};
// Constructs a new chunk with the given transfer protocol version. All fields
@@ -40,14 +42,16 @@
// Parses a chunk from a serialized protobuf message.
static Result<Chunk> Parse(ConstByteSpan message);
- // Partially decodes a transfer chunk to find its session ID field.
- static Result<uint32_t> ExtractSessionId(ConstByteSpan message);
+ // Partially decodes a transfer chunk to find its transfer context identifier.
+ // Depending on the protocol version and type of chunk, this may be one of
+ // several proto fields.
+ static Result<uint32_t> ExtractIdentifier(ConstByteSpan message);
// Creates a terminating status chunk within a transfer.
static Chunk Final(ProtocolVersion version,
uint32_t session_id,
Status status) {
- return Chunk(version, Type::kTransferCompletion)
+ return Chunk(version, Type::kCompletion)
.set_session_id(session_id)
.set_status(status);
}
@@ -70,6 +74,11 @@
return *this;
}
+ constexpr Chunk& set_protocol_version(ProtocolVersion version) {
+ protocol_version_ = version;
+ return *this;
+ }
+
constexpr Chunk& set_window_end_offset(uint32_t window_end_offset) {
window_end_offset_ = window_end_offset;
return *this;
@@ -137,6 +146,10 @@
return remaining_bytes_;
}
+ constexpr ProtocolVersion protocol_version() const {
+ return protocol_version_;
+ }
+
constexpr bool is_legacy() const {
return protocol_version_ == ProtocolVersion::kLegacy;
}
@@ -152,11 +165,11 @@
// continuation parameters. Therefore, there are only three possible chunk
// types: start, data, and retransmit.
if (IsInitialChunk()) {
- return Type::kTransferStart;
+ return Type::kStart;
}
if (has_payload()) {
- return Type::kTransferData;
+ return Type::kData;
}
return Type::kParametersRetransmit;
@@ -170,25 +183,40 @@
}
return type_.value() == Type::kParametersRetransmit ||
- type_.value() == Type::kTransferStart;
+ type_.value() == Type::kStartAckConfirmation ||
+ type_.value() == Type::kStart;
}
constexpr bool IsInitialChunk() const {
if (protocol_version_ >= ProtocolVersion::kVersionTwo) {
- return type_ == Type::kTransferStart;
+ return type_ == Type::kStart;
}
// In legacy versions of the transfer protocol, the chunk type is not always
// set. Infer that a chunk is initial if it has an offset of 0 and no data
// or status.
- return type_ == Type::kTransferStart ||
+ return type_ == Type::kStart ||
(offset_ == 0 && !has_payload() && !status_.has_value());
}
+ constexpr bool IsTerminatingChunk() const {
+ if (is_legacy()) {
+ return status_.has_value();
+ }
+
+ return type_ == Type::kCompletion || type_ == Type::kCompletionAck;
+ }
+
// The final chunk from the transmitter sets remaining_bytes to 0 in both Read
// and Write transfers.
constexpr bool IsFinalTransmitChunk() const { return remaining_bytes_ == 0u; }
+ // Returns true if this chunk is part of an initial transfer handshake.
+ constexpr bool IsInitialHandshakeChunk() const {
+ return type_ == Type::kStart || type_ == Type::kStartAck ||
+ type_ == Type::kStartAckConfirmation;
+ }
+
private:
constexpr Chunk(ProtocolVersion version, std::optional<Type> type)
: session_id_(0),
@@ -214,7 +242,7 @@
// chunk is processable. Following a response, the common protocol version
// will be determined and fields omitted as necessary.
constexpr bool ShouldEncodeLegacyFields() const {
- return is_legacy() || type_ == Chunk::Type::kTransferStart;
+ return is_legacy() || type_ == Type::kStart;
}
uint32_t session_id_;
diff --git a/pw_transfer/public/pw_transfer/internal/client_context.h b/pw_transfer/public/pw_transfer/internal/client_context.h
index a84af31..9912aa4 100644
--- a/pw_transfer/public/pw_transfer/internal/client_context.h
+++ b/pw_transfer/public/pw_transfer/internal/client_context.h
@@ -1,4 +1,4 @@
-// Copyright 2021 The Pigweed Authors
+// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
@@ -27,6 +27,13 @@
on_completion_ = std::move(on_completion);
}
+ // In client-side transfer contexts, a session ID may not yet have been
+ // assigned by the server, in which case resource_id is used as the context
+ // identifier.
+ constexpr uint32_t id() const {
+ return session_id() == kUnassignedSessionId ? resource_id() : session_id();
+ }
+
private:
Status FinalCleanup(Status status) override;
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h
index 41d51d8..d15c3e4 100644
--- a/pw_transfer/public/pw_transfer/internal/context.h
+++ b/pw_transfer/public/pw_transfer/internal/context.h
@@ -25,6 +25,7 @@
#include "pw_stream/stream.h"
#include "pw_transfer/internal/chunk.h"
#include "pw_transfer/internal/event.h"
+#include "pw_transfer/internal/protocol.h"
#include "pw_transfer/rate_estimate.h"
namespace pw::transfer::internal {
@@ -69,12 +70,15 @@
// Information about a single transfer.
class Context {
public:
+ static constexpr uint32_t kUnassignedSessionId = 0;
+
Context(const Context&) = delete;
Context(Context&&) = delete;
Context& operator=(const Context&) = delete;
Context& operator=(Context&&) = delete;
constexpr uint32_t session_id() const { return session_id_; }
+ constexpr uint32_t resource_id() const { return resource_id_; }
// True if the context has been used for a transfer (it has an ID).
bool initialized() const {
@@ -82,7 +86,7 @@
}
// True if the transfer is active.
- bool active() const { return transfer_state_ >= TransferState::kWaiting; }
+ bool active() const { return transfer_state_ >= TransferState::kInitiating; }
std::optional<chrono::SystemClock::time_point> timeout() const {
return active() && next_timeout_ != kNoTimeout
@@ -104,7 +108,10 @@
~Context() = default;
constexpr Context()
- : session_id_(0),
+ : session_id_(kUnassignedSessionId),
+ resource_id_(0),
+ desired_protocol_version_(ProtocolVersion::kUnknown),
+ configured_protocol_version_(ProtocolVersion::kUnknown),
flags_(0),
transfer_state_(TransferState::kInactive),
retries_(0),
@@ -117,6 +124,7 @@
max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
max_parameters_(nullptr),
thread_(nullptr),
+ last_chunk_sent_(Chunk::Type::kData),
last_chunk_offset_(0),
chunk_timeout_(chrono::SystemClock::duration::zero()),
interchunk_delay_(chrono::SystemClock::for_at_least(
@@ -132,18 +140,23 @@
// This ServerContext has never been used for a transfer. It is available
// for use for a transfer.
kInactive,
- // A transfer completed and the final status chunk was sent. The Context
- // is
- // available for use for a new transfer. A receive transfer uses this
- // state
- // to allow a transmitter to retry its last chunk if the final status
- // chunk
+
+ // A transfer completed and the final status chunk was sent. The Context is
+ // available for use for a new transfer. A receive transfer uses this state
+ // to allow a transmitter to retry its last chunk if the final status chunk
// was dropped.
kCompleted,
+
+ // Transfer is starting. The server and client are performing an initial
+ // handshake and negotiating protocol and feature flags.
+ kInitiating,
+
// Waiting for the other end to send a chunk.
kWaiting,
+
// Transmitting a window of data to a receiver.
kTransmitting,
+
// Recovering after one or more chunks was dropped in an active transfer.
kRecovery,
};
@@ -215,6 +228,11 @@
// Processes a chunk in either a transfer or receive transfer.
void HandleChunkEvent(const ChunkEvent& event);
+ // Runs the initial three-way handshake when starting a new transfer.
+ void PerformInitialHandshake(const Chunk& chunk);
+
+ void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk);
+
// Processes a chunk in a transmit transfer.
void HandleTransmitChunk(const Chunk& chunk);
@@ -233,12 +251,18 @@
// Sends the first chunk in a transmit transfer.
void SendInitialTransmitChunk();
+ // Updates the current receive transfer parameters based on the context's
+ // configuration.
+ void UpdateTransferParameters();
+
+ // Populates the transfer parameters fields on a chunk object.
+ void SetTransferParameters(Chunk& parameters);
+
// In a receive transfer, sends a parameters chunk telling the transmitter
// how much data they can send.
void SendTransferParameters(TransmitAction action);
- // Updates the current receive transfer parameters from the provided object,
- // then sends them.
+ // Updates the current receive transfer parameters, then sends them.
void UpdateAndSendTransferParameters(TransmitAction action);
// Sends a final status chunk of a completed transfer without updating the
@@ -264,6 +288,7 @@
// Resends the last packet or aborts the transfer if the maximum retries has
// been exceeded.
void Retry();
+ void RetryHandshake();
void LogTransferConfiguration();
@@ -283,6 +308,15 @@
chrono::SystemClock::time_point(chrono::SystemClock::duration(0));
uint32_t session_id_;
+ uint32_t resource_id_;
+
+ // The version of the transfer protocol that this node wants to run.
+ ProtocolVersion desired_protocol_version_;
+
+ // The version of the transfer protocol that the context is actually running,
+ // following negotiation with the transfer peer.
+ ProtocolVersion configured_protocol_version_;
+
uint8_t flags_;
TransferState transfer_state_;
uint8_t retries_;
@@ -300,6 +334,8 @@
const TransferParameters* max_parameters_;
TransferThread* thread_;
+ Chunk::Type last_chunk_sent_;
+
union {
Status status_; // Used when state is kCompleted.
uint32_t last_chunk_offset_; // Used in states kWaiting and kRecovery.
diff --git a/pw_transfer/public/pw_transfer/internal/event.h b/pw_transfer/public/pw_transfer/internal/event.h
index eb0ac4a..304b32d 100644
--- a/pw_transfer/public/pw_transfer/internal/event.h
+++ b/pw_transfer/public/pw_transfer/internal/event.h
@@ -16,6 +16,7 @@
#include "pw_chrono/system_clock.h"
#include "pw_rpc/writer.h"
#include "pw_stream/stream.h"
+#include "pw_transfer/internal/protocol.h"
namespace pw::transfer::internal {
@@ -68,6 +69,7 @@
struct NewTransferEvent {
TransferType type;
+ ProtocolVersion protocol_version;
uint32_t session_id;
uint32_t resource_id;
rpc::Writer* rpc_writer;
@@ -80,10 +82,13 @@
stream::Stream* stream; // In client-side transfers.
Handler* handler; // In server-side transfers.
};
+
+ const std::byte* raw_chunk_data;
+ size_t raw_chunk_size;
};
struct ChunkEvent {
- uint32_t session_id;
+ uint32_t context_identifier;
const std::byte* data;
size_t size;
};
@@ -96,6 +101,7 @@
struct SendStatusChunkEvent {
uint32_t session_id;
+ ProtocolVersion protocol_version;
Status::Code status;
TransferStream stream;
};
diff --git a/pw_transfer/public/pw_transfer/internal/protocol.h b/pw_transfer/public/pw_transfer/internal/protocol.h
index 376e0ed..f50cc8c 100644
--- a/pw_transfer/public/pw_transfer/internal/protocol.h
+++ b/pw_transfer/public/pw_transfer/internal/protocol.h
@@ -13,7 +13,9 @@
// the License.
#pragma once
-namespace pw::transfer::internal {
+#include <cstdint>
+
+namespace pw::transfer {
enum class ProtocolVersion {
// Protocol version not know or not set.
@@ -32,4 +34,13 @@
kLatest = kVersionTwo,
};
-} // namespace pw::transfer::internal
+constexpr bool ValidProtocolVersion(ProtocolVersion version) {
+ return version > ProtocolVersion::kUnknown &&
+ version <= ProtocolVersion::kLatest;
+}
+
+constexpr bool ValidProtocolVersion(uint32_t version) {
+ return ValidProtocolVersion(static_cast<ProtocolVersion>(version));
+}
+
+} // namespace pw::transfer
diff --git a/pw_transfer/public/pw_transfer/internal/server_context.h b/pw_transfer/public/pw_transfer/internal/server_context.h
index ce9a78b..265b75a 100644
--- a/pw_transfer/public/pw_transfer/internal/server_context.h
+++ b/pw_transfer/public/pw_transfer/internal/server_context.h
@@ -35,6 +35,9 @@
// Returns the pointer to the current handler.
const Handler* handler() { return handler_; }
+ // In server-side transfer contexts, a session ID always exists.
+ constexpr uint32_t id() const { return session_id(); }
+
private:
// Ends the transfer with the given status, calling the handler's Finalize
// method. No chunks are sent.
diff --git a/pw_transfer/public/pw_transfer/transfer.h b/pw_transfer/public/pw_transfer/transfer.h
index 1b03952..2f0055f 100644
--- a/pw_transfer/public/pw_transfer/transfer.h
+++ b/pw_transfer/public/pw_transfer/transfer.h
@@ -63,7 +63,8 @@
extend_window_divisor),
thread_(transfer_thread),
chunk_timeout_(chunk_timeout),
- max_retries_(max_retries) {}
+ max_retries_(max_retries),
+ next_session_id_(1) {}
TransferService(const TransferService&) = delete;
TransferService(TransferService&&) = delete;
@@ -121,11 +122,16 @@
private:
void HandleChunk(ConstByteSpan message, internal::TransferType type);
+ // TODO(frolv): This could be more sophisticated and less predictable.
+ uint32_t GenerateNewSessionId() { return next_session_id_++; }
+
internal::TransferParameters max_parameters_;
TransferThread& thread_;
chrono::SystemClock::duration chunk_timeout_;
uint8_t max_retries_;
+
+ uint32_t next_session_id_;
};
} // namespace pw::transfer
diff --git a/pw_transfer/public/pw_transfer/transfer_thread.h b/pw_transfer/public/pw_transfer/transfer_thread.h
index 16b779a..b35253c 100644
--- a/pw_transfer/public/pw_transfer/transfer_thread.h
+++ b/pw_transfer/public/pw_transfer/transfer_thread.h
@@ -46,16 +46,21 @@
encode_buffer_(encode_buffer) {}
void StartClientTransfer(TransferType type,
- uint32_t session_id,
+ ProtocolVersion version,
uint32_t resource_id,
stream::Stream* stream,
const TransferParameters& max_parameters,
Function<void(Status)>&& on_completion,
chrono::SystemClock::duration timeout,
uint8_t max_retries) {
+ uint32_t session_id = version == ProtocolVersion::kLegacy
+ ? resource_id
+ : Context::kUnassignedSessionId;
StartTransfer(type,
+ version,
session_id,
resource_id,
+ /*raw_chunk=*/{},
stream,
max_parameters,
std::move(on_completion),
@@ -64,14 +69,18 @@
}
void StartServerTransfer(TransferType type,
+ ProtocolVersion version,
uint32_t session_id,
uint32_t resource_id,
+ ConstByteSpan raw_chunk,
const TransferParameters& max_parameters,
chrono::SystemClock::duration timeout,
uint8_t max_retries) {
StartTransfer(type,
+ version,
session_id,
resource_id,
+ raw_chunk,
/*stream=*/nullptr,
max_parameters,
/*on_completion=*/nullptr,
@@ -160,10 +169,10 @@
// Finds an active server or client transfer.
template <typename T>
static Context* FindActiveTransfer(const span<T>& transfers,
- uint32_t session_id) {
- auto transfer =
- std::find_if(transfers.begin(), transfers.end(), [session_id](auto& c) {
- return c.initialized() && c.session_id() == session_id;
+ uint32_t context_identifier) {
+ auto transfer = std::find_if(
+ transfers.begin(), transfers.end(), [context_identifier](auto& c) {
+ return c.initialized() && c.id() == context_identifier;
});
return transfer != transfers.end() ? &*transfer : nullptr;
}
@@ -218,8 +227,10 @@
chrono::SystemClock::time_point GetNextTransferTimeout() const;
void StartTransfer(TransferType type,
+ ProtocolVersion version,
uint32_t session_id,
uint32_t resource_id,
+ ConstByteSpan raw_chunk,
stream::Stream* stream,
const TransferParameters& max_parameters,
Function<void(Status)>&& on_completion,
diff --git a/pw_transfer/transfer.cc b/pw_transfer/transfer.cc
index 2f6378e..682662c 100644
--- a/pw_transfer/transfer.cc
+++ b/pw_transfer/transfer.cc
@@ -30,18 +30,22 @@
}
if (chunk->IsInitialChunk()) {
- // TODO(frolv): Right now, session ID and resource ID are the same thing.
- // The session ID should be assigned by the server in response to the
- // initial chunk
+ uint32_t session_id =
+ chunk->is_legacy() ? chunk->session_id() : GenerateNewSessionId();
+ uint32_t resource_id =
+ chunk->is_legacy() ? chunk->session_id() : chunk->resource_id().value();
+
thread_.StartServerTransfer(type,
- chunk->session_id(),
- /*resource_id=*/chunk->session_id(),
+ chunk->protocol_version(),
+ session_id,
+ resource_id,
+ message,
max_parameters_,
chunk_timeout_,
max_retries_);
+ } else {
+ thread_.ProcessServerChunk(message);
}
-
- thread_.ProcessServerChunk(message);
}
} // namespace pw::transfer
diff --git a/pw_transfer/transfer.proto b/pw_transfer/transfer.proto
index b5eb3ab..839eb2c 100644
--- a/pw_transfer/transfer.proto
+++ b/pw_transfer/transfer.proto
@@ -204,4 +204,14 @@
// Write → ID of transfer session
// Write ← ID of transfer session
optional uint32 session_id = 12;
+
+ // The protocol version to use for this transfer. Only sent during the initial
+ // handshake phase of a version 2 or higher transfer to negotiate a common
+ // protocol version between the client and server.
+ //
+ // Read → Desired (START) or configured (START_ACK_CONFIRMATION) version.
+ // Read ← Configured protocol version (START_ACK).
+ // Write → Desired (START) or configured (START_ACK_CONFIRMATION) version.
+ // Write ← Configured protocol version (START_ACK).
+ optional uint32 protocol_version = 13;
}
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index 75a5145..ee32151 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -37,7 +37,6 @@
}
using internal::Chunk;
-using internal::ProtocolVersion;
class TestMemoryReader : public stream::SeekableReader {
public:
@@ -141,7 +140,7 @@
TEST_F(ReadTransfer, SingleChunk) {
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
@@ -179,7 +178,7 @@
TEST_F(ReadTransfer, MultiChunk) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
@@ -240,7 +239,7 @@
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
@@ -276,7 +275,7 @@
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
@@ -315,7 +314,7 @@
handler_.set_seek_status(Status::Unimplemented());
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
@@ -336,7 +335,7 @@
TEST_F(ReadTransfer, MaxChunkSize_Client) {
rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_max_chunk_size_bytes(8)
@@ -395,7 +394,7 @@
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
@@ -414,7 +413,7 @@
handler_.finalize_read_called = false;
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
@@ -436,7 +435,7 @@
// TODO(frolv): Fix
rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(64)
// .set_max_chunk_size_bytes(16)
@@ -495,7 +494,7 @@
TEST_F(ReadTransfer, ClientError) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
@@ -518,7 +517,7 @@
TEST_F(ReadTransfer, UnregisteredHandler) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(11)
.set_window_end_offset(32)
.set_offset(0)));
@@ -538,7 +537,7 @@
.set_window_end_offset(32)
.set_offset(3)));
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(3)
.set_payload(span(kData).first(10))
.set_offset(3)));
@@ -553,7 +552,7 @@
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
@@ -566,7 +565,7 @@
handler_.prepare_read_called = false; // Reset so can check if called again.
ctx_.SendClientStream( // Resend starting chunk
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(16)
.set_offset(0)));
@@ -595,7 +594,7 @@
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(0)
.set_offset(0)));
@@ -614,7 +613,7 @@
handler_.set_read_status(Status::OutOfRange());
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(0)
.set_offset(0)));
@@ -636,7 +635,7 @@
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(3)
.set_window_end_offset(64)
.set_offset(0)));
@@ -761,9 +760,8 @@
};
TEST_F(WriteTransfer, SingleChunk) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -777,7 +775,7 @@
EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
@@ -799,11 +797,10 @@
// Return an error when FinalizeWrite is called.
handler_.set_finalize_write_return(Status::FailedPrecondition());
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
@@ -821,15 +818,14 @@
}
TEST_F(WriteTransfer, SendingFinalPacketFails) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ctx_.output().set_send_status(Status::Unknown());
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
@@ -850,9 +846,8 @@
}
TEST_F(WriteTransfer, MultiChunk) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -864,7 +859,7 @@
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(8))));
@@ -873,7 +868,7 @@
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(8)
.set_payload(span(kData).subspan(8))
@@ -899,7 +894,7 @@
rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
// Send only one client packet so the service times out.
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(7)));
transfer_thread_.SimulateServerTimeout(7); // Time out to trigger retry
});
@@ -914,9 +909,8 @@
}
TEST_F(WriteTransfer, TimeoutInRecoveryState) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -928,14 +922,14 @@
constexpr span data(kData);
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(data.first(8))));
// Skip offset 8 to enter a recovery state.
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(12)
.set_payload(data.subspan(12, 4))));
@@ -961,9 +955,8 @@
}
TEST_F(WriteTransfer, ExtendWindow) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -976,7 +969,7 @@
// Window starts at 32 bytes and should extend when half of that is sent.
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(4))));
@@ -984,7 +977,7 @@
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(4)
.set_payload(span(kData).subspan(4, 4))));
@@ -992,7 +985,7 @@
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(8)
.set_payload(span(kData).subspan(8, 4))));
@@ -1000,7 +993,7 @@
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(12)
.set_payload(span(kData).subspan(12, 4))));
@@ -1014,7 +1007,7 @@
EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(16)
.set_payload(span(kData).subspan(16))
@@ -1038,9 +1031,8 @@
};
TEST_F(WriteTransfer, TransmitterReducesWindow) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1053,7 +1045,7 @@
// Send only 12 bytes and set that as the new end offset.
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_window_end_offset(12)
@@ -1071,9 +1063,8 @@
}
TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1085,7 +1076,7 @@
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
// Larger window end offset than the receiver's.
@@ -1101,9 +1092,8 @@
}
TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1115,7 +1105,7 @@
EXPECT_EQ(chunk.window_end_offset(), 16u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(8))));
@@ -1128,7 +1118,7 @@
EXPECT_EQ(chunk.window_end_offset(), 24u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(8)
.set_payload(span(kData).subspan(8, 8))));
@@ -1141,7 +1131,7 @@
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(16)
.set_payload(span(kData).subspan(16, 8))));
@@ -1154,7 +1144,7 @@
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(24)
.set_payload(span(kData).subspan(24))
@@ -1174,9 +1164,8 @@
TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
// Default max bytes is smaller than buffer.
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -1193,7 +1182,7 @@
ctx_.service().RegisterHandler(handler_);
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(987)));
transfer_thread_.WaitUntilEventIsProcessed();
@@ -1206,9 +1195,8 @@
}
TEST_F(WriteTransfer, UnexpectedOffset) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1221,7 +1209,7 @@
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(8))));
@@ -1230,7 +1218,7 @@
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(4) // incorrect
.set_payload(span(kData).subspan(8))
@@ -1244,7 +1232,7 @@
EXPECT_EQ(chunk.window_end_offset(), 32u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(8) // correct
.set_payload(span(kData).subspan(8))
@@ -1263,9 +1251,8 @@
}
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1278,7 +1265,7 @@
// window_end_offset = 16, but send 24 bytes of data.
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(24))));
@@ -1293,7 +1280,7 @@
TEST_F(WriteTransfer, UnregisteredHandler) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(999)));
transfer_thread_.WaitUntilEventIsProcessed();
@@ -1305,9 +1292,8 @@
}
TEST_F(WriteTransfer, ClientError) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1329,16 +1315,15 @@
}
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
constexpr span data(kData);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(data.first(1))));
@@ -1346,7 +1331,7 @@
// Drop offset 1, then send the rest of the data.
for (uint32_t i = 2; i < kData.size(); ++i) {
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(i)
.set_payload(data.subspan(i, 1))));
@@ -1361,7 +1346,7 @@
// Send the remaining data.
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(1)
.set_payload(data.subspan(1, 31))
@@ -1373,9 +1358,8 @@
}
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -1385,7 +1369,7 @@
// Skip offset 0, then send the rest of the data.
for (uint32_t i = 1; i < kData.size(); ++i) {
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(i)
.set_payload(data.subspan(i, 1))));
@@ -1396,7 +1380,7 @@
ASSERT_EQ(ctx_.total_responses(), 2u); // Resent transfer parameters once.
const auto last_chunk =
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(kData.size() - 1)
.set_payload(data.last(1)));
@@ -1418,7 +1402,7 @@
// Resumes normal operation when correct offset is sent.
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
@@ -1430,15 +1414,14 @@
}
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
@@ -1451,7 +1434,7 @@
EXPECT_EQ(chunk.status().value(), OkStatus());
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
@@ -1466,11 +1449,11 @@
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(3)));
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(10))
@@ -1484,15 +1467,14 @@
}
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 1u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(8))));
@@ -1505,9 +1487,8 @@
handler_.prepare_write_called = false; // Reset to check it's called again.
// Simulate client disappearing then restarting the transfer.
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1519,7 +1500,7 @@
ASSERT_EQ(ctx_.total_responses(), 2u);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(kData)
@@ -1557,7 +1538,7 @@
ctx_.service().RegisterHandler(unavailable_handler);
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(88)
.set_window_end_offset(128)
.set_offset(0)));
@@ -1573,7 +1554,7 @@
// TODO(frolv): This won't work until completion ACKs are supported.
if (false) {
ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
.set_session_id(88)
.set_window_end_offset(128)
.set_offset(0)));
@@ -1590,9 +1571,8 @@
}
TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
- ctx_.SendClientStream(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
- .set_session_id(7)));
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_TRUE(handler_.prepare_write_called);
@@ -1608,7 +1588,7 @@
ctx_.service().set_max_pending_bytes(12);
ctx_.SendClientStream<64>(
- EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
.set_session_id(7)
.set_offset(0)
.set_payload(span(kData).first(8))));
@@ -1622,5 +1602,682 @@
EXPECT_EQ(chunk.window_end_offset(), 8u + 12u);
}
+TEST_F(ReadTransfer, Version2_SimpleTransfer) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(3)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_read_called);
+ EXPECT_FALSE(handler_.finalize_read_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Complete the handshake by confirming the server's ACK and sending the first
+ // read transfer parameters.
+ rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)
+ .set_window_end_offset(64)
+ .set_offset(0)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+ });
+
+ // Server should respond by starting the data transfer, sending its sole data
+ // chunk and a remaining_bytes 0 chunk.
+ ASSERT_EQ(ctx_.total_responses(), 3u);
+
+ Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c1.type(), Chunk::Type::kData);
+ EXPECT_EQ(c1.session_id(), 1u);
+ EXPECT_EQ(c1.offset(), 0u);
+ ASSERT_TRUE(c1.has_payload());
+ ASSERT_EQ(c1.payload().size(), kData.size());
+ EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+ 0);
+
+ Chunk c2 = DecodeChunk(ctx_.responses()[2]);
+ EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c2.type(), Chunk::Type::kData);
+ EXPECT_EQ(c2.session_id(), 1u);
+ EXPECT_FALSE(c2.has_payload());
+ EXPECT_EQ(c2.remaining_bytes(), 0u);
+
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, OkStatus());
+}
+
+TEST_F(ReadTransfer, Version2_MultiChunk) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(3)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_read_called);
+ EXPECT_FALSE(handler_.finalize_read_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Complete the handshake by confirming the server's ACK and sending the first
+ // read transfer parameters.
+ rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)
+ .set_window_end_offset(64)
+ .set_max_chunk_size_bytes(16)
+ .set_offset(0)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+ });
+
+ ASSERT_EQ(ctx_.total_responses(), 4u);
+
+ Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c1.type(), Chunk::Type::kData);
+ EXPECT_EQ(c1.session_id(), 1u);
+ EXPECT_EQ(c1.offset(), 0u);
+ ASSERT_TRUE(c1.has_payload());
+ ASSERT_EQ(c1.payload().size(), 16u);
+ EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+ 0);
+
+ Chunk c2 = DecodeChunk(ctx_.responses()[2]);
+ EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c2.type(), Chunk::Type::kData);
+ EXPECT_EQ(c2.session_id(), 1u);
+ EXPECT_EQ(c2.offset(), 16u);
+ ASSERT_TRUE(c2.has_payload());
+ ASSERT_EQ(c2.payload().size(), 16u);
+ EXPECT_EQ(
+ std::memcmp(
+ c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
+ 0);
+
+ Chunk c3 = DecodeChunk(ctx_.responses()[3]);
+ EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c3.type(), Chunk::Type::kData);
+ EXPECT_EQ(c3.session_id(), 1u);
+ EXPECT_FALSE(c3.has_payload());
+ EXPECT_EQ(c3.remaining_bytes(), 0u);
+
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, OkStatus());
+}
+
+TEST_F(ReadTransfer, Version2_MultiParameters) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(3)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_read_called);
+ EXPECT_FALSE(handler_.finalize_read_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Complete the handshake by confirming the server's ACK and sending the first
+ // read transfer parameters.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)
+ .set_window_end_offset(16)
+ .set_offset(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c1.type(), Chunk::Type::kData);
+ EXPECT_EQ(c1.session_id(), 1u);
+ EXPECT_EQ(c1.offset(), 0u);
+ ASSERT_TRUE(c1.has_payload());
+ ASSERT_EQ(c1.payload().size(), 16u);
+ EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+ 0);
+
+ rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersContinue)
+ .set_session_id(1)
+ .set_window_end_offset(64)
+ .set_offset(16)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+ });
+
+ ASSERT_EQ(ctx_.total_responses(), 4u);
+
+ Chunk c2 = DecodeChunk(ctx_.responses()[2]);
+ EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c2.type(), Chunk::Type::kData);
+ EXPECT_EQ(c2.session_id(), 1u);
+ EXPECT_EQ(c2.offset(), 16u);
+ ASSERT_TRUE(c2.has_payload());
+ ASSERT_EQ(c2.payload().size(), 16u);
+ EXPECT_EQ(
+ std::memcmp(
+ c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
+ 0);
+
+ Chunk c3 = DecodeChunk(ctx_.responses()[3]);
+ EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c3.type(), Chunk::Type::kData);
+ EXPECT_EQ(c3.session_id(), 1u);
+ EXPECT_FALSE(c3.has_payload());
+ EXPECT_EQ(c3.remaining_bytes(), 0u);
+
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, OkStatus());
+}
+
+TEST_F(ReadTransfer, Version2_ClientTerminatesDuringHandshake) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(3)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_read_called);
+ EXPECT_FALSE(handler_.finalize_read_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Send a terminating chunk instead of the third part of the handshake.
+ ctx_.SendClientStream(EncodeChunk(Chunk::Final(
+ ProtocolVersion::kVersionTwo, 1, Status::ResourceExhausted())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
+}
+
+TEST_F(ReadTransfer, Version2_ClientSendsWrongProtocolVersion) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(3)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_read_called);
+ EXPECT_FALSE(handler_.finalize_read_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Complete the handshake by confirming the server's ACK and sending the first
+ // read transfer parameters.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)
+ .set_window_end_offset(16)
+ .set_offset(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c1.type(), Chunk::Type::kData);
+ EXPECT_EQ(c1.session_id(), 1u);
+ EXPECT_EQ(c1.offset(), 0u);
+ ASSERT_TRUE(c1.has_payload());
+ ASSERT_EQ(c1.payload().size(), 16u);
+ EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+ 0);
+
+ // Send a parameters update, but with the incorrect protocol version. The
+ // server should terminate the transfer.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
+ .set_session_id(1)
+ .set_window_end_offset(64)
+ .set_offset(16)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 3u);
+
+ chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), Status::Internal());
+
+ EXPECT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
+}
+
+TEST_F(ReadTransfer, Version2_BadParametersInHandshake) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(3)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_read_called);
+ EXPECT_FALSE(handler_.finalize_read_called);
+
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 3u);
+
+ // Complete the handshake, but send an invalid parameters chunk. The server
+ // should terminate the transfer.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)
+ .set_window_end_offset(0)
+ .set_offset(0)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(c1.session_id(), 1u);
+ ASSERT_TRUE(c1.status().has_value());
+ EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
+}
+
+TEST_F(ReadTransfer, Version2_InvalidResourceId) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(99)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.status().value(), Status::NotFound());
+}
+
+TEST_F(WriteTransfer, Version2_SimpleTransfer) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(7)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_write_called);
+ EXPECT_FALSE(handler_.finalize_write_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 7u);
+
+ // Complete the handshake by confirming the server's ACK.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Server should respond by sending its initial transfer parameters.
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ chunk = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 32u);
+ ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+ EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+ // Send all of our data.
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(0)
+ .set_payload(kData)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 3u);
+
+ chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_TRUE(handler_.finalize_write_called);
+ EXPECT_EQ(handler_.finalize_write_status, OkStatus());
+ EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
+}
+
+TEST_F(WriteTransfer, Version2_Multichunk) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(7)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_write_called);
+ EXPECT_FALSE(handler_.finalize_write_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 7u);
+
+ // Complete the handshake by confirming the server's ACK.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Server should respond by sending its initial transfer parameters.
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ chunk = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 32u);
+ ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+ EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+ // Send all of our data across two chunks.
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(0)
+ .set_payload(span(kData).first(8))));
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(8)
+ .set_payload(span(kData).subspan(8))
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 3u);
+
+ chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_TRUE(handler_.finalize_write_called);
+ EXPECT_EQ(handler_.finalize_write_status, OkStatus());
+ EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
+}
+
+TEST_F(WriteTransfer, Version2_ContinueParameters) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(7)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_write_called);
+ EXPECT_FALSE(handler_.finalize_write_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 7u);
+
+ // Complete the handshake by confirming the server's ACK.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Server should respond by sending its initial transfer parameters.
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ chunk = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 32u);
+ ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+ EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+ // Send all of our data across several chunks.
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(0)
+ .set_payload(span(kData).first(8))));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(8)
+ .set_payload(span(kData).subspan(8, 8))));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+ ASSERT_EQ(ctx_.total_responses(), 3u);
+
+ chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 16u);
+ EXPECT_EQ(chunk.window_end_offset(), 32u);
+
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(16)
+ .set_payload(span(kData).subspan(16, 8))));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+ ASSERT_EQ(ctx_.total_responses(), 4u);
+
+ chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 24u);
+ EXPECT_EQ(chunk.window_end_offset(), 32u);
+
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(24)
+ .set_payload(span(kData).subspan(24))
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 5u);
+
+ chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ ASSERT_TRUE(chunk.status().has_value());
+ EXPECT_EQ(chunk.status().value(), OkStatus());
+
+ EXPECT_TRUE(handler_.finalize_write_called);
+ EXPECT_EQ(handler_.finalize_write_status, OkStatus());
+ EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
+}
+
+TEST_F(WriteTransfer, Version2_ClientTerminatesDuringHandshake) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(7)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_write_called);
+ EXPECT_FALSE(handler_.finalize_write_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 7u);
+
+ // Send an error chunk instead of completing the handshake.
+ ctx_.SendClientStream(EncodeChunk(Chunk::Final(
+ ProtocolVersion::kVersionTwo, 1, Status::FailedPrecondition())));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ EXPECT_TRUE(handler_.finalize_write_called);
+ EXPECT_EQ(handler_.finalize_write_status, Status::FailedPrecondition());
+}
+
+TEST_F(WriteTransfer, Version2_ClientSendsWrongProtocolVersion) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(7)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ EXPECT_TRUE(handler_.prepare_write_called);
+ EXPECT_FALSE(handler_.finalize_write_called);
+
+ // First, the server responds with a START_ACK, assigning a session ID and
+ // confirming the protocol version.
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.resource_id(), 7u);
+
+ // Complete the handshake by confirming the server's ACK.
+ ctx_.SendClientStream(EncodeChunk(
+ Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+ .set_session_id(1)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Server should respond by sending its initial transfer parameters.
+ ASSERT_EQ(ctx_.total_responses(), 2u);
+
+ chunk = DecodeChunk(ctx_.responses()[1]);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+ EXPECT_EQ(chunk.session_id(), 1u);
+ EXPECT_EQ(chunk.offset(), 0u);
+ EXPECT_EQ(chunk.window_end_offset(), 32u);
+ ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+ EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+ // The transfer was configured to use protocol version 2. Send a legacy chunk
+ // instead.
+ ctx_.SendClientStream<64>(
+ EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+ .set_session_id(1)
+ .set_offset(0)
+ .set_payload(kData)
+ .set_remaining_bytes(0)));
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ // Server should terminate the transfer.
+ ASSERT_EQ(ctx_.total_responses(), 3u);
+
+ chunk = DecodeChunk(ctx_.responses()[2]);
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.status().value(), Status::Internal());
+}
+
+TEST_F(WriteTransfer, Version2_InvalidResourceId) {
+ ctx_.SendClientStream(
+ EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+ .set_resource_id(99)));
+
+ transfer_thread_.WaitUntilEventIsProcessed();
+
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+ EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+ EXPECT_EQ(chunk.status().value(), Status::NotFound());
+}
+
} // namespace
} // namespace pw::transfer::test
diff --git a/pw_transfer/transfer_thread.cc b/pw_transfer/transfer_thread.cc
index 85cfa48..c863efc 100644
--- a/pw_transfer/transfer_thread.cc
+++ b/pw_transfer/transfer_thread.cc
@@ -36,7 +36,7 @@
next_event_.type = type;
next_event_.chunk = {};
- next_event_.chunk.session_id = session_id;
+ next_event_.chunk.context_identifier = session_id;
event_notification_.release();
@@ -99,8 +99,10 @@
}
void TransferThread::StartTransfer(TransferType type,
+ ProtocolVersion version,
uint32_t session_id,
uint32_t resource_id,
+ ConstByteSpan raw_chunk,
stream::Stream* stream,
const TransferParameters& max_parameters,
Function<void(Status)>&& on_completion,
@@ -113,14 +115,22 @@
next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
: EventType::kNewServerTransfer;
+
+ if (!raw_chunk.empty()) {
+ std::memcpy(chunk_buffer_.data(), raw_chunk.data(), raw_chunk.size());
+ }
+
next_event_.new_transfer = {
.type = type,
+ .protocol_version = version,
.session_id = session_id,
.resource_id = resource_id,
.max_parameters = &max_parameters,
.timeout = timeout,
.max_retries = max_retries,
.transfer_thread = this,
+ .raw_chunk_data = chunk_buffer_.data(),
+ .raw_chunk_size = raw_chunk.size(),
};
staged_on_completion_ = std::move(on_completion);
@@ -147,6 +157,7 @@
next_event_.type = EventType::kSendStatusChunk;
next_event_.send_status_chunk = {
.session_id = session_id,
+ .protocol_version = version,
.status = Status::NotFound().code(),
.stream = type == TransferType::kTransmit
? TransferStream::kServerRead
@@ -165,9 +176,9 @@
PW_CHECK(chunk.size() <= chunk_buffer_.size(),
"Transfer received a larger chunk than it can handle.");
- Result<uint32_t> session_id = Chunk::ExtractSessionId(chunk);
- if (!session_id.ok()) {
- PW_LOG_ERROR("Received a malformed chunk without a session ID");
+ Result<uint32_t> identifier = Chunk::ExtractIdentifier(chunk);
+ if (!identifier.ok()) {
+ PW_LOG_ERROR("Received a malformed chunk without a context identifier");
return;
}
@@ -178,7 +189,7 @@
next_event_.type = type;
next_event_.chunk = {
- .session_id = *session_id,
+ .context_identifier = *identifier,
.data = chunk_buffer_.data(),
.size = chunk.size(),
};
@@ -347,6 +358,7 @@
// On the server, send a status chunk back to the client.
SendStatusChunk(
{.session_id = event.new_transfer.session_id,
+ .protocol_version = event.new_transfer.protocol_version,
.status = Status::ResourceExhausted().code(),
.stream = event.new_transfer.type == TransferType::kTransmit
? TransferStream::kServerRead
@@ -373,14 +385,18 @@
return FindNewTransfer(server_transfers_, event.new_transfer.session_id);
case EventType::kClientChunk:
- return FindActiveTransfer(client_transfers_, event.chunk.session_id);
+ return FindActiveTransfer(client_transfers_,
+ event.chunk.context_identifier);
case EventType::kServerChunk:
- return FindActiveTransfer(server_transfers_, event.chunk.session_id);
+ return FindActiveTransfer(server_transfers_,
+ event.chunk.context_identifier);
case EventType::kClientTimeout: // Manually triggered client timeout
- return FindActiveTransfer(client_transfers_, event.chunk.session_id);
+ return FindActiveTransfer(client_transfers_,
+ event.chunk.context_identifier);
case EventType::kServerTimeout: // Manually triggered server timeout
- return FindActiveTransfer(server_transfers_, event.chunk.session_id);
+ return FindActiveTransfer(server_transfers_,
+ event.chunk.context_identifier);
case EventType::kClientEndTransfer:
return FindActiveTransfer(client_transfers_,
@@ -404,7 +420,7 @@
rpc::Writer& destination = stream_for(event.stream);
Result<ConstByteSpan> result =
- Chunk::Final(ProtocolVersion::kLegacy, event.session_id, event.status)
+ Chunk::Final(event.protocol_version, event.session_id, event.status)
.Encode(chunk_buffer_);
if (!result.ok()) {
diff --git a/pw_transfer/transfer_thread_test.cc b/pw_transfer/transfer_thread_test.cc
index 9fba333..b7524bb 100644
--- a/pw_transfer/transfer_thread_test.cc
+++ b/pw_transfer/transfer_thread_test.cc
@@ -106,8 +106,10 @@
transfer_thread_.AddTransferHandler(handler);
transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
+ ProtocolVersion::kLegacy,
3,
3,
+ {},
max_parameters_,
std::chrono::seconds(2),
0);
@@ -128,8 +130,10 @@
transfer_thread_.RemoveTransferHandler(handler);
transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
+ ProtocolVersion::kLegacy,
3,
3,
+ {},
max_parameters_,
std::chrono::seconds(2),
0);
@@ -154,21 +158,21 @@
SimpleReadTransfer handler(3, kData);
transfer_thread_.AddTransferHandler(handler);
- transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
- 3,
- 3,
- max_parameters_,
- std::chrono::seconds(2),
- 0);
-
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
- transfer_thread_.ProcessServerChunk(
- EncodeChunk(Chunk(internal::ProtocolVersion::kLegacy,
- Chunk::Type::kParametersRetransmit)
- .set_session_id(3)
- .set_window_end_offset(16)
- .set_max_chunk_size_bytes(8)
- .set_offset(0)));
+ transfer_thread_.StartServerTransfer(
+ internal::TransferType::kTransmit,
+ ProtocolVersion::kLegacy,
+ 3,
+ 3,
+ EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(3)
+ .set_window_end_offset(16)
+ .set_max_chunk_size_bytes(8)
+ .set_offset(0)),
+ max_parameters_,
+ std::chrono::seconds(2),
+ 0);
});
ASSERT_EQ(ctx_.total_responses(), 2u);
@@ -201,31 +205,48 @@
transfer_thread_.AddTransferHandler(handler3);
transfer_thread_.AddTransferHandler(handler4);
- transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
- 3,
- 3,
- max_parameters_,
- std::chrono::seconds(2),
- 0);
+ transfer_thread_.StartServerTransfer(
+ internal::TransferType::kTransmit,
+ ProtocolVersion::kLegacy,
+ 3,
+ 3,
+ EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(3)
+ .set_window_end_offset(16)
+ .set_max_chunk_size_bytes(8)
+ .set_offset(0)),
+ max_parameters_,
+ std::chrono::seconds(2),
+ 0);
transfer_thread_.WaitUntilEventIsProcessed();
// First transfer starts correctly.
EXPECT_TRUE(handler3.prepare_read_called);
EXPECT_FALSE(handler4.prepare_read_called);
+ ASSERT_EQ(ctx_.total_responses(), 1u);
// Try to start a simultaneous transfer to resource 4, for which the thread
// does not have an available context.
- transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
- 4,
- 4,
- max_parameters_,
- std::chrono::seconds(2),
- 0);
+ transfer_thread_.StartServerTransfer(
+ internal::TransferType::kTransmit,
+ ProtocolVersion::kLegacy,
+ 4,
+ 4,
+ EncodeChunk(
+ Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+ .set_session_id(4)
+ .set_window_end_offset(16)
+ .set_max_chunk_size_bytes(8)
+ .set_offset(0)),
+ max_parameters_,
+ std::chrono::seconds(2),
+ 0);
transfer_thread_.WaitUntilEventIsProcessed();
EXPECT_FALSE(handler4.prepare_read_called);
- ASSERT_EQ(ctx_.total_responses(), 1u);
+ ASSERT_EQ(ctx_.total_responses(), 2u);
auto chunk = DecodeChunk(ctx_.response());
EXPECT_EQ(chunk.session_id(), 4u);
ASSERT_TRUE(chunk.status().has_value());
@@ -248,7 +269,7 @@
transfer_thread_.StartClientTransfer(
internal::TransferType::kReceive,
- 3,
+ ProtocolVersion::kLegacy,
3,
&buffer3,
max_parameters_,
@@ -264,7 +285,7 @@
// does not have an available context.
transfer_thread_.StartClientTransfer(
internal::TransferType::kReceive,
- 4,
+ ProtocolVersion::kLegacy,
4,
&buffer4,
max_parameters_,
@@ -275,6 +296,9 @@
EXPECT_EQ(status3, Status::Unknown());
EXPECT_EQ(status4, Status::ResourceExhausted());
+
+ transfer_thread_.EndClientTransfer(3, Status::Cancelled());
+ transfer_thread_.EndClientTransfer(4, Status::Cancelled());
}
} // namespace