pw_transfer: Support filling a receive buffer

Previously, transfers would fail if the client set pending_bytes to 0,
even if there were no more bytes to send. This made it impossible for
the C++ client to completely fill its receive buffer.

This change permits the receiver to set pending_bytes to 0. The transfer
aborts with an error if there is still data to send, but completes
successfully if there is no more data.

Change-Id: Idf0aa5a785efb60a8332131c49c4ae2a5ff81064
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/70883
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index 50b3905..d8a01ca 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -52,6 +52,19 @@
   return ReadReceiveChunk(buffer, max_parameters, chunk);
 }
 
+void Context::ProcessChunk(ChunkDataBuffer& buffer,
+                           const TransferParameters& max_parameters) {
+  if (type() == kTransmit) {
+    ProcessTransmitChunk();
+  } else {
+    ProcessReceiveChunk(buffer, max_parameters);
+  }
+
+  if (active()) {
+    timer_.InvokeAfter(chunk_timeout_);
+  }
+}
+
 Status Context::SendInitialTransmitChunk() {
   // A transmitter begins a transfer by just sending its ID.
   internal::Chunk chunk = {};
@@ -115,13 +128,6 @@
     return false;
   }
 
-  if (chunk.pending_bytes == 0u) {
-    PW_LOG_ERROR("Transfer %d receiver requested 0 bytes (invalid); aborting",
-                 static_cast<unsigned>(transfer_id_));
-    FinishAndSendStatus(Status::Internal());
-    return false;
-  }
-
   // If the offsets don't match, attempt to seek on the reader. Not all readers
   // support seeking; abort with UNIMPLEMENTED if this handler doesn't.
   if (offset_ != chunk.offset) {
@@ -280,10 +286,6 @@
 }
 
 Status Context::SendNextDataChunk() {
-  if (pending_bytes_ == 0) {
-    return Status::OutOfRange();
-  }
-
   ByteSpan buffer = rpc_writer_->PayloadBuffer();
 
   // Begin by doing a partial encode of all the metadata fields, leaving the
@@ -309,6 +311,15 @@
     encoder.WriteRemainingBytes(0).IgnoreError();
     pending_bytes_ = 0;
   } else if (data.ok()) {
+    if (pending_bytes_ == 0u) {
+      PW_LOG_DEBUG(
+          "Transfer %u is not finished, but the receiver cannot accept any "
+          "more data (pending_bytes is 0)",
+          static_cast<unsigned>(transfer_id_));
+      rpc_writer_->ReleasePayloadBuffer();
+      return Status::ResourceExhausted();
+    }
+
     encoder.WriteData(data.value()).IgnoreError();
     last_chunk_offset_ = offset_;
     offset_ += data.value().size();
@@ -336,6 +347,11 @@
   }
 
   flags_ |= kFlagsDataSent;
+
+  if (pending_bytes_ == 0) {
+    return Status::OutOfRange();
+  }
+
   return data.status();
 }
 
