pw_transfer: Support filling a receive buffer
Previously, transfers would fail if the client set pending_bytes to 0,
even if there were no more bytes to send. This made it impossible for
the C++ client to completely fill its receive buffer.
This change permits the receiver to set pending_bytes to 0. The transfer
aborts with an error if there is still data to send, but completes
successfully if there is no more data.
Change-Id: Idf0aa5a785efb60a8332131c49c4ae2a5ff81064
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/70883
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index 50b3905..d8a01ca 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -52,6 +52,19 @@
return ReadReceiveChunk(buffer, max_parameters, chunk);
}
+void Context::ProcessChunk(ChunkDataBuffer& buffer,
+ const TransferParameters& max_parameters) {
+ if (type() == kTransmit) {
+ ProcessTransmitChunk();
+ } else {
+ ProcessReceiveChunk(buffer, max_parameters);
+ }
+
+ if (active()) {
+ timer_.InvokeAfter(chunk_timeout_);
+ }
+}
+
Status Context::SendInitialTransmitChunk() {
// A transmitter begins a transfer by just sending its ID.
internal::Chunk chunk = {};
@@ -115,13 +128,6 @@
return false;
}
- if (chunk.pending_bytes == 0u) {
- PW_LOG_ERROR("Transfer %d receiver requested 0 bytes (invalid); aborting",
- static_cast<unsigned>(transfer_id_));
- FinishAndSendStatus(Status::Internal());
- return false;
- }
-
// If the offsets don't match, attempt to seek on the reader. Not all readers
// support seeking; abort with UNIMPLEMENTED if this handler doesn't.
if (offset_ != chunk.offset) {
@@ -280,10 +286,6 @@
}
Status Context::SendNextDataChunk() {
- if (pending_bytes_ == 0) {
- return Status::OutOfRange();
- }
-
ByteSpan buffer = rpc_writer_->PayloadBuffer();
// Begin by doing a partial encode of all the metadata fields, leaving the
@@ -309,6 +311,15 @@
encoder.WriteRemainingBytes(0).IgnoreError();
pending_bytes_ = 0;
} else if (data.ok()) {
+ if (pending_bytes_ == 0u) {
+ PW_LOG_DEBUG(
+ "Transfer %u is not finished, but the receiver cannot accept any "
+ "more data (pending_bytes is 0)",
+ static_cast<unsigned>(transfer_id_));
+ rpc_writer_->ReleasePayloadBuffer();
+ return Status::ResourceExhausted();
+ }
+
encoder.WriteData(data.value()).IgnoreError();
last_chunk_offset_ = offset_;
offset_ += data.value().size();
@@ -336,6 +347,11 @@
}
flags_ |= kFlagsDataSent;
+
+ if (pending_bytes_ == 0) {
+ return Status::OutOfRange();
+ }
+
return data.status();
}
@@ -418,18 +434,9 @@
Status Context::UpdateAndSendTransferParameters(
const TransferParameters& max_parameters) {
- const size_t write_limit = writer().ConservativeWriteLimit();
- if (write_limit == 0) {
- PW_LOG_WARN(
- "Transfer %u writer returned 0 from ConservativeWriteLimit(); cannot "
- "continue, aborting with RESOURCE_EXHAUSTED",
- static_cast<unsigned>(transfer_id_));
- FinishAndSendStatus(Status::ResourceExhausted());
- return Status::ResourceExhausted();
- }
-
- pending_bytes_ = std::min(max_parameters.pending_bytes(),
- static_cast<uint32_t>(write_limit));
+ pending_bytes_ =
+ std::min(max_parameters.pending_bytes(),
+ static_cast<uint32_t>(writer().ConservativeWriteLimit()));
max_chunk_size_bytes_ = MaxWriteChunkSize(
max_parameters.max_chunk_size_bytes(), rpc_writer_->channel_id());
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst
index 8f3360e..229faa9 100644
--- a/pw_transfer/docs.rst
+++ b/pw_transfer/docs.rst
@@ -203,7 +203,11 @@
| ``PERMISSION_DENIED`` | The transfer does not support the requested |
| | operation (either reading or writing). |
+-------------------------+-------------------------+-------------------------+
-| ``RESOURCE_EXHAUSTED`` | (not sent) | Storage is full. |
+| ``RESOURCE_EXHAUSTED`` | The receiver requested | Storage is full. |
+| | zero bytes, indicating | |
+| | their storage is full, | |
+| | but there is still data | |
+| | to send. | |
+-------------------------+-------------------------+-------------------------+
| ``UNAVAILABLE`` | The service is busy with other transfers and |
| | cannot begin a new transfer at this time. |
diff --git a/pw_transfer/public/pw_transfer/internal/context.h b/pw_transfer/public/pw_transfer/internal/context.h
index cd0abfa..5f9d410 100644
--- a/pw_transfer/public/pw_transfer/internal/context.h
+++ b/pw_transfer/public/pw_transfer/internal/context.h
@@ -96,17 +96,7 @@
// operation is intended to be deferred, running from a different context than
// the RPC callback in which the chunk was received.
void ProcessChunk(ChunkDataBuffer& buffer,
- const TransferParameters& max_parameters) {
- if (type() == kTransmit) {
- ProcessTransmitChunk();
- } else {
- ProcessReceiveChunk(buffer, max_parameters);
- }
-
- if (active()) {
- timer_.InvokeAfter(chunk_timeout_);
- }
- }
+ const TransferParameters& max_parameters);
protected:
using CompletionFunction = Status (*)(Context&, Status);
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index 1a100e6..fad0987 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -41,12 +41,17 @@
}
StatusWithSize DoRead(ByteSpan dest) final {
+ if (!read_status.ok()) {
+ return StatusWithSize(read_status, 0);
+ }
+
auto result = memory_reader_.Read(dest);
return result.ok() ? StatusWithSize(result->size())
: StatusWithSize(result.status(), 0);
}
Status seek_status;
+ Status read_status;
private:
stream::MemoryReader memory_reader_;
@@ -74,6 +79,7 @@
}
void set_seek_status(Status status) { reader_.seek_status = status; }
+ void set_read_status(Status status) { reader_.read_status = status; }
bool prepare_read_called;
bool finalize_read_called;
@@ -389,16 +395,32 @@
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
-TEST_F(ReadTransfer, AbortTransferIfZeroBytesAreRequested) {
+TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
ASSERT_EQ(ctx_.total_responses(), 1u);
- EXPECT_TRUE(handler_.finalize_read_called);
- EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
+ ASSERT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
Chunk chunk = DecodeChunk(ctx_.responses().back());
- EXPECT_TRUE(chunk.status.has_value());
- EXPECT_EQ(*chunk.status, Status::Internal());
+ EXPECT_EQ(chunk.status, Status::ResourceExhausted());
+}
+
+TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
+ // Make the next read appear to be the end of the stream.
+ handler_.set_read_status(Status::OutOfRange());
+
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .pending_bytes = 0}));
+
+ Chunk chunk = DecodeChunk(ctx_.responses().back());
+ EXPECT_EQ(chunk.transfer_id, 3u);
+ EXPECT_EQ(chunk.remaining_bytes, 0u);
+
+ ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
+
+ ASSERT_EQ(ctx_.total_responses(), 1u);
+ ASSERT_TRUE(handler_.finalize_read_called);
+ EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {