pw_transfer: Version 2 opening handshake in C++

This implements the opening handshake of pw_transfer's version 2
protocol within the C++ transfer client and server. The handshake
consists of protocol version negotiation and ephemeral session ID
assignment.

The protocol version used for a transfer session is controlled by the
client when it starts a new transfer. By default, this still remains the
legacy protocol for now, though the client API is extended to allow
specifying the desired version.

If the START chunk a transfer service receives is configured for version
2, the service will assign a transfer session ID and proceed with the
handshake. Otherwise, it will fall back to the legacy protocol.

The version 2 START chunk sent by a client retains all of the chunk
proto fields set by the legacy protocol, allowing it to be understood
by a server which is not version 2 aware. In such a case, the server
will process the chunk per the legacy protocol and send a non-handshake
response. The client will recognize the legacy response and revert to
running the legacy protocol.

As a result of this, version 2 capable transfer clients and servers
remain fully backwards-compatible with older code that only runs the
legacy protocol.

Change-Id: Ie0a295509e754b963d3a78593ba1c43bbe13c977
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/99500
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
diff --git a/pw_thread/docs.rst b/pw_thread/docs.rst
index 75df4b2..7e38489 100644
--- a/pw_thread/docs.rst
+++ b/pw_thread/docs.rst
@@ -145,6 +145,9 @@
     const pw::thread::Id my_id = pw::this_thread::get_id();
   }
 
+
+.. _module-pw_thread-thread-creation:
+
 ---------------
 Thread Creation
 ---------------
diff --git a/pw_transfer/chunk.cc b/pw_transfer/chunk.cc
index cf1b84c..ca51aca 100644
--- a/pw_transfer/chunk.cc
+++ b/pw_transfer/chunk.cc
@@ -24,30 +24,45 @@
 
 namespace ProtoChunk = transfer::Chunk;
 
-Result<uint32_t> Chunk::ExtractSessionId(ConstByteSpan message) {
+Result<uint32_t> Chunk::ExtractIdentifier(ConstByteSpan message) {
   protobuf::Decoder decoder(message);
 
   uint32_t session_id = 0;
+  uint32_t resource_id = 0;
+
+  // During the initial handshake, a START_ACK chunk sent from the server
+  // to a client identifies its transfer context using a resource_id, as
+  // the client does not yet know its session_id.
+  bool should_use_resource_id = false;
 
   while (decoder.Next().ok()) {
     ProtoChunk::Fields field =
         static_cast<ProtoChunk::Fields>(decoder.FieldNumber());
 
     if (field == ProtoChunk::Fields::TRANSFER_ID) {
-      // Interpret a legacy transfer_id field as a session ID, but don't
-      // return immediately. Instead, check to see if the message also
-      // contains a newer session_id field.
-      PW_TRY(decoder.ReadUint32(&session_id));
-
+      // Interpret a legacy transfer_id field as a session ID if an explicit
+      // session_id field has not already been seen.
+      if (session_id == 0) {
+        PW_TRY(decoder.ReadUint32(&session_id));
+      }
     } else if (field == ProtoChunk::Fields::SESSION_ID) {
-      // A session_id field always takes precedence over transfer_id, so
-      // return it immediately when encountered.
+      // A session_id field always takes precedence over transfer_id.
       PW_TRY(decoder.ReadUint32(&session_id));
-      return session_id;
+    } else if (field == ProtoChunk::Fields::TYPE) {
+      // Check if the chunk is a START_ACK.
+      uint32_t type;
+      PW_TRY(decoder.ReadUint32(&type));
+      should_use_resource_id = static_cast<Type>(type) == Type::kStartAck;
+    } else if (field == ProtoChunk::Fields::RESOURCE_ID) {
+      PW_TRY(decoder.ReadUint32(&resource_id));
     }
   }
 
-  if (session_id != 0) {
+  if (should_use_resource_id) {
+    if (resource_id != 0) {
+      return resource_id;
+    }
+  } else if (session_id != 0) {
     return session_id;
   }
 
@@ -61,9 +76,9 @@
 
   Chunk chunk;
 
-  // Assume the legacy protocol by default. Field presence in the serialized
-  // message may change this.
-  chunk.protocol_version_ = ProtocolVersion::kLegacy;
+  // Determine the protocol version of the chunk depending on field presence in
+  // the serialized message.
+  chunk.protocol_version_ = ProtocolVersion::kUnknown;
 
   // Some older versions of the protocol set the deprecated pending_bytes field
   // in their chunks. The newer transfer handling code does not process this
@@ -88,8 +103,12 @@
 
       case ProtoChunk::Fields::SESSION_ID:
         // The existence of a session_id field indicates that a newer protocol
-        // is running.
-        chunk.protocol_version_ = ProtocolVersion::kVersionTwo;
+        // is running. Update the deduced protocol unless it was explicitly
+        // specified.
+        if (chunk.protocol_version_ == ProtocolVersion::kUnknown) {
+          chunk.protocol_version_ = ProtocolVersion::kVersionTwo;
+        }
+
         PW_TRY(decoder.ReadUint32(&chunk.session_id_));
         break;
 
@@ -141,16 +160,29 @@
       case ProtoChunk::Fields::RESOURCE_ID:
         PW_TRY(decoder.ReadUint32(&value));
         chunk.set_resource_id(value);
+        break;
 
-        // The existence of a resource_id field indicates that a newer protocol
-        // is running.
-        chunk.protocol_version_ = ProtocolVersion::kVersionTwo;
+      case ProtoChunk::Fields::PROTOCOL_VERSION:
+        // The protocol_version field is added as part of the initial handshake
+        // starting from version 2. If provided, it should override any deduced
+        // protocol version.
+        PW_TRY(decoder.ReadUint32(&value));
+        if (!ValidProtocolVersion(value)) {
+          return Status::DataLoss();
+        }
+        chunk.protocol_version_ = static_cast<ProtocolVersion>(value);
         break;
 
         // Silently ignore any unrecognized fields.
     }
   }
 
+  if (chunk.protocol_version_ == ProtocolVersion::kUnknown) {
+    // If no fields in the chunk specified its protocol version, assume it is a
+    // legacy chunk.
+    chunk.protocol_version_ = ProtocolVersion::kLegacy;
+  }
+
   if (pending_bytes != 0) {
     // Compute window_end_offset if it isn't explicitly provided (in older
     // protocol versions).
@@ -186,6 +218,13 @@
     }
   }
 
+  // During the initial handshake, the chunk's configured protocol version is
+  // explicitly serialized to the wire.
+  if (IsInitialHandshakeChunk()) {
+    encoder.WriteProtocolVersion(static_cast<uint32_t>(protocol_version_))
+        .IgnoreError();
+  }
+
   if (type_.has_value()) {
     encoder.WriteType(static_cast<ProtoChunk::Type>(type_.value()))
         .IgnoreError();
@@ -197,8 +236,13 @@
 
   // Encode additional fields from the legacy protocol.
   if (ShouldEncodeLegacyFields()) {
-    // The legacy protocol uses the transfer_id field instead of session_id.
-    encoder.WriteTransferId(session_id_).IgnoreError();
+    // The legacy protocol uses the transfer_id field instead of session_id or
+    // resource_id.
+    if (resource_id_.has_value()) {
+      encoder.WriteTransferId(resource_id_.value()).IgnoreError();
+    } else {
+      encoder.WriteTransferId(session_id_).IgnoreError();
+    }
 
     // In the legacy protocol, the pending_bytes field must be set alongside
     // window_end_offset, as some transfer implementations require it.
@@ -241,11 +285,22 @@
     }
 
     if (ShouldEncodeLegacyFields()) {
-      size += protobuf::SizeOfVarintField(ProtoChunk::Fields::TRANSFER_ID,
-                                          session_id_);
+      if (resource_id_.has_value()) {
+        size += protobuf::SizeOfVarintField(ProtoChunk::Fields::TRANSFER_ID,
+                                            resource_id_.value());
+      } else {
+        size += protobuf::SizeOfVarintField(ProtoChunk::Fields::TRANSFER_ID,
+                                            session_id_);
+      }
     }
   }
 
+  if (IsInitialHandshakeChunk()) {
+    size +=
+        protobuf::SizeOfVarintField(ProtoChunk::Fields::PROTOCOL_VERSION,
+                                    static_cast<uint32_t>(protocol_version_));
+  }
+
   if (protocol_version_ >= ProtocolVersion::kVersionTwo) {
     if (resource_id_.has_value()) {
       size += protobuf::SizeOfVarintField(ProtoChunk::Fields::RESOURCE_ID,
diff --git a/pw_transfer/client.cc b/pw_transfer/client.cc
index c3e2d11..85181f3 100644
--- a/pw_transfer/client.cc
+++ b/pw_transfer/client.cc
@@ -23,8 +23,10 @@
 Status Client::Read(uint32_t resource_id,
                     stream::Writer& output,
                     CompletionFunc&& on_completion,
-                    chrono::SystemClock::duration timeout) {
-  if (on_completion == nullptr) {
+                    chrono::SystemClock::duration timeout,
+                    ProtocolVersion protocol_version) {
+  if (on_completion == nullptr ||
+      protocol_version == ProtocolVersion::kUnknown) {
     return Status::InvalidArgument();
   }
 
@@ -40,10 +42,9 @@
     has_read_stream_ = true;
   }
 
-  // TODO(frolv): Only send the resource ID. The server should assign a session.
   transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
-                                       /*session_id=*/resource_id,
-                                       /*resource_id=*/resource_id,
+                                       protocol_version,
+                                       resource_id,
                                        &output,
                                        max_parameters_,
                                        std::move(on_completion),
@@ -55,8 +56,10 @@
 Status Client::Write(uint32_t resource_id,
                      stream::Reader& input,
                      CompletionFunc&& on_completion,
-                     chrono::SystemClock::duration timeout) {
-  if (on_completion == nullptr) {
+                     chrono::SystemClock::duration timeout,
+                     ProtocolVersion protocol_version) {
+  if (on_completion == nullptr ||
+      protocol_version == ProtocolVersion::kUnknown) {
     return Status::InvalidArgument();
   }
 
@@ -72,10 +75,9 @@
     has_write_stream_ = true;
   }
 
-  // TODO(frolv): Only send the resource ID. The server should assign a session.
   transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
-                                       /*session_id=*/resource_id,
-                                       /*resource_id=*/resource_id,
+                                       protocol_version,
+                                       resource_id,
                                        &input,
                                        max_parameters_,
                                        std::move(on_completion),
diff --git a/pw_transfer/client_test.cc b/pw_transfer/client_test.cc
index 01db655..074bdda 100644
--- a/pw_transfer/client_test.cc
+++ b/pw_transfer/client_test.cc
@@ -30,7 +30,6 @@
 namespace {
 
 using internal::Chunk;
-using internal::ProtocolVersion;
 using pw_rpc::raw::Transfer;
 
 using namespace std::chrono_literals;
@@ -90,10 +89,10 @@
   EXPECT_EQ(c0.session_id(), 3u);
   EXPECT_EQ(c0.offset(), 0u);
   EXPECT_EQ(c0.window_end_offset(), 64u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(3)
                       .set_offset(0)
                       .set_payload(kData32)
@@ -133,11 +132,11 @@
   EXPECT_EQ(c0.session_id(), 4u);
   EXPECT_EQ(c0.offset(), 0u);
   EXPECT_EQ(c0.window_end_offset(), 64u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   constexpr ConstByteSpan data(kData32);
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(4)
                       .set_offset(0)
                       .set_payload(data.first(16))));
@@ -146,7 +145,7 @@
   ASSERT_EQ(payloads.size(), 1u);
 
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(4)
                       .set_offset(16)
                       .set_payload(data.subspan(16))
@@ -176,7 +175,7 @@
   transfer_thread_.WaitUntilEventIsProcessed();
 
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(3)
                       .set_offset(0)
                       .set_payload(kData32)
@@ -193,7 +192,7 @@
   transfer_thread_.WaitUntilEventIsProcessed();
 
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(3)
                       .set_offset(0)
                       .set_payload(kData32)
@@ -222,7 +221,7 @@
   EXPECT_EQ(c0.session_id(), 5u);
   EXPECT_EQ(c0.offset(), 0u);
   EXPECT_EQ(c0.window_end_offset(), 32u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 }
 
 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
@@ -239,7 +238,7 @@
   EXPECT_EQ(c0.session_id(), 5u);
   EXPECT_EQ(c0.offset(), 0u);
   EXPECT_EQ(c0.window_end_offset(), 16u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 }
 
 TEST_F(ReadTransferMaxBytes32, MultiParameters) {
@@ -265,7 +264,7 @@
 
   constexpr ConstByteSpan data(kData64);
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(6)
                       .set_offset(0)
                       .set_payload(data.first(32))));
@@ -281,7 +280,7 @@
   ASSERT_EQ(c1.window_end_offset(), 64u);
 
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(6)
                       .set_offset(32)
                       .set_payload(data.subspan(32))
@@ -322,7 +321,7 @@
 
   constexpr ConstByteSpan data(kData32);
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(data.first(16))));
@@ -333,7 +332,7 @@
 
   // Send a chunk with an incorrect offset. The client should resend parameters.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(8)  // wrong!
                       .set_payload(data.subspan(16))
@@ -350,7 +349,7 @@
 
   // Send the correct chunk, completing the transfer.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(16)
                       .set_payload(data.subspan(16))
@@ -394,21 +393,21 @@
 
   // pending_bytes == 32
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(8)
                       .set_offset(0)
                       .set_payload(data.first(16))));
 
   // pending_bytes == 16
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(8)
                       .set_offset(16)
                       .set_payload(data.subspan(16, 8))));
 
   // pending_bytes == 8, send 16 instead.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(8)
                       .set_offset(24)
                       .set_payload(data.subspan(24, 16))));
@@ -480,7 +479,7 @@
 
   // Send the first 8 bytes of the transfer.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(10)
                       .set_offset(0)
                       .set_payload(data.first(8))));
@@ -488,7 +487,7 @@
   // Skip offset 8, send the rest starting from 16.
   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
     context_.server().SendServerStream<Transfer::Read>(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                         .set_session_id(10)
                         .set_offset(offset)
                         .set_payload(data.subspan(offset, 8))));
@@ -506,7 +505,7 @@
 
   // Send the remaining data to complete the transfer.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(10)
                       .set_offset(8)
                       .set_payload(data.subspan(8))
@@ -548,7 +547,7 @@
 
   // Send the first 8 bytes of the transfer.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(11)
                       .set_offset(0)
                       .set_payload(data.first(8))));
@@ -556,7 +555,7 @@
   // Skip offset 8, send the rest starting from 16.
   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
     context_.server().SendServerStream<Transfer::Read>(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                         .set_session_id(11)
                         .set_offset(offset)
                         .set_payload(data.subspan(offset, 8))));
@@ -567,11 +566,10 @@
   // dropped packet.
   ASSERT_EQ(payloads.size(), 2u);
 
-  const Chunk last_chunk =
-      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
-          .set_session_id(11)
-          .set_offset(24)
-          .set_payload(data.subspan(24));
+  const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+                               .set_session_id(11)
+                               .set_offset(24)
+                               .set_payload(data.subspan(24));
 
   // Re-send the final chunk of the block.
   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
@@ -596,7 +594,7 @@
 
   // Finish the transfer normally.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(11)
                       .set_offset(8)
                       .set_payload(data.subspan(8))
@@ -639,7 +637,7 @@
   EXPECT_EQ(c0.session_id(), 12u);
   EXPECT_EQ(c0.offset(), 0u);
   EXPECT_EQ(c0.window_end_offset(), 64u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Wait for the timeout to expire without doing anything. The client should
   // resend its initial parameters chunk.
@@ -650,14 +648,14 @@
   EXPECT_EQ(c.session_id(), 12u);
   EXPECT_EQ(c.offset(), 0u);
   EXPECT_EQ(c.window_end_offset(), 64u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Transfer has not yet completed.
   EXPECT_EQ(transfer_status, Status::Unknown());
 
   // Finish the transfer following the timeout.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(12)
                       .set_offset(0)
                       .set_payload(kData32)
@@ -696,13 +694,13 @@
   EXPECT_EQ(c0.session_id(), 13u);
   EXPECT_EQ(c0.offset(), 0u);
   EXPECT_EQ(c0.window_end_offset(), 64u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   constexpr ConstByteSpan data(kData32);
 
   // Send some data, but not everything.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(13)
                       .set_offset(0)
                       .set_payload(data.first(16))));
@@ -726,7 +724,7 @@
 
   // Send the rest of the data, finishing the transfer.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(13)
                       .set_offset(16)
                       .set_payload(data.subspan(16))
@@ -765,7 +763,7 @@
   EXPECT_EQ(c0.session_id(), 14u);
   EXPECT_EQ(c0.offset(), 0u);
   EXPECT_EQ(c0.window_end_offset(), 64u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
     // Wait for the timeout to expire without doing anything. The client should
@@ -789,7 +787,7 @@
 
   Chunk c4 = DecodeChunk(payloads.back());
   EXPECT_EQ(c4.session_id(), 14u);
-  EXPECT_EQ(c4.type(), Chunk::Type::kTransferCompletion);
+  EXPECT_EQ(c4.type(), Chunk::Type::kCompletion);
   ASSERT_TRUE(c4.status().has_value());
   EXPECT_EQ(c4.status().value(), Status::DeadlineExceeded());
 
@@ -842,7 +840,7 @@
 
   // Send some data.
   context_.server().SendServerStream<Transfer::Read>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(14)
                       .set_offset(0)
                       .set_payload(data.first(16))));
@@ -938,7 +936,7 @@
   Chunk c0 = DecodeChunk(payloads[0]);
   EXPECT_EQ(c0.session_id(), 3u);
   EXPECT_EQ(c0.resource_id(), 3u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send transfer parameters. Client should send a data chunk and the final
   // chunk.
@@ -995,7 +993,7 @@
   Chunk c0 = DecodeChunk(payloads[0]);
   EXPECT_EQ(c0.session_id(), 4u);
   EXPECT_EQ(c0.resource_id(), 4u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send transfer parameters with a chunk size smaller than the data.
 
@@ -1062,7 +1060,7 @@
   Chunk c0 = DecodeChunk(payloads[0]);
   EXPECT_EQ(c0.session_id(), 5u);
   EXPECT_EQ(c0.resource_id(), 5u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send transfer parameters with a nonzero offset, requesting a seek.
   // Client should send a data chunk and the final chunk.
@@ -1142,7 +1140,7 @@
   Chunk c0 = DecodeChunk(payloads[0]);
   EXPECT_EQ(c0.session_id(), 6u);
   EXPECT_EQ(c0.resource_id(), 6u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send transfer parameters with a nonzero offset, requesting a seek.
   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
@@ -1158,7 +1156,7 @@
 
   Chunk c1 = DecodeChunk(payloads[1]);
   EXPECT_EQ(c1.session_id(), 6u);
-  EXPECT_EQ(c1.type(), Chunk::Type::kTransferCompletion);
+  EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
   ASSERT_TRUE(c1.status().has_value());
   EXPECT_EQ(c1.status().value(), Status::Unimplemented());
 
@@ -1184,7 +1182,7 @@
   Chunk c0 = DecodeChunk(payloads[0]);
   EXPECT_EQ(c0.session_id(), 7u);
   EXPECT_EQ(c0.resource_id(), 7u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send an error from the server.
   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
@@ -1215,7 +1213,7 @@
   Chunk c0 = DecodeChunk(payloads[0]);
   EXPECT_EQ(c0.session_id(), 9u);
   EXPECT_EQ(c0.resource_id(), 9u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send an invalid transfer parameters chunk with 0 pending bytes.
   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
@@ -1258,7 +1256,7 @@
   Chunk c0 = DecodeChunk(payloads.back());
   EXPECT_EQ(c0.session_id(), 10u);
   EXPECT_EQ(c0.resource_id(), 10u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Wait for the timeout to expire without doing anything. The client should
   // resend the initial transmit chunk.
@@ -1268,7 +1266,7 @@
   Chunk c = DecodeChunk(payloads.back());
   EXPECT_EQ(c.session_id(), 10u);
   EXPECT_EQ(c.resource_id(), 10u);
-  EXPECT_EQ(c.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c.type(), Chunk::Type::kStart);
 
   // Transfer has not yet completed.
   EXPECT_EQ(transfer_status, Status::Unknown());
@@ -1299,7 +1297,7 @@
   Chunk c0 = DecodeChunk(payloads.back());
   EXPECT_EQ(c0.session_id(), 11u);
   EXPECT_EQ(c0.resource_id(), 11u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send the first parameters chunk.
   rpc::test::WaitForPackets(context_.output(), 2, [this] {
@@ -1372,7 +1370,7 @@
   Chunk c0 = DecodeChunk(payloads.back());
   EXPECT_EQ(c0.session_id(), 12u);
   EXPECT_EQ(c0.resource_id(), 12u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send the first parameters chunk, requesting all the data. The client should
   // respond with one data chunk and a remaining_bytes = 0 chunk.
@@ -1456,7 +1454,7 @@
   Chunk c0 = DecodeChunk(payloads.back());
   EXPECT_EQ(c0.session_id(), 13u);
   EXPECT_EQ(c0.resource_id(), 13u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
     // Wait for the timeout to expire without doing anything. The client should
@@ -1467,7 +1465,7 @@
     Chunk c = DecodeChunk(payloads.back());
     EXPECT_EQ(c.session_id(), 13u);
     EXPECT_EQ(c.resource_id(), 13u);
-    EXPECT_EQ(c.type(), Chunk::Type::kTransferStart);
+    EXPECT_EQ(c.type(), Chunk::Type::kStart);
 
     // Transfer has not yet completed.
     EXPECT_EQ(transfer_status, Status::Unknown());
@@ -1516,7 +1514,7 @@
   Chunk c0 = DecodeChunk(payloads.back());
   EXPECT_EQ(c0.session_id(), 14u);
   EXPECT_EQ(c0.resource_id(), 14u);
-  EXPECT_EQ(c0.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(c0.type(), Chunk::Type::kStart);
 
   // Send the first parameters chunk.
   rpc::test::WaitForPackets(context_.output(), 2, [this] {
@@ -1555,6 +1553,7 @@
   ASSERT_EQ(payloads.size(), 4u);
 
   Chunk c3 = DecodeChunk(payloads[3]);
+  EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy);
   EXPECT_EQ(c3.session_id(), 14u);
   ASSERT_TRUE(c3.status().has_value());
   EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());
@@ -1583,7 +1582,7 @@
   Chunk chunk = DecodeChunk(payloads.back());
   EXPECT_EQ(chunk.session_id(), 15u);
   EXPECT_EQ(chunk.resource_id(), 15u);
-  EXPECT_EQ(chunk.type(), Chunk::Type::kTransferStart);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
 
   client_.CancelTransfer(15);
   transfer_thread_.WaitUntilEventIsProcessed();
@@ -1592,11 +1591,725 @@
   ASSERT_EQ(payloads.size(), 2u);
   chunk = DecodeChunk(payloads.back());
   EXPECT_EQ(chunk.session_id(), 15u);
-  ASSERT_EQ(chunk.type(), Chunk::Type::kTransferCompletion);
+  ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
   EXPECT_EQ(chunk.status().value(), Status::Cancelled());
 
   EXPECT_EQ(transfer_status, Status::Cancelled());
 }
 
+TEST_F(ReadTransfer, Version2_SingleChunk) {
+  stream::MemoryWriterBuffer<64> writer;
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Read(
+                3,
+                writer,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Initial chunk of the transfer is sent. This chunk should contain all the
+  // fields from both legacy and version 2 protocols for backwards
+  // compatibility.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Read>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads[0]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // The server responds with a START_ACK, continuing the version 2 handshake
+  // and assigning a session_id to the transfer.
+  context_.server().SendServerStream<Transfer::Read>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+                      .set_session_id(29)
+                      .set_resource_id(3)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 2u);
+
+  // Client should accept the session_id with a START_ACK_CONFIRMATION,
+  // additionally containing the initial parameters for the read transfer.
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 29u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // Send all the transfer data. Client should accept it and complete the
+  // transfer.
+  context_.server().SendServerStream<Transfer::Read>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(29)
+                      .set_offset(0)
+                      .set_payload(kData32)
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 3u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.session_id(), 29u);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), OkStatus());
+
+  EXPECT_EQ(transfer_status, OkStatus());
+  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+            0);
+}
+
+TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
+  stream::MemoryWriterBuffer<64> writer;
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Read(
+                3,
+                writer,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Initial chunk of the transfer is sent. This chunk should contain all the
+  // fields from both legacy and version 2 protocols for backwards
+  // compatibility.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Read>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads[0]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // Instead of a START_ACK to continue the handshake, the server responds with
+  // an immediate data chunk, indicating that it is running the legacy protocol
+  // version. Client should revert to legacy, using the resource_id of 3 as the
+  // session_id, and complete the transfer.
+  context_.server().SendServerStream<Transfer::Read>(
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+                      .set_session_id(3)
+                      .set_offset(0)
+                      .set_payload(kData32)
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 2u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), OkStatus());
+
+  EXPECT_EQ(transfer_status, OkStatus());
+  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+            0);
+}
+
+TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
+  stream::MemoryWriterBuffer<64> writer;
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Read(
+                3,
+                writer,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Initial chunk of the transfer is sent. This chunk should contain all the
+  // fields from both legacy and version 2 protocols for backwards
+  // compatibility.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Read>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // Wait for the timeout to expire without doing anything. The client should
+  // resend the initial chunk.
+  transfer_thread_.SimulateClientTimeout(3);
+  ASSERT_EQ(payloads.size(), 2u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // This time, the server responds, continuing the handshake and transfer.
+  context_.server().SendServerStream<Transfer::Read>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+                      .set_session_id(31)
+                      .set_resource_id(3)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 3u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 31u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  context_.server().SendServerStream<Transfer::Read>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(31)
+                      .set_offset(0)
+                      .set_payload(kData32)
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 4u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.session_id(), 31u);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), OkStatus());
+
+  EXPECT_EQ(transfer_status, OkStatus());
+  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+            0);
+}
+
+TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
+  stream::MemoryWriterBuffer<64> writer;
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Read(
+                3,
+                writer,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Initial chunk of the transfer is sent. This chunk should contain all the
+  // fields from both legacy and version 2 protocols for backwards
+  // compatibility.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Read>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // The server responds with a START_ACK, continuing the version 2 handshake
+  // and assigning a session_id to the transfer.
+  context_.server().SendServerStream<Transfer::Read>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+                      .set_session_id(33)
+                      .set_resource_id(3)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 2u);
+
+  // Client should accept the session_id with a START_ACK_CONFIRMATION,
+  // additionally containing the initial parameters for the read transfer.
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 33u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // Wait for the timeout to expire without doing anything. The client should
+  // resend the confirmation chunk.
+  transfer_thread_.SimulateClientTimeout(33);
+  ASSERT_EQ(payloads.size(), 3u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 33u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // The server responds and the transfer should continue normally.
+  context_.server().SendServerStream<Transfer::Read>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(33)
+                      .set_offset(0)
+                      .set_payload(kData32)
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 4u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.session_id(), 33u);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), OkStatus());
+
+  EXPECT_EQ(transfer_status, OkStatus());
+  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
+            0);
+}
+
+TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
+  stream::MemoryWriterBuffer<64> writer;
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Read(
+                3,
+                writer,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Initial chunk of the transfer is sent. This chunk should contain all the
+  // fields from both legacy and version 2 protocols for backwards
+  // compatibility.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Read>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 64u);
+  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
+
+  // The server responds to the start request with an error.
+  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(Chunk::Final(
+      ProtocolVersion::kVersionTwo, 3, Status::Unauthenticated())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unauthenticated());
+}
+
+TEST_F(WriteTransfer, Version2_SingleChunk) {
+  stream::MemoryReader reader(kData32);
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Write(
+                3,
+                reader,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // The client begins by sending the ID of the resource to transfer.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Write>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // The server responds with a START_ACK, continuing the version 2 handshake
+  // and assigning a session_id to the transfer.
+  context_.server().SendServerStream<Transfer::Write>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+                      .set_session_id(29)
+                      .set_resource_id(3)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 2u);
+
+  // Client should accept the session_id with a START_ACK_CONFIRMATION.
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 29u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+
+  // The server can then begin the data transfer by sending its transfer
+  // parameters. Client should respond with a data chunk and the final chunk.
+  rpc::test::WaitForPackets(context_.output(), 2, [this] {
+    context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+            .set_session_id(29)
+            .set_offset(0)
+            .set_window_end_offset(64)
+            .set_max_chunk_size_bytes(32)));
+  });
+
+  ASSERT_EQ(payloads.size(), 4u);
+
+  chunk = DecodeChunk(payloads[2]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 29u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_TRUE(chunk.has_payload());
+  EXPECT_EQ(std::memcmp(
+                chunk.payload().data(), kData32.data(), chunk.payload().size()),
+            0);
+
+  chunk = DecodeChunk(payloads[3]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 29u);
+  ASSERT_TRUE(chunk.remaining_bytes().has_value());
+  EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  // Send the final status chunk to complete the transfer.
+  context_.server().SendServerStream<Transfer::Write>(
+      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 29, OkStatus())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_EQ(payloads.size(), 4u);
+  EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
+  stream::MemoryReader reader(kData32);
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Write(
+                3,
+                reader,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // The client begins by sending the ID of the resource to transfer.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Write>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Instead of continuing the handshake with a START_ACK, the server
+  // immediately sends parameters, indicating that it only supports the legacy
+  // protocol. Client should switch over to legacy and continue the transfer.
+  rpc::test::WaitForPackets(context_.output(), 2, [this] {
+    context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+        Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+            .set_session_id(3)
+            .set_offset(0)
+            .set_window_end_offset(64)
+            .set_max_chunk_size_bytes(32)));
+  });
+
+  ASSERT_EQ(payloads.size(), 3u);
+
+  chunk = DecodeChunk(payloads[1]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_TRUE(chunk.has_payload());
+  EXPECT_EQ(std::memcmp(
+                chunk.payload().data(), kData32.data(), chunk.payload().size()),
+            0);
+
+  chunk = DecodeChunk(payloads[2]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  ASSERT_TRUE(chunk.remaining_bytes().has_value());
+  EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  // Send the final status chunk to complete the transfer.
+  context_.server().SendServerStream<Transfer::Write>(
+      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_EQ(payloads.size(), 3u);
+  EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
+  stream::MemoryReader reader(kData32);
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Write(
+                3,
+                reader,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // The client begins by sending the ID of the resource to transfer.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Write>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Time out waiting for a server response. The client should resend the
+  // initial packet.
+  transfer_thread_.SimulateClientTimeout(3);
+  ASSERT_EQ(payloads.size(), 2u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // This time, respond with the correct continuation packet. The transfer
+  // should resume and complete normally.
+  context_.server().SendServerStream<Transfer::Write>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+                      .set_session_id(31)
+                      .set_resource_id(3)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 3u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 31u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+
+  rpc::test::WaitForPackets(context_.output(), 2, [this] {
+    context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+            .set_session_id(31)
+            .set_offset(0)
+            .set_window_end_offset(64)
+            .set_max_chunk_size_bytes(32)));
+  });
+
+  ASSERT_EQ(payloads.size(), 5u);
+
+  chunk = DecodeChunk(payloads[3]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 31u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_TRUE(chunk.has_payload());
+  EXPECT_EQ(std::memcmp(
+                chunk.payload().data(), kData32.data(), chunk.payload().size()),
+            0);
+
+  chunk = DecodeChunk(payloads[4]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 31u);
+  ASSERT_TRUE(chunk.remaining_bytes().has_value());
+  EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  context_.server().SendServerStream<Transfer::Write>(
+      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 31, OkStatus())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_EQ(payloads.size(), 5u);
+  EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
+  stream::MemoryReader reader(kData32);
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Write(
+                3,
+                reader,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // The client begins by sending the ID of the resource to transfer.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Write>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // The server responds with a START_ACK, continuing the version 2 handshake
+  // and assigning a session_id to the transfer.
+  context_.server().SendServerStream<Transfer::Write>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
+                      .set_session_id(33)
+                      .set_resource_id(3)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(payloads.size(), 2u);
+
+  // Client should accept the session_id with a START_ACK_CONFIRMATION.
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 33u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+
+  // Time out waiting for a server response. The client should resend the
+  // initial packet.
+  transfer_thread_.SimulateClientTimeout(33);
+  ASSERT_EQ(payloads.size(), 3u);
+
+  chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 33u);
+  EXPECT_FALSE(chunk.resource_id().has_value());
+
+  // This time, respond with the first transfer parameters chunk. The transfer
+  // should resume and complete normally.
+  rpc::test::WaitForPackets(context_.output(), 2, [this] {
+    context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
+            .set_session_id(33)
+            .set_offset(0)
+            .set_window_end_offset(64)
+            .set_max_chunk_size_bytes(32)));
+  });
+
+  ASSERT_EQ(payloads.size(), 5u);
+
+  chunk = DecodeChunk(payloads[3]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 33u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_TRUE(chunk.has_payload());
+  EXPECT_EQ(std::memcmp(
+                chunk.payload().data(), kData32.data(), chunk.payload().size()),
+            0);
+
+  chunk = DecodeChunk(payloads[4]);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kData);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 33u);
+  ASSERT_TRUE(chunk.remaining_bytes().has_value());
+  EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
+
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  context_.server().SendServerStream<Transfer::Write>(
+      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 33, OkStatus())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_EQ(payloads.size(), 5u);
+  EXPECT_EQ(transfer_status, OkStatus());
+}
+
+TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
+  stream::MemoryReader reader(kData32);
+  Status transfer_status = Status::Unknown();
+
+  ASSERT_EQ(OkStatus(),
+            client_.Write(
+                3,
+                reader,
+                [&transfer_status](Status status) { transfer_status = status; },
+                cfg::kDefaultChunkTimeout,
+                ProtocolVersion::kVersionTwo));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // The client begins by sending the ID of the resource to transfer.
+  rpc::PayloadsView payloads =
+      context_.output().payloads<Transfer::Write>(context_.channel().id());
+  ASSERT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::Unknown());
+
+  Chunk chunk = DecodeChunk(payloads.back());
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.session_id(), 3u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // The server responds to the start request with an error.
+  context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
+      Chunk::Final(ProtocolVersion::kVersionTwo, 3, Status::NotFound())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_EQ(payloads.size(), 1u);
+  EXPECT_EQ(transfer_status, Status::NotFound());
+}
+
 }  // namespace
 }  // namespace pw::transfer::test
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index dae00cc..68cd9f4 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -42,6 +42,11 @@
         InitiateTransferAsClient();
       } else {
         StartTransferAsServer(event.new_transfer);
+
+        // TODO(frolv): This should probably be restructured.
+        HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
+                          .data = event.new_transfer.raw_chunk_data,
+                          .size = event.new_transfer.raw_chunk_size});
       }
       return;
     }
@@ -83,19 +88,39 @@
 
   SetTimeout(chunk_timeout_);
 
-  PW_LOG_INFO("Starting transfer %u", id_for_log());
+  PW_LOG_INFO("Starting transfer for resource %u",
+              static_cast<unsigned>(resource_id_));
+
+  // Receive transfers should prepare their initial parameters to be send in the
+  // initial chunk.
   if (type() == TransferType::kReceive) {
-    // A receiver begins a new transfer with a parameters chunk telling the
-    // transmitter what to send.
-    UpdateAndSendTransferParameters(TransmitAction::kBegin);
-  } else {
-    SendInitialTransmitChunk();
+    UpdateTransferParameters();
   }
 
-  LogTransferConfiguration();
+  if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
+    // Legacy transfers go straight into the data transfer phase without a
+    // handshake.
+    if (type() == TransferType::kReceive) {
+      SendTransferParameters(TransmitAction::kBegin);
+    } else {
+      SendInitialTransmitChunk();
+    }
 
-  // Don't send an error packet. If the transfer failed to start, then there's
-  // nothing to tell the server about.
+    LogTransferConfiguration();
+    return;
+  }
+
+  // In newer protocol versions, begin the initial transfer handshake.
+  Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
+  start_chunk.set_resource_id(resource_id_);
+
+  if (type() == TransferType::kReceive) {
+    // Parameters should still be set on the initial chunk for backwards
+    // compatibility if the server only supports the legacy protocol.
+    SetTransferParameters(start_chunk);
+  }
+
+  EncodeAndSendChunk(start_chunk);
 }
 
 void Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
@@ -128,22 +153,46 @@
 void Context::SendInitialTransmitChunk() {
   // A transmitter begins a transfer by sending the ID of the resource to which
   // it wishes to write.
-  //
-  // TODO(frolv): Session ID should not be set here, but assigned by the server
-  // in an initial handshake.
-  Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart);
+  Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
   chunk.set_session_id(session_id_);
-  chunk.set_resource_id(session_id_);
 
   EncodeAndSendChunk(chunk);
 }
 
+void Context::UpdateTransferParameters() {
+  size_t pending_bytes =
+      std::min(max_parameters_->pending_bytes(),
+               static_cast<uint32_t>(writer().ConservativeWriteLimit()));
+
+  window_size_ = pending_bytes;
+  window_end_offset_ = offset_ + pending_bytes;
+
+  max_chunk_size_bytes_ = MaxWriteChunkSize(
+      max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
+}
+
+void Context::SetTransferParameters(Chunk& parameters) {
+  parameters.set_window_end_offset(window_end_offset_)
+      .set_max_chunk_size_bytes(max_chunk_size_bytes_)
+      .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
+      .set_offset(offset_);
+}
+
+void Context::UpdateAndSendTransferParameters(TransmitAction action) {
+  UpdateTransferParameters();
+
+  PW_LOG_INFO("Transfer rate: %u B/s",
+              static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
+
+  return SendTransferParameters(action);
+}
+
 void Context::SendTransferParameters(TransmitAction action) {
   Chunk::Type type = Chunk::Type::kParametersRetransmit;
 
   switch (action) {
     case TransmitAction::kBegin:
-      type = Chunk::Type::kTransferStart;
+      type = Chunk::Type::kStart;
       break;
     case TransmitAction::kRetransmit:
       type = Chunk::Type::kParametersRetransmit;
@@ -153,12 +202,9 @@
       break;
   }
 
-  Chunk parameters(ProtocolVersion::kLegacy, type);
-  parameters.set_session_id(session_id_)
-      .set_window_end_offset(window_end_offset_)
-      .set_max_chunk_size_bytes(max_chunk_size_bytes_)
-      .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
-      .set_offset(offset_);
+  Chunk parameters(configured_protocol_version_, type);
+  parameters.set_session_id(session_id_);
+  SetTransferParameters(parameters);
 
   PW_LOG_DEBUG(
       "Transfer %u sending transfer parameters: "
@@ -192,34 +238,37 @@
     }
     return;
   }
-}
 
-void Context::UpdateAndSendTransferParameters(TransmitAction action) {
-  size_t pending_bytes =
-      std::min(max_parameters_->pending_bytes(),
-               static_cast<uint32_t>(writer().ConservativeWriteLimit()));
-
-  window_size_ = pending_bytes;
-  window_end_offset_ = offset_ + pending_bytes;
-
-  max_chunk_size_bytes_ = MaxWriteChunkSize(
-      max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
-
-  PW_LOG_INFO("Transfer rate: %u B/s",
-              static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
-
-  return SendTransferParameters(action);
+  last_chunk_sent_ = chunk.type();
 }
 
 void Context::Initialize(const NewTransferEvent& new_transfer) {
   PW_DCHECK(!active());
 
+  PW_DCHECK_INT_NE(new_transfer.protocol_version,
+                   ProtocolVersion::kUnknown,
+                   "Cannot start a transfer with an unknown protocol");
+
   session_id_ = new_transfer.session_id;
+  resource_id_ = new_transfer.resource_id;
+  desired_protocol_version_ = new_transfer.protocol_version;
+  configured_protocol_version_ = ProtocolVersion::kUnknown;
+
   flags_ = static_cast<uint8_t>(new_transfer.type);
   transfer_state_ = TransferState::kWaiting;
   retries_ = 0;
   max_retries_ = new_transfer.max_retries;
 
+  if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
+    // In a legacy transfer, there is no protocol negotiation stage.
+    // Automatically configure the context to run the legacy protocol and
+    // proceed to waiting for a chunk.
+    configured_protocol_version_ = ProtocolVersion::kLegacy;
+    transfer_state_ = TransferState::kWaiting;
+  } else {
+    transfer_state_ = TransferState::kInitiating;
+  }
+
   rpc_writer_ = new_transfer.rpc_writer;
   stream_ = new_transfer.stream;
 
@@ -231,6 +280,7 @@
   max_parameters_ = new_transfer.max_parameters;
   thread_ = new_transfer.transfer_thread;
 
+  last_chunk_sent_ = Chunk::Type::kStart;
   last_chunk_offset_ = 0;
   chunk_timeout_ = new_transfer.timeout;
   interchunk_delay_ = chrono::SystemClock::for_at_least(
@@ -241,8 +291,6 @@
 }
 
 void Context::HandleChunkEvent(const ChunkEvent& event) {
-  PW_DCHECK(event.session_id == session_id_);
-
   Result<Chunk> maybe_chunk =
       Chunk::Parse(ConstByteSpan(event.data, event.size));
   if (!maybe_chunk.ok()) {
@@ -254,7 +302,9 @@
   // Received some data. Reset the retry counter.
   retries_ = 0;
 
-  if (chunk.status().has_value()) {
+  if (chunk.IsTerminatingChunk()) {
+    // TODO(frolv): Handle the transfer completion handshake in version 2.
+
     if (active()) {
       Finish(chunk.status().value());
     } else {
@@ -272,6 +322,114 @@
   }
 }
 
+void Context::PerformInitialHandshake(const Chunk& chunk) {
+  switch (chunk.type()) {
+    // Initial packet sent from a client to a server.
+    case Chunk::Type::kStart: {
+      UpdateLocalProtocolConfigurationFromPeer(chunk);
+
+      // This cast is safe as we know we're running in a transfer server.
+      uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
+
+      Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
+      start_ack.set_session_id(session_id_).set_resource_id(resource_id);
+
+      EncodeAndSendChunk(start_ack);
+      break;
+    }
+
+    // Response packet sent from a server to a client. Contains the assigned
+    // session_id of the transfer.
+    case Chunk::Type::kStartAck: {
+      UpdateLocalProtocolConfigurationFromPeer(chunk);
+
+      // Accept the assigned session_id and tell the server that the transfer
+      // can begin.
+      session_id_ = chunk.session_id();
+      PW_LOG_DEBUG("Transfer for resource %u was assigned session ID %u",
+                   static_cast<unsigned>(resource_id_),
+                   static_cast<unsigned>(session_id_));
+
+      Chunk start_ack_confirmation(configured_protocol_version_,
+                                   Chunk::Type::kStartAckConfirmation);
+      start_ack_confirmation.set_session_id(session_id_);
+
+      if (type() == TransferType::kReceive) {
+        // In a receive transfer, tag the initial transfer parameters onto the
+        // confirmation chunk so that the server can immediately begin sending
+        // data.
+        UpdateTransferParameters();
+        SetTransferParameters(start_ack_confirmation);
+      }
+
+      set_transfer_state(TransferState::kWaiting);
+      EncodeAndSendChunk(start_ack_confirmation);
+      break;
+    }
+
+    // Confirmation sent by a client to a server of the configured transfer
+    // version and session ID. Completes the handshake and begins the actual
+    // data transfer.
+    case Chunk::Type::kStartAckConfirmation: {
+      set_transfer_state(TransferState::kWaiting);
+
+      if (type() == TransferType::kTransmit) {
+        HandleTransmitChunk(chunk);
+      } else {
+        HandleReceiveChunk(chunk);
+      }
+      break;
+    }
+
+    // If a non-handshake chunk is received during an INITIATING state, the
+    // transfer peer is running a legacy protocol version, which does not
+    // perform a handshake. End the handshake, revert to the legacy protocol,
+    // and process the chunk appropriately.
+    case Chunk::Type::kData:
+    case Chunk::Type::kParametersRetransmit:
+    case Chunk::Type::kParametersContinue:
+      // Update the local context's session ID in case it was expecting one to
+      // be assigned by the server.
+      session_id_ = chunk.session_id();
+
+      configured_protocol_version_ = ProtocolVersion::kLegacy;
+      set_transfer_state(TransferState::kWaiting);
+
+      PW_LOG_DEBUG(
+          "Transfer %u tried to start on protocol version %d, but peer only "
+          "supports legacy",
+          id_for_log(),
+          static_cast<int>(desired_protocol_version_));
+
+      if (type() == TransferType::kTransmit) {
+        HandleTransmitChunk(chunk);
+      } else {
+        HandleReceiveChunk(chunk);
+      }
+      break;
+
+    case Chunk::Type::kCompletion:
+    case Chunk::Type::kCompletionAck:
+      PW_CRASH(
+          "Transfer completion packets should be processed by "
+          "HandleChunkEvent()");
+      break;
+  }
+}
+
+void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
+  PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
+               static_cast<int>(desired_protocol_version_),
+               static_cast<int>(chunk.protocol_version()));
+
+  configured_protocol_version_ =
+      std::min(desired_protocol_version_, chunk.protocol_version());
+
+  PW_LOG_INFO("Transfer %u: using protocol version %d",
+              id_for_log(),
+              static_cast<int>(configured_protocol_version_));
+}
+
 void Context::HandleTransmitChunk(const Chunk& chunk) {
   switch (transfer_state_) {
     case TransferState::kInactive:
@@ -292,9 +450,24 @@
       SendFinalStatusChunk();
       return;
 
+    case TransferState::kInitiating:
+      PerformInitialHandshake(chunk);
+      return;
+
     case TransferState::kWaiting:
     case TransferState::kTransmitting:
-      HandleTransferParametersUpdate(chunk);
+      if (chunk.protocol_version() == configured_protocol_version_) {
+        HandleTransferParametersUpdate(chunk);
+      } else {
+        PW_LOG_ERROR(
+            "Transmit transfer %u was configured to use protocol version %d "
+            "but received a chunk with version %d",
+            id_for_log(),
+            static_cast<int>(configured_protocol_version_),
+            static_cast<int>(chunk.protocol_version()));
+        Finish(Status::Internal());
+      }
+
       if (transfer_state_ == TransferState::kCompleted) {
         SendFinalStatusChunk();
       }
@@ -363,7 +536,7 @@
 }
 
 void Context::TransmitNextChunk(bool retransmit_requested) {
-  Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData);
+  Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
   chunk.set_session_id(session_id_);
   chunk.set_offset(offset_);
 
@@ -440,6 +613,7 @@
     return;
   }
 
+  last_chunk_sent_ = chunk.type();
   flags_ |= kFlagsDataSent;
 
   if (offset_ == window_end_offset_) {
@@ -455,12 +629,29 @@
 }
 
 void Context::HandleReceiveChunk(const Chunk& chunk) {
+  if (transfer_state_ == TransferState::kInitiating) {
+    PerformInitialHandshake(chunk);
+    return;
+  }
+
+  if (chunk.protocol_version() != configured_protocol_version_) {
+    PW_LOG_ERROR(
+        "Receive transfer %u was configured to use protocol version %d "
+        "but received a chunk with version %d",
+        id_for_log(),
+        static_cast<int>(configured_protocol_version_),
+        static_cast<int>(chunk.protocol_version()));
+    Finish(Status::Internal());
+    SendFinalStatusChunk();
+    return;
+  }
+
   switch (transfer_state_) {
     case TransferState::kInactive:
-      PW_CRASH("Never should handle chunk while inactive");
-
     case TransferState::kTransmitting:
-      PW_CRASH("Receive transfer somehow entered TRANSMITTING state");
+    case TransferState::kInitiating:
+      PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
+               static_cast<int>(transfer_state_));
 
     case TransferState::kCompleted:
       // If the transfer has already completed and another chunk is received,
@@ -629,7 +820,7 @@
                status_.code());
 
   EncodeAndSendChunk(
-      Chunk::Final(ProtocolVersion::kLegacy, session_id_, status_));
+      Chunk::Final(configured_protocol_version_, session_id_, status_));
 }
 
 void Context::Finish(Status status) {
@@ -667,6 +858,7 @@
       TransmitNextChunk(/*retransmit_requested=*/false);
       break;
 
+    case TransferState::kInitiating:
     case TransferState::kWaiting:
     case TransferState::kRecovery:
       // A timeout occurring in a WAITING or RECOVERY state indicates that no
@@ -698,6 +890,12 @@
 
   ++retries_;
 
+  if (transfer_state_ == TransferState::kInitiating ||
+      last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
+    RetryHandshake();
+    return;
+  }
+
   if (type() == TransferType::kReceive) {
     // Resend the most recent transfer parameters.
     PW_LOG_DEBUG(
@@ -721,8 +919,8 @@
   // Otherwise, resend the most recent chunk. If the reader doesn't support
   // seeking, this isn't possible, so just terminate the transfer immediately.
   if (!reader().Seek(last_chunk_offset_).ok()) {
-    PW_LOG_ERROR("Transmit transfer %d timed out waiting for new parameters.",
-                 static_cast<unsigned>(session_id_));
+    PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
+                 id_for_log());
     PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
     Finish(Status::DeadlineExceeded());
     return;
@@ -734,6 +932,43 @@
   TransmitNextChunk(/*retransmit_requested=*/false);
 }
 
+void Context::RetryHandshake() {
+  Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
+
+  switch (last_chunk_sent_) {
+    case Chunk::Type::kStart:
+      // No protocol version is yet configured at the time of sending the start
+      // chunk, so we use the client's desired version instead.
+      retry_chunk.set_protocol_version(desired_protocol_version_)
+          .set_resource_id(resource_id_);
+      if (type() == TransferType::kReceive) {
+        SetTransferParameters(retry_chunk);
+      }
+      break;
+
+    case Chunk::Type::kStartAck:
+      retry_chunk.set_session_id(session_id_)
+          .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
+      break;
+
+    case Chunk::Type::kStartAckConfirmation:
+      retry_chunk.set_session_id(session_id_);
+      if (type() == TransferType::kReceive) {
+        SetTransferParameters(retry_chunk);
+      }
+      break;
+
+    case Chunk::Type::kData:
+    case Chunk::Type::kParametersRetransmit:
+    case Chunk::Type::kParametersContinue:
+    case Chunk::Type::kCompletion:
+    case Chunk::Type::kCompletionAck:
+      PW_CRASH("Should not RetryHandshake() when not in handshake phase");
+  }
+
+  EncodeAndSendChunk(retry_chunk);
+}
+
 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
                                     uint32_t channel_id) const {
   // Start with the user-provided maximum chunk size, which should be the usable
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst
index b8672dc..fff63f0 100644
--- a/pw_transfer/docs.rst
+++ b/pw_transfer/docs.rst
@@ -3,6 +3,8 @@
 ===========
 pw_transfer
 ===========
+``pw_transfer`` is a reliable data transfer protocol which runs on top of
+Pigweed RPC.
 
 .. attention::
 
@@ -14,8 +16,83 @@
 
 C++
 ===
-The transfer service is defined and registered with an RPC server like any other
-RPC service.
+
+Transfer thread
+---------------
+To run transfers as either a client or server (or both), a dedicated thread is
+required. The transfer thread is used to process all transfer-related events
+safely. The same transfer thread can be shared by a transfer client and service
+running on the same system.
+
+.. note::
+
+   All user-defined transfer callbacks (i.e. the virtual interface of a
+   ``TransferHandler`` or completion function in a transfer client) will be
+   invoked from the transfer thread's context.
+
+In order to operate, a transfer thread requires two buffers:
+
+- The first is a *chunk buffer*. This is used to stage transfer packets received
+  by the RPC system to be processed by the transfer thread. It must be large
+  enough to store the largest possible chunk the system supports.
+
+- The second is an *encode buffer*. This is used by the transfer thread to
+  encode outgoing RPC packets. It is necessarily larger than the chunk buffer.
+  Typically, this is sized to the system's maximum transmission unit at the
+  transport layer.
+
+A transfer thread is created by instantiating a ``pw::transfer::Thread``. This
+class derives from ``pw::thread::ThreadCore``, allowing it to directly be used
+when creating a system thread. Refer to :ref:`module-pw_thread-thread-creation`
+for additional information.
+
+**Example thread configuration**
+
+.. code-block:: cpp
+
+   #include "pw_transfer/transfer_thread.h"
+
+   namespace {
+
+   // The maximum number of concurrent transfers the thread should support as
+   // either a client or a server. These can be set to 0 (if only using one or
+   // the other).
+   constexpr size_t kMaxConcurrentClientTransfers = 5;
+   constexpr size_t kMaxConcurrentServerTransfers = 3;
+
+   // The maximum payload size that can be transmitted by the system's
+   // transport stack. This would typically be defined within some transport
+   // header.
+   constexpr size_t kMaxTransmissionUnit = 512;
+
+   // The maximum amount of data that should be sent within a single transfer
+   // packet. By necessity, this should be less than the max transmission unit.
+   //
+   // pw_transfer requires some additional per-packet overhead, so the actual
+   // amount of data it sends may be lower than this.
+   constexpr size_t kMaxTransferChunkSizeBytes = 480;
+
+   // Buffers for storing and encoding chunks (see documentation above).
+   std::array<std::byte, kMaxTransferChunkSizeBytes> chunk_buffer;
+   std::array<std::byte, kMaxTransmissionUnit> encode_buffer;
+
+   pw::transfer::Thread<kMaxConcurrentClientTransfers,
+                        kMaxConcurrentServerTransfers>
+       transfer_thread(chunk_buffer, encode_buffer);
+
+   }  // namespace
+
+   // pw::transfer::TransferThread is the generic, non-templated version of the
+   // Thread class. A Thread can implicitly convert to a TransferThread.
+   pw::transfer::TransferThread& GetSystemTransferThread() {
+     return transfer_thread;
+   }
+
+
+Transfer server
+---------------
+``pw_transfer`` provides an RPC service for running transfers through an RPC
+server.
 
 To know how to read data from or write data to device, a ``TransferHandler``
 interface is defined (``pw_transfer/public/pw_transfer/handler.h``). Transfer
@@ -25,19 +102,22 @@
 or ``ReadWriteHandler`` as appropriate and override Prepare and Finalize methods
 if necessary.
 
-A transfer handler should be implemented and instantiated for each unique data
-transfer to or from a device. These handlers are then registered with the
-transfer service using their resource IDs.
+A transfer handler should be implemented and instantiated for each unique
+resource that can be transferred to or from a device. Each instantiated handler
+must have a globally-unique integer ID used to identify the resource.
 
-**Example**
+Handlers are registered with the transfer service. This may be done during
+system initialization (for static resources), or dynamically at runtime to
+support ephemeral transfer resources.
+
+**Example transfer handler implementation**
 
 .. code-block:: cpp
 
+  #include "pw_stream/memory_stream.h"
   #include "pw_transfer/transfer.h"
 
-  namespace {
-
-  // Simple transfer handler which reads data from an in-memory buffer.
+  // A simple transfer handler which reads data from an in-memory buffer.
   class SimpleBufferReadHandler : public pw::transfer::ReadOnlyHandler {
    public:
     SimpleReadTransfer(uint32_t resource_id, pw::ConstByteSpan data)
@@ -49,36 +129,110 @@
     pw::stream::MemoryReader reader_;
   };
 
-  // The maximum amount of data that can be sent in a single chunk, excluding
-  // transport layer overhead.
-  constexpr size_t kMaxChunkSizeBytes = 256;
+The transfer service is instantiated with a reference to the system's transfer
+thread and registered with the system's RPC server.
 
-  // In a write transfer, the maximum number of bytes to receive at one time,
+**Example transfer service initialization**
+
+.. code-block:: cpp
+
+  #include "pw_transfer/transfer.h"
+
+  namespace {
+
+  // In a write transfer, the maximum number of bytes to receive at one time
   // (potentially across multiple chunks), unless specified otherwise by the
   // transfer handler's stream::Writer.
   constexpr size_t kDefaultMaxBytesToReceive = 1024;
 
-  // Instantiate a static transfer service.
-  // The service requires a work queue, and a buffer to store data from a chunk.
-  // The helper class TransferServiceBuffer comes with a builtin buffer.
-  pw::transfer::TransferServiceBuffer<kMaxChunkSizeBytes> transfer_service(
-      GetSystemWorkQueue(), kDefaultMaxBytesToReceive);
+  pw::transfer::TransferService transfer_service(
+      GetSystemTransferThread(), kDefaultMaxBytesToReceive);
 
   // Instantiate a handler for the data to be transferred. The resource ID will
   // be used by the transfer client and server to identify the handler.
-  constexpr uint32_t kBufferResourceId = 1;
-  char buffer_to_transfer[256] = { /* ... */ };
-  SimpleBufferReadHandler buffer_handler(kBufferResourceId, buffer_to_transfer);
+  constexpr uint32_t kMagicBufferResourceId = 1;
+  char magic_buffer_to_transfer[256] = { /* ... */ };
+  SimpleBufferReadHandler magic_buffer_handler(
+      kMagicBufferResourceId, magic_buffer_to_transfer);
 
   }  // namespace
 
-  void InitTransfer() {
+  void InitTransferService() {
     // Register the handler with the transfer service, then the transfer service
     // with an RPC server.
-    transfer_service.RegisterHandler(buffer_handler);
+    transfer_service.RegisterHandler(magic_buffer_handler);
     GetSystemRpcServer().RegisterService(transfer_service);
   }
 
+Transfer client
+---------------
+``pw_transfer`` provides a transfer client capable of running transfers through
+an RPC client.
+
+.. note::
+
+   Currently, a transfer client is only capable of running transfers on a single
+   RPC channel. This may be expanded in the future.
+
+The transfer client provides the following two APIs for starting data transfers:
+
+.. cpp:function:: pw::Status pw::transfer::Client::Read(uint32_t resource_id, pw::stream::Writer& output, CompletionFunc&& on_completion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion)
+
+  Reads data from a transfer server to the specified ``pw::stream::Writer``.
+  Invokes the provided callback function with the overall status of the
+  transfer.
+
+  Due to the asynchronous nature of transfer operations, this function will only
+  return a non-OK status if it is called with bad arguments. Otherwise, it will
+  return OK and errors will be reported through the completion callback.
+
+.. cpp:function:: pw::Status pw::transfer::Client::Write(uint32_t resource_id, pw::stream::Reader& input, CompletionFunc&& on_completion, pw::chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout, pw::transfer::ProtocolVersion version = kDefaultProtocolVersion)
+
+  Writes data from a source ``pw::stream::Reader`` to a transfer server.
+  Invokes the provided callback function with the overall status of the
+  transfer.
+
+  Due to the asynchronous nature of transfer operations, this function will only
+  return a non-OK status if it is called with bad arguments. Otherwise, it will
+  return OK and errors will be reported through the completion callback.
+
+**Example client setup**
+
+.. code-block:: cpp
+
+   #include "pw_transfer/client.h"
+
+   namespace {
+
+   // RPC channel on which transfers should be run.
+   constexpr uint32_t kChannelId = 42;
+
+   pw::transfer::Client transfer_client(
+       GetSystemRpcClient(), kChannelId, GetSystemTransferThread());
+
+   }  // namespace
+
+   Status ReadMagicBufferSync(pw::ByteSpan sink) {
+     pw::stream::Writer writer(sink);
+
+     struct {
+       pw::sync::ThreadNotification notification;
+       pw::Status status;
+     } transfer_state;
+
+     transfer_client.Read(
+         kMagicBufferResourceId,
+         writer,
+         [&transfer_state](pw::Status status) {
+           transfer_state.status = status;
+           transfer_state.notification.release();
+         });
+
+     // Block until the transfer completes.
+     transfer_state.notification.acquire();
+     return transfer_state.status;
+   }
+
 Module Configuration Options
 ----------------------------
 The following configurations can be adjusted via compile-time configuration of
diff --git a/pw_transfer/public/pw_transfer/client.h b/pw_transfer/public/pw_transfer/client.h
index 44cd638..50a6bf9 100644
--- a/pw_transfer/public/pw_transfer/client.h
+++ b/pw_transfer/public/pw_transfer/client.h
@@ -71,11 +71,11 @@
   // the server is written to the provided writer. Returns OK if the transfer is
   // successfully started. When the transfer finishes (successfully or not), the
   // completion callback is invoked with the overall status.
-  Status Read(
-      uint32_t resource_id,
-      stream::Writer& output,
-      CompletionFunc&& on_completion,
-      chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout);
+  Status Read(uint32_t resource_id,
+              stream::Writer& output,
+              CompletionFunc&& on_completion,
+              chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout,
+              ProtocolVersion version = kDefaultProtocolVersion);
 
   // Begins a new write transfer for the given resource ID. Data from the
   // provided reader is sent to the server. When the transfer finishes
@@ -85,7 +85,8 @@
       uint32_t resource_id,
       stream::Reader& input,
       CompletionFunc&& on_completion,
-      chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout);
+      chrono::SystemClock::duration timeout = cfg::kDefaultChunkTimeout,
+      ProtocolVersion version = kDefaultProtocolVersion);
 
   // Terminates an ongoing transfer for the specified resource ID.
   //
@@ -103,6 +104,11 @@
   }
 
  private:
+  // TODO(frolv): This should be switched to default to kLatest once
+  // implementation of protocol v2 is complete.
+  static constexpr ProtocolVersion kDefaultProtocolVersion =
+      ProtocolVersion::kLegacy;
+
   using Transfer = pw_rpc::raw::Transfer;
 
   void OnRpcError(Status status, internal::TransferType type);
diff --git a/pw_transfer/public/pw_transfer/internal/chunk.h b/pw_transfer/public/pw_transfer/internal/chunk.h
index 72ce600..82f3cbf 100644
--- a/pw_transfer/public/pw_transfer/internal/chunk.h
+++ b/pw_transfer/public/pw_transfer/internal/chunk.h
@@ -24,12 +24,14 @@
 class Chunk {
  public:
   enum class Type {
-    kTransferData = 0,
-    kTransferStart = 1,
+    kData = 0,
+    kStart = 1,
     kParametersRetransmit = 2,
     kParametersContinue = 3,
-    kTransferCompletion = 4,
-    kTransferCompletionAck = 5,  // Currently unused.
+    kCompletion = 4,
+    kCompletionAck = 5,  // Currently unused.
+    kStartAck = 6,
+    kStartAckConfirmation = 7,
   };
 
   // Constructs a new chunk with the given transfer protocol version. All fields
@@ -40,14 +42,16 @@
   // Parses a chunk from a serialized protobuf message.
   static Result<Chunk> Parse(ConstByteSpan message);
 
-  // Partially decodes a transfer chunk to find its session ID field.
-  static Result<uint32_t> ExtractSessionId(ConstByteSpan message);
+  // Partially decodes a transfer chunk to find its transfer context identifier.
+  // Depending on the protocol version and type of chunk, this may be one of
+  // several proto fields.
+  static Result<uint32_t> ExtractIdentifier(ConstByteSpan message);
 
   // Creates a terminating status chunk within a transfer.
   static Chunk Final(ProtocolVersion version,
                      uint32_t session_id,
                      Status status) {
-    return Chunk(version, Type::kTransferCompletion)
+    return Chunk(version, Type::kCompletion)
         .set_session_id(session_id)
         .set_status(status);
   }
@@ -70,6 +74,11 @@
     return *this;
   }
 
+  constexpr Chunk& set_protocol_version(ProtocolVersion version) {
+    protocol_version_ = version;
+    return *this;
+  }
+
   constexpr Chunk& set_window_end_offset(uint32_t window_end_offset) {
     window_end_offset_ = window_end_offset;
     return *this;
@@ -137,6 +146,10 @@
     return remaining_bytes_;
   }
 
+  constexpr ProtocolVersion protocol_version() const {
+    return protocol_version_;
+  }
+
   constexpr bool is_legacy() const {
     return protocol_version_ == ProtocolVersion::kLegacy;
   }
@@ -152,11 +165,11 @@
     // continuation parameters. Therefore, there are only three possible chunk
     // types: start, data, and retransmit.
     if (IsInitialChunk()) {
-      return Type::kTransferStart;
+      return Type::kStart;
     }
 
     if (has_payload()) {
-      return Type::kTransferData;
+      return Type::kData;
     }
 
     return Type::kParametersRetransmit;
@@ -170,25 +183,40 @@
     }
 
     return type_.value() == Type::kParametersRetransmit ||
-           type_.value() == Type::kTransferStart;
+           type_.value() == Type::kStartAckConfirmation ||
+           type_.value() == Type::kStart;
   }
 
   constexpr bool IsInitialChunk() const {
     if (protocol_version_ >= ProtocolVersion::kVersionTwo) {
-      return type_ == Type::kTransferStart;
+      return type_ == Type::kStart;
     }
 
     // In legacy versions of the transfer protocol, the chunk type is not always
     // set. Infer that a chunk is initial if it has an offset of 0 and no data
     // or status.
-    return type_ == Type::kTransferStart ||
+    return type_ == Type::kStart ||
            (offset_ == 0 && !has_payload() && !status_.has_value());
   }
 
+  constexpr bool IsTerminatingChunk() const {
+    if (is_legacy()) {
+      return status_.has_value();
+    }
+
+    return type_ == Type::kCompletion || type_ == Type::kCompletionAck;
+  }
+
   // The final chunk from the transmitter sets remaining_bytes to 0 in both Read
   // and Write transfers.
   constexpr bool IsFinalTransmitChunk() const { return remaining_bytes_ == 0u; }
 
+  // Returns true if this chunk is part of an initial transfer handshake.
+  constexpr bool IsInitialHandshakeChunk() const {
+    return type_ == Type::kStart || type_ == Type::kStartAck ||
+           type_ == Type::kStartAckConfirmation;
+  }
+
  private:
   constexpr Chunk(ProtocolVersion version, std::optional<Type> type)
       : session_id_(0),
@@ -214,7 +242,7 @@
   // chunk is processable. Following a response, the common protocol version
   // will be determined and fields omitted as necessary.
   constexpr bool ShouldEncodeLegacyFields() const {
-    return is_legacy() || type_ == Chunk::Type::kTransferStart;
+    return is_legacy() || type_ == Type::kStart;
   }
 
   uint32_t session_id_;
diff --git a/pw_transfer/public/pw_transfer/internal/client_context.h b/pw_transfer/public/pw_transfer/internal/client_context.h
index a84af31..9912aa4 100644
--- a/pw_transfer/public/pw_transfer/internal/client_context.h
+++ b/pw_transfer/public/pw_transfer/internal/client_context.h
@@ -1,4 +1,4 @@
-// Copyright 2021 The Pigweed Authors
+// Copyright 2022 The Pigweed Authors
 //
 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
 // use this file except in compliance with the License. You may obtain a copy of
@@ -27,6 +27,13 @@
     on_completion_ = std::move(on_completion);
   }
 
+  // In client-side transfer contexts, a session ID may not yet have been
+  // assigned by the server, in which case resource_id is used as the context
+  // identifier.
+  constexpr uint32_t id() const {
+    return session_id() == kUnassignedSessionId ? resource_id() : session_id();
+  }
+
  private:
   Status FinalCleanup(Status status) override;
 
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h
index 41d51d8..d15c3e4 100644
--- a/pw_transfer/public/pw_transfer/internal/context.h
+++ b/pw_transfer/public/pw_transfer/internal/context.h
@@ -25,6 +25,7 @@
 #include "pw_stream/stream.h"
 #include "pw_transfer/internal/chunk.h"
 #include "pw_transfer/internal/event.h"
+#include "pw_transfer/internal/protocol.h"
 #include "pw_transfer/rate_estimate.h"
 
 namespace pw::transfer::internal {
@@ -69,12 +70,15 @@
 // Information about a single transfer.
 class Context {
  public:
+  static constexpr uint32_t kUnassignedSessionId = 0;
+
   Context(const Context&) = delete;
   Context(Context&&) = delete;
   Context& operator=(const Context&) = delete;
   Context& operator=(Context&&) = delete;
 
   constexpr uint32_t session_id() const { return session_id_; }
+  constexpr uint32_t resource_id() const { return resource_id_; }
 
   // True if the context has been used for a transfer (it has an ID).
   bool initialized() const {
@@ -82,7 +86,7 @@
   }
 
   // True if the transfer is active.
-  bool active() const { return transfer_state_ >= TransferState::kWaiting; }
+  bool active() const { return transfer_state_ >= TransferState::kInitiating; }
 
   std::optional<chrono::SystemClock::time_point> timeout() const {
     return active() && next_timeout_ != kNoTimeout
@@ -104,7 +108,10 @@
   ~Context() = default;
 
   constexpr Context()
-      : session_id_(0),
+      : session_id_(kUnassignedSessionId),
+        resource_id_(0),
+        desired_protocol_version_(ProtocolVersion::kUnknown),
+        configured_protocol_version_(ProtocolVersion::kUnknown),
         flags_(0),
         transfer_state_(TransferState::kInactive),
         retries_(0),
@@ -117,6 +124,7 @@
         max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
         max_parameters_(nullptr),
         thread_(nullptr),
+        last_chunk_sent_(Chunk::Type::kData),
         last_chunk_offset_(0),
         chunk_timeout_(chrono::SystemClock::duration::zero()),
         interchunk_delay_(chrono::SystemClock::for_at_least(
@@ -132,18 +140,23 @@
     // This ServerContext has never been used for a transfer. It is available
     // for use for a transfer.
     kInactive,
-    // A transfer completed and the final status chunk was sent. The Context
-    // is
-    // available for use for a new transfer. A receive transfer uses this
-    // state
-    // to allow a transmitter to retry its last chunk if the final status
-    // chunk
+
+    // A transfer completed and the final status chunk was sent. The Context is
+    // available for use for a new transfer. A receive transfer uses this state
+    // to allow a transmitter to retry its last chunk if the final status chunk
     // was dropped.
     kCompleted,
+
+    // Transfer is starting. The server and client are performing an initial
+    // handshake and negotiating protocol and feature flags.
+    kInitiating,
+
     // Waiting for the other end to send a chunk.
     kWaiting,
+
     // Transmitting a window of data to a receiver.
     kTransmitting,
+
     // Recovering after one or more chunks was dropped in an active transfer.
     kRecovery,
   };
@@ -215,6 +228,11 @@
   // Processes a chunk in either a transfer or receive transfer.
   void HandleChunkEvent(const ChunkEvent& event);
 
+  // Runs the initial three-way handshake when starting a new transfer.
+  void PerformInitialHandshake(const Chunk& chunk);
+
+  void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk);
+
   // Processes a chunk in a transmit transfer.
   void HandleTransmitChunk(const Chunk& chunk);
 
@@ -233,12 +251,18 @@
   // Sends the first chunk in a transmit transfer.
   void SendInitialTransmitChunk();
 
+  // Updates the current receive transfer parameters based on the context's
+  // configuration.
+  void UpdateTransferParameters();
+
+  // Populates the transfer parameters fields on a chunk object.
+  void SetTransferParameters(Chunk& parameters);
+
   // In a receive transfer, sends a parameters chunk telling the transmitter
   // how much data they can send.
   void SendTransferParameters(TransmitAction action);
 
-  // Updates the current receive transfer parameters from the provided object,
-  // then sends them.
+  // Updates the current receive transfer parameters, then sends them.
   void UpdateAndSendTransferParameters(TransmitAction action);
 
   // Sends a final status chunk of a completed transfer without updating the
@@ -264,6 +288,7 @@
   // Resends the last packet or aborts the transfer if the maximum retries has
   // been exceeded.
   void Retry();
+  void RetryHandshake();
 
   void LogTransferConfiguration();
 
@@ -283,6 +308,15 @@
       chrono::SystemClock::time_point(chrono::SystemClock::duration(0));
 
   uint32_t session_id_;
+  uint32_t resource_id_;
+
+  // The version of the transfer protocol that this node wants to run.
+  ProtocolVersion desired_protocol_version_;
+
+  // The version of the transfer protocol that the context is actually running,
+  // following negotiation with the transfer peer.
+  ProtocolVersion configured_protocol_version_;
+
   uint8_t flags_;
   TransferState transfer_state_;
   uint8_t retries_;
@@ -300,6 +334,8 @@
   const TransferParameters* max_parameters_;
   TransferThread* thread_;
 
+  Chunk::Type last_chunk_sent_;
+
   union {
     Status status_;               // Used when state is kCompleted.
     uint32_t last_chunk_offset_;  // Used in states kWaiting and kRecovery.
diff --git a/pw_transfer/public/pw_transfer/internal/event.h b/pw_transfer/public/pw_transfer/internal/event.h
index eb0ac4a..304b32d 100644
--- a/pw_transfer/public/pw_transfer/internal/event.h
+++ b/pw_transfer/public/pw_transfer/internal/event.h
@@ -16,6 +16,7 @@
 #include "pw_chrono/system_clock.h"
 #include "pw_rpc/writer.h"
 #include "pw_stream/stream.h"
+#include "pw_transfer/internal/protocol.h"
 
 namespace pw::transfer::internal {
 
@@ -68,6 +69,7 @@
 
 struct NewTransferEvent {
   TransferType type;
+  ProtocolVersion protocol_version;
   uint32_t session_id;
   uint32_t resource_id;
   rpc::Writer* rpc_writer;
@@ -80,10 +82,13 @@
     stream::Stream* stream;  // In client-side transfers.
     Handler* handler;        // In server-side transfers.
   };
+
+  const std::byte* raw_chunk_data;
+  size_t raw_chunk_size;
 };
 
 struct ChunkEvent {
-  uint32_t session_id;
+  uint32_t context_identifier;
   const std::byte* data;
   size_t size;
 };
@@ -96,6 +101,7 @@
 
 struct SendStatusChunkEvent {
   uint32_t session_id;
+  ProtocolVersion protocol_version;
   Status::Code status;
   TransferStream stream;
 };
diff --git a/pw_transfer/public/pw_transfer/internal/protocol.h b/pw_transfer/public/pw_transfer/internal/protocol.h
index 376e0ed..f50cc8c 100644
--- a/pw_transfer/public/pw_transfer/internal/protocol.h
+++ b/pw_transfer/public/pw_transfer/internal/protocol.h
@@ -13,7 +13,9 @@
 // the License.
 #pragma once
 
-namespace pw::transfer::internal {
+#include <cstdint>
+
+namespace pw::transfer {
 
 enum class ProtocolVersion {
   // Protocol version not know or not set.
@@ -32,4 +34,13 @@
   kLatest = kVersionTwo,
 };
 
-}  // namespace pw::transfer::internal
+constexpr bool ValidProtocolVersion(ProtocolVersion version) {
+  return version > ProtocolVersion::kUnknown &&
+         version <= ProtocolVersion::kLatest;
+}
+
+constexpr bool ValidProtocolVersion(uint32_t version) {
+  return ValidProtocolVersion(static_cast<ProtocolVersion>(version));
+}
+
+}  // namespace pw::transfer
diff --git a/pw_transfer/public/pw_transfer/internal/server_context.h b/pw_transfer/public/pw_transfer/internal/server_context.h
index ce9a78b..265b75a 100644
--- a/pw_transfer/public/pw_transfer/internal/server_context.h
+++ b/pw_transfer/public/pw_transfer/internal/server_context.h
@@ -35,6 +35,9 @@
   // Returns the pointer to the current handler.
   const Handler* handler() { return handler_; }
 
+  // In server-side transfer contexts, a session ID always exists.
+  constexpr uint32_t id() const { return session_id(); }
+
  private:
   // Ends the transfer with the given status, calling the handler's Finalize
   // method. No chunks are sent.
diff --git a/pw_transfer/public/pw_transfer/transfer.h b/pw_transfer/public/pw_transfer/transfer.h
index 1b03952..2f0055f 100644
--- a/pw_transfer/public/pw_transfer/transfer.h
+++ b/pw_transfer/public/pw_transfer/transfer.h
@@ -63,7 +63,8 @@
                         extend_window_divisor),
         thread_(transfer_thread),
         chunk_timeout_(chunk_timeout),
-        max_retries_(max_retries) {}
+        max_retries_(max_retries),
+        next_session_id_(1) {}
 
   TransferService(const TransferService&) = delete;
   TransferService(TransferService&&) = delete;
@@ -121,11 +122,16 @@
  private:
   void HandleChunk(ConstByteSpan message, internal::TransferType type);
 
+  // TODO(frolv): This could be more sophisticated and less predictable.
+  uint32_t GenerateNewSessionId() { return next_session_id_++; }
+
   internal::TransferParameters max_parameters_;
   TransferThread& thread_;
 
   chrono::SystemClock::duration chunk_timeout_;
   uint8_t max_retries_;
+
+  uint32_t next_session_id_;
 };
 
 }  // namespace pw::transfer
diff --git a/pw_transfer/public/pw_transfer/transfer_thread.h b/pw_transfer/public/pw_transfer/transfer_thread.h
index 16b779a..b35253c 100644
--- a/pw_transfer/public/pw_transfer/transfer_thread.h
+++ b/pw_transfer/public/pw_transfer/transfer_thread.h
@@ -46,16 +46,21 @@
         encode_buffer_(encode_buffer) {}
 
   void StartClientTransfer(TransferType type,
-                           uint32_t session_id,
+                           ProtocolVersion version,
                            uint32_t resource_id,
                            stream::Stream* stream,
                            const TransferParameters& max_parameters,
                            Function<void(Status)>&& on_completion,
                            chrono::SystemClock::duration timeout,
                            uint8_t max_retries) {
+    uint32_t session_id = version == ProtocolVersion::kLegacy
+                              ? resource_id
+                              : Context::kUnassignedSessionId;
     StartTransfer(type,
+                  version,
                   session_id,
                   resource_id,
+                  /*raw_chunk=*/{},
                   stream,
                   max_parameters,
                   std::move(on_completion),
@@ -64,14 +69,18 @@
   }
 
   void StartServerTransfer(TransferType type,
+                           ProtocolVersion version,
                            uint32_t session_id,
                            uint32_t resource_id,
+                           ConstByteSpan raw_chunk,
                            const TransferParameters& max_parameters,
                            chrono::SystemClock::duration timeout,
                            uint8_t max_retries) {
     StartTransfer(type,
+                  version,
                   session_id,
                   resource_id,
+                  raw_chunk,
                   /*stream=*/nullptr,
                   max_parameters,
                   /*on_completion=*/nullptr,
@@ -160,10 +169,10 @@
   // Finds an active server or client transfer.
   template <typename T>
   static Context* FindActiveTransfer(const span<T>& transfers,
-                                     uint32_t session_id) {
-    auto transfer =
-        std::find_if(transfers.begin(), transfers.end(), [session_id](auto& c) {
-          return c.initialized() && c.session_id() == session_id;
+                                     uint32_t context_identifier) {
+    auto transfer = std::find_if(
+        transfers.begin(), transfers.end(), [context_identifier](auto& c) {
+          return c.initialized() && c.id() == context_identifier;
         });
     return transfer != transfers.end() ? &*transfer : nullptr;
   }
@@ -218,8 +227,10 @@
   chrono::SystemClock::time_point GetNextTransferTimeout() const;
 
   void StartTransfer(TransferType type,
+                     ProtocolVersion version,
                      uint32_t session_id,
                      uint32_t resource_id,
+                     ConstByteSpan raw_chunk,
                      stream::Stream* stream,
                      const TransferParameters& max_parameters,
                      Function<void(Status)>&& on_completion,
diff --git a/pw_transfer/transfer.cc b/pw_transfer/transfer.cc
index 2f6378e..682662c 100644
--- a/pw_transfer/transfer.cc
+++ b/pw_transfer/transfer.cc
@@ -30,18 +30,22 @@
   }
 
   if (chunk->IsInitialChunk()) {
-    // TODO(frolv): Right now, session ID and resource ID are the same thing.
-    // The session ID should be assigned by the server in response to the
-    // initial chunk
+    uint32_t session_id =
+        chunk->is_legacy() ? chunk->session_id() : GenerateNewSessionId();
+    uint32_t resource_id =
+        chunk->is_legacy() ? chunk->session_id() : chunk->resource_id().value();
+
     thread_.StartServerTransfer(type,
-                                chunk->session_id(),
-                                /*resource_id=*/chunk->session_id(),
+                                chunk->protocol_version(),
+                                session_id,
+                                resource_id,
+                                message,
                                 max_parameters_,
                                 chunk_timeout_,
                                 max_retries_);
+  } else {
+    thread_.ProcessServerChunk(message);
   }
-
-  thread_.ProcessServerChunk(message);
 }
 
 }  // namespace pw::transfer
diff --git a/pw_transfer/transfer.proto b/pw_transfer/transfer.proto
index b5eb3ab..839eb2c 100644
--- a/pw_transfer/transfer.proto
+++ b/pw_transfer/transfer.proto
@@ -204,4 +204,14 @@
   // Write → ID of transfer session
   // Write ← ID of transfer session
   optional uint32 session_id = 12;
+
+  // The protocol version to use for this transfer. Only sent during the initial
+  // handshake phase of a version 2 or higher transfer to negotiate a common
+  // protocol version between the client and server.
+  //
+  //  Read → Desired (START) or configured (START_ACK_CONFIRMATION) version.
+  //  Read ← Configured protocol version (START_ACK).
+  // Write → Desired (START) or configured (START_ACK_CONFIRMATION) version.
+  // Write ← Configured protocol version (START_ACK).
+  optional uint32 protocol_version = 13;
 }
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index 75a5145..ee32151 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -37,7 +37,6 @@
 }
 
 using internal::Chunk;
-using internal::ProtocolVersion;
 
 class TestMemoryReader : public stream::SeekableReader {
  public:
@@ -141,7 +140,7 @@
 TEST_F(ReadTransfer, SingleChunk) {
   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
     ctx_.SendClientStream(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                         .set_session_id(3)
                         .set_window_end_offset(64)
                         .set_offset(0)));
@@ -179,7 +178,7 @@
 
 TEST_F(ReadTransfer, MultiChunk) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(16)
                       .set_offset(0)));
@@ -240,7 +239,7 @@
 
 TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(16)
                       .set_offset(0)));
@@ -276,7 +275,7 @@
 TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
   rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
     ctx_.SendClientStream(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                         .set_session_id(3)
                         .set_window_end_offset(16)
                         .set_offset(0)));
@@ -315,7 +314,7 @@
   handler_.set_seek_status(Status::Unimplemented());
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(16)
                       .set_offset(0)));
@@ -336,7 +335,7 @@
 TEST_F(ReadTransfer, MaxChunkSize_Client) {
   rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
     ctx_.SendClientStream(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                         .set_session_id(3)
                         .set_window_end_offset(64)
                         .set_max_chunk_size_bytes(8)
@@ -395,7 +394,7 @@
 
 TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(64)
                       .set_offset(0)));
@@ -414,7 +413,7 @@
   handler_.finalize_read_called = false;
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(64)
                       .set_offset(0)));
@@ -436,7 +435,7 @@
   // TODO(frolv): Fix
   rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
     ctx_.SendClientStream(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                         .set_session_id(3)
                         .set_window_end_offset(64)
                         // .set_max_chunk_size_bytes(16)
@@ -495,7 +494,7 @@
 
 TEST_F(ReadTransfer, ClientError) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(16)
                       .set_offset(0)));
@@ -518,7 +517,7 @@
 
 TEST_F(ReadTransfer, UnregisteredHandler) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(11)
                       .set_window_end_offset(32)
                       .set_offset(0)));
@@ -538,7 +537,7 @@
           .set_window_end_offset(32)
           .set_offset(3)));
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(3)
                       .set_payload(span(kData).first(10))
                       .set_offset(3)));
@@ -553,7 +552,7 @@
 
 TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(16)
                       .set_offset(0)));
@@ -566,7 +565,7 @@
   handler_.prepare_read_called = false;  // Reset so can check if called again.
 
   ctx_.SendClientStream(  // Resend starting chunk
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(16)
                       .set_offset(0)));
@@ -595,7 +594,7 @@
 
 TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(0)
                       .set_offset(0)));
@@ -614,7 +613,7 @@
   handler_.set_read_status(Status::OutOfRange());
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(3)
                       .set_window_end_offset(0)
                       .set_offset(0)));
@@ -636,7 +635,7 @@
 TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
     ctx_.SendClientStream(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                         .set_session_id(3)
                         .set_window_end_offset(64)
                         .set_offset(0)));
@@ -761,9 +760,8 @@
 };
 
 TEST_F(WriteTransfer, SingleChunk) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -777,7 +775,7 @@
   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(kData)
@@ -799,11 +797,10 @@
   // Return an error when FinalizeWrite is called.
   handler_.set_finalize_write_return(Status::FailedPrecondition());
 
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(kData)
@@ -821,15 +818,14 @@
 }
 
 TEST_F(WriteTransfer, SendingFinalPacketFails) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   ctx_.output().set_send_status(Status::Unknown());
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(kData)
@@ -850,9 +846,8 @@
 }
 
 TEST_F(WriteTransfer, MultiChunk) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -864,7 +859,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 32u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(8))));
@@ -873,7 +868,7 @@
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(8)
                       .set_payload(span(kData).subspan(8))
@@ -899,7 +894,7 @@
   rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
     // Send only one client packet so the service times out.
     ctx_.SendClientStream(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                         .set_session_id(7)));
     transfer_thread_.SimulateServerTimeout(7);  // Time out to trigger retry
   });
@@ -914,9 +909,8 @@
 }
 
 TEST_F(WriteTransfer, TimeoutInRecoveryState) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -928,14 +922,14 @@
   constexpr span data(kData);
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(data.first(8))));
 
   // Skip offset 8 to enter a recovery state.
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(12)
                       .set_payload(data.subspan(12, 4))));
@@ -961,9 +955,8 @@
 }
 
 TEST_F(WriteTransfer, ExtendWindow) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -976,7 +969,7 @@
 
   // Window starts at 32 bytes and should extend when half of that is sent.
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(4))));
@@ -984,7 +977,7 @@
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(4)
                       .set_payload(span(kData).subspan(4, 4))));
@@ -992,7 +985,7 @@
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(8)
                       .set_payload(span(kData).subspan(8, 4))));
@@ -1000,7 +993,7 @@
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(12)
                       .set_payload(span(kData).subspan(12, 4))));
@@ -1014,7 +1007,7 @@
   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(16)
                       .set_payload(span(kData).subspan(16))
@@ -1038,9 +1031,8 @@
 };
 
 TEST_F(WriteTransfer, TransmitterReducesWindow) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1053,7 +1045,7 @@
 
   // Send only 12 bytes and set that as the new end offset.
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_window_end_offset(12)
@@ -1071,9 +1063,8 @@
 }
 
 TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1085,7 +1076,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 32u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       // Larger window end offset than the receiver's.
@@ -1101,9 +1092,8 @@
 }
 
 TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1115,7 +1105,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 16u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(8))));
@@ -1128,7 +1118,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 24u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(8)
                       .set_payload(span(kData).subspan(8, 8))));
@@ -1141,7 +1131,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 32u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(16)
                       .set_payload(span(kData).subspan(16, 8))));
@@ -1154,7 +1144,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 32u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(24)
                       .set_payload(span(kData).subspan(24))
@@ -1174,9 +1164,8 @@
 
 TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
   // Default max bytes is smaller than buffer.
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -1193,7 +1182,7 @@
   ctx_.service().RegisterHandler(handler_);
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(987)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
@@ -1206,9 +1195,8 @@
 }
 
 TEST_F(WriteTransfer, UnexpectedOffset) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1221,7 +1209,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 32u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(8))));
@@ -1230,7 +1218,7 @@
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(4)  // incorrect
                       .set_payload(span(kData).subspan(8))
@@ -1244,7 +1232,7 @@
   EXPECT_EQ(chunk.window_end_offset(), 32u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(8)  // correct
                       .set_payload(span(kData).subspan(8))
@@ -1263,9 +1251,8 @@
 }
 
 TEST_F(WriteTransferMaxBytes16, TooMuchData) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1278,7 +1265,7 @@
 
   // window_end_offset = 16, but send 24 bytes of data.
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(24))));
@@ -1293,7 +1280,7 @@
 
 TEST_F(WriteTransfer, UnregisteredHandler) {
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(999)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
@@ -1305,9 +1292,8 @@
 }
 
 TEST_F(WriteTransfer, ClientError) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1329,16 +1315,15 @@
 }
 
 TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   constexpr span data(kData);
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(data.first(1))));
@@ -1346,7 +1331,7 @@
   // Drop offset 1, then send the rest of the data.
   for (uint32_t i = 2; i < kData.size(); ++i) {
     ctx_.SendClientStream<64>(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                         .set_session_id(7)
                         .set_offset(i)
                         .set_payload(data.subspan(i, 1))));
@@ -1361,7 +1346,7 @@
 
   // Send the remaining data.
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(1)
                       .set_payload(data.subspan(1, 31))
@@ -1373,9 +1358,8 @@
 }
 
 TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   ASSERT_EQ(ctx_.total_responses(), 1u);
@@ -1385,7 +1369,7 @@
   // Skip offset 0, then send the rest of the data.
   for (uint32_t i = 1; i < kData.size(); ++i) {
     ctx_.SendClientStream<64>(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                         .set_session_id(7)
                         .set_offset(i)
                         .set_payload(data.subspan(i, 1))));
@@ -1396,7 +1380,7 @@
   ASSERT_EQ(ctx_.total_responses(), 2u);  // Resent transfer parameters once.
 
   const auto last_chunk =
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(kData.size() - 1)
                       .set_payload(data.last(1)));
@@ -1418,7 +1402,7 @@
 
   // Resumes normal operation when correct offset is sent.
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(kData)
@@ -1430,15 +1414,14 @@
 }
 
 TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(kData)
@@ -1451,7 +1434,7 @@
   EXPECT_EQ(chunk.status().value(), OkStatus());
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(kData)
@@ -1466,11 +1449,11 @@
 
 TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(3)));
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(10))
@@ -1484,15 +1467,14 @@
 }
 
 TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   ASSERT_EQ(ctx_.total_responses(), 1u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(8))));
@@ -1505,9 +1487,8 @@
   handler_.prepare_write_called = false;  // Reset to check it's called again.
 
   // Simulate client disappearing then restarting the transfer.
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1519,7 +1500,7 @@
   ASSERT_EQ(ctx_.total_responses(), 2u);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(kData)
@@ -1557,7 +1538,7 @@
   ctx_.service().RegisterHandler(unavailable_handler);
 
   ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                       .set_session_id(88)
                       .set_window_end_offset(128)
                       .set_offset(0)));
@@ -1573,7 +1554,7 @@
   // TODO(frolv): This won't work until completion ACKs are supported.
   if (false) {
     ctx_.SendClientStream(
-        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
+        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                         .set_session_id(88)
                         .set_window_end_offset(128)
                         .set_offset(0)));
@@ -1590,9 +1571,8 @@
 }
 
 TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
-  ctx_.SendClientStream(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferStart)
-                      .set_session_id(7)));
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_TRUE(handler_.prepare_write_called);
@@ -1608,7 +1588,7 @@
   ctx_.service().set_max_pending_bytes(12);
 
   ctx_.SendClientStream<64>(
-      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kTransferData)
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                       .set_session_id(7)
                       .set_offset(0)
                       .set_payload(span(kData).first(8))));
@@ -1622,5 +1602,682 @@
   EXPECT_EQ(chunk.window_end_offset(), 8u + 12u);
 }
 
+TEST_F(ReadTransfer, Version2_SimpleTransfer) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(3)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_read_called);
+  EXPECT_FALSE(handler_.finalize_read_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Complete the handshake by confirming the server's ACK and sending the first
+  // read transfer parameters.
+  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
+    ctx_.SendClientStream(EncodeChunk(
+        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+            .set_session_id(1)
+            .set_window_end_offset(64)
+            .set_offset(0)));
+
+    transfer_thread_.WaitUntilEventIsProcessed();
+  });
+
+  // Server should respond by starting the data transfer, sending its sole data
+  // chunk and a remaining_bytes 0 chunk.
+  ASSERT_EQ(ctx_.total_responses(), 3u);
+
+  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c1.type(), Chunk::Type::kData);
+  EXPECT_EQ(c1.session_id(), 1u);
+  EXPECT_EQ(c1.offset(), 0u);
+  ASSERT_TRUE(c1.has_payload());
+  ASSERT_EQ(c1.payload().size(), kData.size());
+  EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+            0);
+
+  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
+  EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c2.type(), Chunk::Type::kData);
+  EXPECT_EQ(c2.session_id(), 1u);
+  EXPECT_FALSE(c2.has_payload());
+  EXPECT_EQ(c2.remaining_bytes(), 0u);
+
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.finalize_read_called);
+  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
+}
+
+TEST_F(ReadTransfer, Version2_MultiChunk) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(3)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_read_called);
+  EXPECT_FALSE(handler_.finalize_read_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Complete the handshake by confirming the server's ACK and sending the first
+  // read transfer parameters.
+  rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
+    ctx_.SendClientStream(EncodeChunk(
+        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+            .set_session_id(1)
+            .set_window_end_offset(64)
+            .set_max_chunk_size_bytes(16)
+            .set_offset(0)));
+
+    transfer_thread_.WaitUntilEventIsProcessed();
+  });
+
+  ASSERT_EQ(ctx_.total_responses(), 4u);
+
+  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c1.type(), Chunk::Type::kData);
+  EXPECT_EQ(c1.session_id(), 1u);
+  EXPECT_EQ(c1.offset(), 0u);
+  ASSERT_TRUE(c1.has_payload());
+  ASSERT_EQ(c1.payload().size(), 16u);
+  EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+            0);
+
+  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
+  EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c2.type(), Chunk::Type::kData);
+  EXPECT_EQ(c2.session_id(), 1u);
+  EXPECT_EQ(c2.offset(), 16u);
+  ASSERT_TRUE(c2.has_payload());
+  ASSERT_EQ(c2.payload().size(), 16u);
+  EXPECT_EQ(
+      std::memcmp(
+          c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
+      0);
+
+  Chunk c3 = DecodeChunk(ctx_.responses()[3]);
+  EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c3.type(), Chunk::Type::kData);
+  EXPECT_EQ(c3.session_id(), 1u);
+  EXPECT_FALSE(c3.has_payload());
+  EXPECT_EQ(c3.remaining_bytes(), 0u);
+
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.finalize_read_called);
+  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
+}
+
+TEST_F(ReadTransfer, Version2_MultiParameters) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(3)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_read_called);
+  EXPECT_FALSE(handler_.finalize_read_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Complete the handshake by confirming the server's ACK and sending the first
+  // read transfer parameters.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+          .set_session_id(1)
+          .set_window_end_offset(16)
+          .set_offset(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c1.type(), Chunk::Type::kData);
+  EXPECT_EQ(c1.session_id(), 1u);
+  EXPECT_EQ(c1.offset(), 0u);
+  ASSERT_TRUE(c1.has_payload());
+  ASSERT_EQ(c1.payload().size(), 16u);
+  EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+            0);
+
+  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
+    ctx_.SendClientStream(EncodeChunk(
+        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersContinue)
+            .set_session_id(1)
+            .set_window_end_offset(64)
+            .set_offset(16)));
+    transfer_thread_.WaitUntilEventIsProcessed();
+  });
+
+  ASSERT_EQ(ctx_.total_responses(), 4u);
+
+  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
+  EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c2.type(), Chunk::Type::kData);
+  EXPECT_EQ(c2.session_id(), 1u);
+  EXPECT_EQ(c2.offset(), 16u);
+  ASSERT_TRUE(c2.has_payload());
+  ASSERT_EQ(c2.payload().size(), 16u);
+  EXPECT_EQ(
+      std::memcmp(
+          c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
+      0);
+
+  Chunk c3 = DecodeChunk(ctx_.responses()[3]);
+  EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c3.type(), Chunk::Type::kData);
+  EXPECT_EQ(c3.session_id(), 1u);
+  EXPECT_FALSE(c3.has_payload());
+  EXPECT_EQ(c3.remaining_bytes(), 0u);
+
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.finalize_read_called);
+  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
+}
+
+TEST_F(ReadTransfer, Version2_ClientTerminatesDuringHandshake) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(3)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_read_called);
+  EXPECT_FALSE(handler_.finalize_read_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Send a terminating chunk instead of the third part of the handshake.
+  ctx_.SendClientStream(EncodeChunk(Chunk::Final(
+      ProtocolVersion::kVersionTwo, 1, Status::ResourceExhausted())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.finalize_read_called);
+  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
+}
+
+TEST_F(ReadTransfer, Version2_ClientSendsWrongProtocolVersion) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(3)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_read_called);
+  EXPECT_FALSE(handler_.finalize_read_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Complete the handshake by confirming the server's ACK and sending the first
+  // read transfer parameters.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+          .set_session_id(1)
+          .set_window_end_offset(16)
+          .set_offset(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c1.type(), Chunk::Type::kData);
+  EXPECT_EQ(c1.session_id(), 1u);
+  EXPECT_EQ(c1.offset(), 0u);
+  ASSERT_TRUE(c1.has_payload());
+  ASSERT_EQ(c1.payload().size(), 16u);
+  EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
+            0);
+
+  // Send a parameters update, but with the incorrect protocol version. The
+  // server should terminate the transfer.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
+          .set_session_id(1)
+          .set_window_end_offset(64)
+          .set_offset(16)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 3u);
+
+  chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), Status::Internal());
+
+  EXPECT_TRUE(handler_.finalize_read_called);
+  EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
+}
+
+TEST_F(ReadTransfer, Version2_BadParametersInHandshake) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(3)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_read_called);
+  EXPECT_FALSE(handler_.finalize_read_called);
+
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 3u);
+
+  // Complete the handshake, but send an invalid parameters chunk. The server
+  // should terminate the transfer.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+          .set_session_id(1)
+          .set_window_end_offset(0)
+          .set_offset(0)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(c1.session_id(), 1u);
+  ASSERT_TRUE(c1.status().has_value());
+  EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
+}
+
+TEST_F(ReadTransfer, Version2_InvalidResourceId) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(99)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.status().value(), Status::NotFound());
+}
+
+TEST_F(WriteTransfer, Version2_SimpleTransfer) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(7)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_write_called);
+  EXPECT_FALSE(handler_.finalize_write_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 7u);
+
+  // Complete the handshake by confirming the server's ACK.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+          .set_session_id(1)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Server should respond by sending its initial transfer parameters.
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  chunk = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 32u);
+  ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+  EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+  // Send all of our data.
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(0)
+                      .set_payload(kData)
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 3u);
+
+  chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), OkStatus());
+
+  EXPECT_TRUE(handler_.finalize_write_called);
+  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
+  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
+}
+
+TEST_F(WriteTransfer, Version2_Multichunk) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(7)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_write_called);
+  EXPECT_FALSE(handler_.finalize_write_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 7u);
+
+  // Complete the handshake by confirming the server's ACK.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+          .set_session_id(1)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Server should respond by sending its initial transfer parameters.
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  chunk = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 32u);
+  ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+  EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+  // Send all of our data across two chunks.
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(0)
+                      .set_payload(span(kData).first(8))));
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(8)
+                      .set_payload(span(kData).subspan(8))
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 3u);
+
+  chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), OkStatus());
+
+  EXPECT_TRUE(handler_.finalize_write_called);
+  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
+  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
+}
+
+TEST_F(WriteTransfer, Version2_ContinueParameters) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(7)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_write_called);
+  EXPECT_FALSE(handler_.finalize_write_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 7u);
+
+  // Complete the handshake by confirming the server's ACK.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+          .set_session_id(1)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Server should respond by sending its initial transfer parameters.
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  chunk = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 32u);
+  ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+  EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+  // Send all of our data across several chunks.
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(0)
+                      .set_payload(span(kData).first(8))));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(8)
+                      .set_payload(span(kData).subspan(8, 8))));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+  ASSERT_EQ(ctx_.total_responses(), 3u);
+
+  chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.offset(), 16u);
+  EXPECT_EQ(chunk.window_end_offset(), 32u);
+
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(16)
+                      .set_payload(span(kData).subspan(16, 8))));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+  ASSERT_EQ(ctx_.total_responses(), 4u);
+
+  chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.offset(), 24u);
+  EXPECT_EQ(chunk.window_end_offset(), 32u);
+
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(24)
+                      .set_payload(span(kData).subspan(24))
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 5u);
+
+  chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  ASSERT_TRUE(chunk.status().has_value());
+  EXPECT_EQ(chunk.status().value(), OkStatus());
+
+  EXPECT_TRUE(handler_.finalize_write_called);
+  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
+  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
+}
+
+TEST_F(WriteTransfer, Version2_ClientTerminatesDuringHandshake) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(7)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_write_called);
+  EXPECT_FALSE(handler_.finalize_write_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 7u);
+
+  // Send an error chunk instead of completing the handshake.
+  ctx_.SendClientStream(EncodeChunk(Chunk::Final(
+      ProtocolVersion::kVersionTwo, 1, Status::FailedPrecondition())));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  EXPECT_TRUE(handler_.finalize_write_called);
+  EXPECT_EQ(handler_.finalize_write_status, Status::FailedPrecondition());
+}
+
+TEST_F(WriteTransfer, Version2_ClientSendsWrongProtocolVersion) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(7)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  EXPECT_TRUE(handler_.prepare_write_called);
+  EXPECT_FALSE(handler_.finalize_write_called);
+
+  // First, the server responds with a START_ACK, assigning a session ID and
+  // confirming the protocol version.
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.resource_id(), 7u);
+
+  // Complete the handshake by confirming the server's ACK.
+  ctx_.SendClientStream(EncodeChunk(
+      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
+          .set_session_id(1)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Server should respond by sending its initial transfer parameters.
+  ASSERT_EQ(ctx_.total_responses(), 2u);
+
+  chunk = DecodeChunk(ctx_.responses()[1]);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
+  EXPECT_EQ(chunk.session_id(), 1u);
+  EXPECT_EQ(chunk.offset(), 0u);
+  EXPECT_EQ(chunk.window_end_offset(), 32u);
+  ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
+  EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
+
+  // The transfer was configured to use protocol version 2. Send a legacy chunk
+  // instead.
+  ctx_.SendClientStream<64>(
+      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
+                      .set_session_id(1)
+                      .set_offset(0)
+                      .set_payload(kData)
+                      .set_remaining_bytes(0)));
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  // Server should terminate the transfer.
+  ASSERT_EQ(ctx_.total_responses(), 3u);
+
+  chunk = DecodeChunk(ctx_.responses()[2]);
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.status().value(), Status::Internal());
+}
+
+TEST_F(WriteTransfer, Version2_InvalidResourceId) {
+  ctx_.SendClientStream(
+      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
+                      .set_resource_id(99)));
+
+  transfer_thread_.WaitUntilEventIsProcessed();
+
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
+  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
+  EXPECT_EQ(chunk.status().value(), Status::NotFound());
+}
+
 }  // namespace
 }  // namespace pw::transfer::test
diff --git a/pw_transfer/transfer_thread.cc b/pw_transfer/transfer_thread.cc
index 85cfa48..c863efc 100644
--- a/pw_transfer/transfer_thread.cc
+++ b/pw_transfer/transfer_thread.cc
@@ -36,7 +36,7 @@
 
   next_event_.type = type;
   next_event_.chunk = {};
-  next_event_.chunk.session_id = session_id;
+  next_event_.chunk.context_identifier = session_id;
 
   event_notification_.release();
 
@@ -99,8 +99,10 @@
 }
 
 void TransferThread::StartTransfer(TransferType type,
+                                   ProtocolVersion version,
                                    uint32_t session_id,
                                    uint32_t resource_id,
+                                   ConstByteSpan raw_chunk,
                                    stream::Stream* stream,
                                    const TransferParameters& max_parameters,
                                    Function<void(Status)>&& on_completion,
@@ -113,14 +115,22 @@
 
   next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
                                         : EventType::kNewServerTransfer;
+
+  if (!raw_chunk.empty()) {
+    std::memcpy(chunk_buffer_.data(), raw_chunk.data(), raw_chunk.size());
+  }
+
   next_event_.new_transfer = {
       .type = type,
+      .protocol_version = version,
       .session_id = session_id,
       .resource_id = resource_id,
       .max_parameters = &max_parameters,
       .timeout = timeout,
       .max_retries = max_retries,
       .transfer_thread = this,
+      .raw_chunk_data = chunk_buffer_.data(),
+      .raw_chunk_size = raw_chunk.size(),
   };
 
   staged_on_completion_ = std::move(on_completion);
@@ -147,6 +157,7 @@
       next_event_.type = EventType::kSendStatusChunk;
       next_event_.send_status_chunk = {
           .session_id = session_id,
+          .protocol_version = version,
           .status = Status::NotFound().code(),
           .stream = type == TransferType::kTransmit
                         ? TransferStream::kServerRead
@@ -165,9 +176,9 @@
   PW_CHECK(chunk.size() <= chunk_buffer_.size(),
            "Transfer received a larger chunk than it can handle.");
 
-  Result<uint32_t> session_id = Chunk::ExtractSessionId(chunk);
-  if (!session_id.ok()) {
-    PW_LOG_ERROR("Received a malformed chunk without a session ID");
+  Result<uint32_t> identifier = Chunk::ExtractIdentifier(chunk);
+  if (!identifier.ok()) {
+    PW_LOG_ERROR("Received a malformed chunk without a context identifier");
     return;
   }
 
@@ -178,7 +189,7 @@
 
   next_event_.type = type;
   next_event_.chunk = {
-      .session_id = *session_id,
+      .context_identifier = *identifier,
       .data = chunk_buffer_.data(),
       .size = chunk.size(),
   };
@@ -347,6 +358,7 @@
       // On the server, send a status chunk back to the client.
       SendStatusChunk(
           {.session_id = event.new_transfer.session_id,
+           .protocol_version = event.new_transfer.protocol_version,
            .status = Status::ResourceExhausted().code(),
            .stream = event.new_transfer.type == TransferType::kTransmit
                          ? TransferStream::kServerRead
@@ -373,14 +385,18 @@
       return FindNewTransfer(server_transfers_, event.new_transfer.session_id);
 
     case EventType::kClientChunk:
-      return FindActiveTransfer(client_transfers_, event.chunk.session_id);
+      return FindActiveTransfer(client_transfers_,
+                                event.chunk.context_identifier);
     case EventType::kServerChunk:
-      return FindActiveTransfer(server_transfers_, event.chunk.session_id);
+      return FindActiveTransfer(server_transfers_,
+                                event.chunk.context_identifier);
 
     case EventType::kClientTimeout:  // Manually triggered client timeout
-      return FindActiveTransfer(client_transfers_, event.chunk.session_id);
+      return FindActiveTransfer(client_transfers_,
+                                event.chunk.context_identifier);
     case EventType::kServerTimeout:  // Manually triggered server timeout
-      return FindActiveTransfer(server_transfers_, event.chunk.session_id);
+      return FindActiveTransfer(server_transfers_,
+                                event.chunk.context_identifier);
 
     case EventType::kClientEndTransfer:
       return FindActiveTransfer(client_transfers_,
@@ -404,7 +420,7 @@
   rpc::Writer& destination = stream_for(event.stream);
 
   Result<ConstByteSpan> result =
-      Chunk::Final(ProtocolVersion::kLegacy, event.session_id, event.status)
+      Chunk::Final(event.protocol_version, event.session_id, event.status)
           .Encode(chunk_buffer_);
 
   if (!result.ok()) {
diff --git a/pw_transfer/transfer_thread_test.cc b/pw_transfer/transfer_thread_test.cc
index 9fba333..b7524bb 100644
--- a/pw_transfer/transfer_thread_test.cc
+++ b/pw_transfer/transfer_thread_test.cc
@@ -106,8 +106,10 @@
   transfer_thread_.AddTransferHandler(handler);
 
   transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
+                                       ProtocolVersion::kLegacy,
                                        3,
                                        3,
+                                       {},
                                        max_parameters_,
                                        std::chrono::seconds(2),
                                        0);
@@ -128,8 +130,10 @@
   transfer_thread_.RemoveTransferHandler(handler);
 
   transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
+                                       ProtocolVersion::kLegacy,
                                        3,
                                        3,
+                                       {},
                                        max_parameters_,
                                        std::chrono::seconds(2),
                                        0);
@@ -154,21 +158,21 @@
   SimpleReadTransfer handler(3, kData);
   transfer_thread_.AddTransferHandler(handler);
 
-  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
-                                       3,
-                                       3,
-                                       max_parameters_,
-                                       std::chrono::seconds(2),
-                                       0);
-
   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
-    transfer_thread_.ProcessServerChunk(
-        EncodeChunk(Chunk(internal::ProtocolVersion::kLegacy,
-                          Chunk::Type::kParametersRetransmit)
-                        .set_session_id(3)
-                        .set_window_end_offset(16)
-                        .set_max_chunk_size_bytes(8)
-                        .set_offset(0)));
+    transfer_thread_.StartServerTransfer(
+        internal::TransferType::kTransmit,
+        ProtocolVersion::kLegacy,
+        3,
+        3,
+        EncodeChunk(
+            Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+                .set_session_id(3)
+                .set_window_end_offset(16)
+                .set_max_chunk_size_bytes(8)
+                .set_offset(0)),
+        max_parameters_,
+        std::chrono::seconds(2),
+        0);
   });
 
   ASSERT_EQ(ctx_.total_responses(), 2u);
@@ -201,31 +205,48 @@
   transfer_thread_.AddTransferHandler(handler3);
   transfer_thread_.AddTransferHandler(handler4);
 
-  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
-                                       3,
-                                       3,
-                                       max_parameters_,
-                                       std::chrono::seconds(2),
-                                       0);
+  transfer_thread_.StartServerTransfer(
+      internal::TransferType::kTransmit,
+      ProtocolVersion::kLegacy,
+      3,
+      3,
+      EncodeChunk(
+          Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+              .set_session_id(3)
+              .set_window_end_offset(16)
+              .set_max_chunk_size_bytes(8)
+              .set_offset(0)),
+      max_parameters_,
+      std::chrono::seconds(2),
+      0);
   transfer_thread_.WaitUntilEventIsProcessed();
 
   // First transfer starts correctly.
   EXPECT_TRUE(handler3.prepare_read_called);
   EXPECT_FALSE(handler4.prepare_read_called);
+  ASSERT_EQ(ctx_.total_responses(), 1u);
 
   // Try to start a simultaneous transfer to resource 4, for which the thread
   // does not have an available context.
-  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
-                                       4,
-                                       4,
-                                       max_parameters_,
-                                       std::chrono::seconds(2),
-                                       0);
+  transfer_thread_.StartServerTransfer(
+      internal::TransferType::kTransmit,
+      ProtocolVersion::kLegacy,
+      4,
+      4,
+      EncodeChunk(
+          Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
+              .set_session_id(4)
+              .set_window_end_offset(16)
+              .set_max_chunk_size_bytes(8)
+              .set_offset(0)),
+      max_parameters_,
+      std::chrono::seconds(2),
+      0);
   transfer_thread_.WaitUntilEventIsProcessed();
 
   EXPECT_FALSE(handler4.prepare_read_called);
 
-  ASSERT_EQ(ctx_.total_responses(), 1u);
+  ASSERT_EQ(ctx_.total_responses(), 2u);
   auto chunk = DecodeChunk(ctx_.response());
   EXPECT_EQ(chunk.session_id(), 4u);
   ASSERT_TRUE(chunk.status().has_value());
@@ -248,7 +269,7 @@
 
   transfer_thread_.StartClientTransfer(
       internal::TransferType::kReceive,
-      3,
+      ProtocolVersion::kLegacy,
       3,
       &buffer3,
       max_parameters_,
@@ -264,7 +285,7 @@
   // does not have an available context.
   transfer_thread_.StartClientTransfer(
       internal::TransferType::kReceive,
-      4,
+      ProtocolVersion::kLegacy,
       4,
       &buffer4,
       max_parameters_,
@@ -275,6 +296,9 @@
 
   EXPECT_EQ(status3, Status::Unknown());
   EXPECT_EQ(status4, Status::ResourceExhausted());
+
+  transfer_thread_.EndClientTransfer(3, Status::Cancelled());
+  transfer_thread_.EndClientTransfer(4, Status::Cancelled());
 }
 
 }  // namespace