@@ -418,18 +434,9 @@
 
 Status Context::UpdateAndSendTransferParameters(
     const TransferParameters& max_parameters) {
-  const size_t write_limit = writer().ConservativeWriteLimit();
-  if (write_limit == 0) {
-    PW_LOG_WARN(
-        "Transfer %u writer returned 0 from ConservativeWriteLimit(); cannot "
-        "continue, aborting with RESOURCE_EXHAUSTED",
-        static_cast<unsigned>(transfer_id_));
-    FinishAndSendStatus(Status::ResourceExhausted());
-    return Status::ResourceExhausted();
-  }
-
-  pending_bytes_ = std::min(max_parameters.pending_bytes(),
-                            static_cast<uint32_t>(write_limit));
+  pending_bytes_ =
+      std::min(max_parameters.pending_bytes(),
+               static_cast<uint32_t>(writer().ConservativeWriteLimit()));
 
   max_chunk_size_bytes_ = MaxWriteChunkSize(
       max_parameters.max_chunk_size_bytes(), rpc_writer_->channel_id());
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst
index 8f3360e..229faa9 100644
--- a/pw_transfer/docs.rst
+++ b/pw_transfer/docs.rst
@@ -203,7 +203,11 @@
 | ``PERMISSION_DENIED``   | The transfer does not support the requested       |
 |                         | operation (either reading or writing).            |
 +-------------------------+-------------------------+-------------------------+
-| ``RESOURCE_EXHAUSTED``  | (not sent)              | Storage is full.        |
+| ``RESOURCE_EXHAUSTED``  | The receiver requested  | Storage is full.        |
+|                         | zero bytes, indicating  |                         |
+|                         | their storage is full,  |                         |
+|                         | but there is still data |                         |
+|                         | to send.                |                         |
 +-------------------------+-------------------------+-------------------------+
 | ``UNAVAILABLE``         | The service is busy with other transfers and      |
 |                         | cannot begin a new transfer at this time.         |
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h
index cd0abfa..5f9d410 100644
--- a/pw_transfer/public/pw_transfer/internal/context.h
+++ b/pw_transfer/public/pw_transfer/internal/context.h
@@ -96,17 +96,7 @@
   // operation is intended to be deferred, running from a different context than
   // the RPC callback in which the chunk was received.
   void ProcessChunk(ChunkDataBuffer& buffer,
-                    const TransferParameters& max_parameters) {
-    if (type() == kTransmit) {
-      ProcessTransmitChunk();
-    } else {
-      ProcessReceiveChunk(buffer, max_parameters);
-    }
-
-    if (active()) {
-      timer_.InvokeAfter(chunk_timeout_);
-    }
-  }
+                    const TransferParameters& max_parameters);
 
  protected:
   using CompletionFunction = Status (*)(Context&, Status);
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index 1a100e6..fad0987 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -41,12 +41,17 @@
   }
 
   StatusWithSize DoRead(ByteSpan dest) final {
+    if (!read_status.ok()) {
+      return StatusWithSize(read_status, 0);
+    }
+
     auto result = memory_reader_.Read(dest);
     return result.ok() ? StatusWithSize(result->size())
                        : StatusWithSize(result.status(), 0);
   }
 
   Status seek_status;
+  Status read_status;
 
  private:
   stream::MemoryReader memory_reader_;
@@ -74,6 +79,7 @@
   }
 
   void set_seek_status(Status status) { reader_.seek_status = status; }
+  void set_read_status(Status status) { reader_.read_status = status; }
 
   bool prepare_read_called;
   bool finalize_read_called;
@@ -389,16 +395,32 @@
   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
 }
 
-TEST_F(ReadTransfer, AbortTransferIfZeroBytesAreRequested) {
+TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
   ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
 
   ASSERT_EQ(ctx_.total_responses(), 1u);
-  EXPECT_TRUE(handler_.finalize_read_called);
-  EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
+  ASSERT_TRUE(handler_.finalize_read_called);
+  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
 
   Chunk chunk = DecodeChunk(ctx_.responses().back());
-  EXPECT_TRUE(chunk.status.has_value());
-  EXPECT_EQ(*chunk.status, Status::Internal());
+  EXPECT_EQ(chunk.status, Status::ResourceExhausted());
+}
+
+TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
+  // Make the next read appear to be the end of the stream.
+  handler_.set_read_status(Status::OutOfRange());
+
+  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
+
+  Chunk chunk = DecodeChunk(ctx_.responses().back());
+  EXPECT_EQ(chunk.transfer_id, 3u);
+  EXPECT_EQ(chunk.remaining_bytes, 0u);
+
+  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
+
+  ASSERT_EQ(ctx_.total_responses(), 1u);
+  ASSERT_TRUE(handler_.finalize_read_called);
+  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
 }
 
 TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {