pw_transfer: Python client and C++ service fixes
- Client: respect the pending_bytes field from the transfer parameters
when sending a chunk.
- Client: Notify the server if a timeout occurred so that it can cancel
the transfer.
- Client: Don't invoke the end_transfer callback when killing all
transfers due to an error to avoid modifying the transfer dictionary
while iterating through it.
- Client: Accept string as well as bytes data for write transfers.
- Server: Terminate a transfer if the client sends more data than
requested.
- Server: Log and clean up if write transfer parameters fail to send.
Change-Id: I89144c5f7e64d8b8e9ed4bb2718d8f73ff31a2e5
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/62143
Commit-Queue: Alexei Frolov <frolv@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_transfer/py/pw_transfer/transfer.py b/pw_transfer/py/pw_transfer/transfer.py
index 144940c..4be04f7 100644
--- a/pw_transfer/py/pw_transfer/transfer.py
+++ b/pw_transfer/py/pw_transfer/transfer.py
@@ -18,7 +18,7 @@
from dataclasses import dataclass
import logging
import threading
-from typing import Any, Callable, Dict, Optional
+from typing import Any, Callable, Dict, Optional, Union
from pw_rpc.callback_client import BidirectionalStreamingCall
from pw_status import Status
@@ -109,13 +109,15 @@
def progress_stats(self) -> ProgressStats:
"""Returns the current progress of the transfer."""
- def finish(self, status: Status) -> None:
+ def finish(self, status: Status, skip_callback: bool = False) -> None:
"""Ends the transfer with the specified status."""
self._response_timer.stop()
self.status = status
self._invoke_progress_callback()
self.done.set()
- self._end_transfer(self)
+
+ if not skip_callback:
+ self._end_transfer(self)
def _invoke_progress_callback(self) -> None:
"""Invokes the provided progress callback, if any, with the progress."""
@@ -143,9 +145,10 @@
response_timeout_s: float,
progress_callback: ProgressCallback = None,
):
- super().__init__(transfer_id, data, send_chunk, end_transfer,
- _Timer(lambda: self.finish(Status.DEADLINE_EXCEEDED)),
- progress_callback)
+ super().__init__(
+ transfer_id, data, send_chunk, end_transfer,
+ _Timer(lambda: self._send_error(Status.DEADLINE_EXCEEDED)),
+ progress_callback)
self._offset = 0
self._bytes_acknowledged = 0
@@ -228,14 +231,13 @@
def _next_chunk(self) -> Chunk:
"""Returns the next Chunk message to send in the data transfer."""
chunk = Chunk(transfer_id=self.id, offset=self._offset)
+ max_bytes_in_chunk = min(self._max_chunk_size, self._max_bytes_to_send)
- if len(self.data) - self._offset <= self._max_chunk_size:
- # Final chunk of the transfer.
- chunk.data = self.data[self._offset:]
+ chunk.data = self.data[self._offset:self._offset + max_bytes_in_chunk]
+
+ # Mark the final chunk of the transfer.
+ if len(self.data) - self._offset <= max_bytes_in_chunk:
chunk.remaining_bytes = 0
- else:
- chunk.data = self.data[self._offset:self._offset +
- self._max_chunk_size]
return chunk
@@ -439,7 +441,7 @@
def write(self,
transfer_id: int,
- data: bytes,
+ data: Union[bytes, str],
progress_callback: ProgressCallback = None) -> None:
"""Transmits ("uploads") data to the server.
@@ -454,6 +456,9 @@
Error: the transfer failed to complete
"""
+ if isinstance(data, str):
+ data = data.encode()
+
if transfer_id in self._write_transfers:
raise ValueError(f'Write transfer {transfer_id} already exists')
@@ -576,7 +581,7 @@
self._read_stream = None
for _, transfer in self._read_transfers.items():
- transfer.finish(Status.INTERNAL)
+ transfer.finish(Status.INTERNAL, skip_callback=True)
self._read_transfers = {}
_LOG.error('Read stream shut down: %s', status)
@@ -603,7 +608,7 @@
self._write_stream = None
for _, transfer in self._write_transfers.items():
- transfer.finish(Status.INTERNAL)
+ transfer.finish(Status.INTERNAL, skip_callback=True)
self._write_transfers = {}
_LOG.error('Write stream shut down: %s', status)
diff --git a/pw_transfer/py/tests/python_cpp_transfer_test.py b/pw_transfer/py/tests/python_cpp_transfer_test.py
index d43de66..74f50cb 100755
--- a/pw_transfer/py/tests/python_cpp_transfer_test.py
+++ b/pw_transfer/py/tests/python_cpp_transfer_test.py
@@ -110,6 +110,22 @@
self.manager.write(31, b'*' * 512)
self.assertEqual(self.get_content(31), '*' * 512)
+ def test_write_very_large_amount_of_data(self) -> None:
+ self.set_content(31, 'junk')
+
+ for _ in range(ITERATIONS):
+ # Larger than the transfer service's configured pending_bytes.
+ self.manager.write(31, b'*' * 4096)
+ self.assertEqual(self.get_content(31), '*' * 4096)
+
+ def test_write_string(self) -> None:
+ self.set_content(32, 'junk')
+
+ for _ in range(ITERATIONS):
+ # Write a string instead of bytes.
+ self.manager.write(32, 'hello world')
+ self.assertEqual(self.get_content(32), 'hello world')
+
def _main(test_server_command: List[str], port: int, unittest_args: List[str],
directory: str) -> None:
diff --git a/pw_transfer/transfer.cc b/pw_transfer/transfer.cc
index d1a4647..edf6845 100644
--- a/pw_transfer/transfer.cc
+++ b/pw_transfer/transfer.cc
@@ -230,6 +230,15 @@
transfer.set_offset(transfer.offset() + chunk.data.size());
transfer.set_pending_bytes(transfer.pending_bytes() - chunk.data.size());
chunk_data_processed = true;
+ } else {
+ // End the transfer, as this indcates a bug with the client implementation
+ // where it doesn't respect pending_bytes. Trying to recover from here
+ // could potentially result in an infinite transfer loop.
+ PW_LOG_ERROR(
+ "Received more data than what was requested; terminating transfer.");
+ SendStatusChunk(write_stream_, chunk.transfer_id, Status::Internal());
+ transfer.Finish(Status::Internal());
+ return;
}
} else {
// Bad offset; reset pending_bytes to send another parameters chunk.
@@ -261,10 +270,24 @@
parameters.pending_bytes = transfer.pending_bytes();
parameters.max_chunk_size_bytes = MaxWriteChunkSize(transfer);
- if (auto data =
- internal::EncodeChunk(parameters, write_stream_.PayloadBuffer());
- data.ok()) {
- write_stream_.Write(*data);
+ // If the parameters can't be encoded or sent, it most likely indicates a
+ // transport-layer issue, so there isn't much that can be done by the transfer
+ // service. The client will time out and can try to restart the transfer.
+ Result<ConstByteSpan> data =
+ internal::EncodeChunk(parameters, write_stream_.PayloadBuffer());
+ if (data.ok()) {
+ if (Status status = write_stream_.Write(*data); !status.ok()) {
+ PW_LOG_ERROR("Failed to write parameters for transfer %u: %d",
+ static_cast<unsigned>(parameters.transfer_id),
+ status.code());
+ transfer.Finish(Status::Internal());
+ }
+ } else {
+ PW_LOG_ERROR("Failed to encode parameters for transfer %u: %d",
+ static_cast<unsigned>(parameters.transfer_id),
+ data.status().code());
+ write_stream_.ReleaseBuffer();
+ transfer.Finish(Status::Internal());
}
}
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index e29fe3c..c1e6fd6 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -52,7 +52,7 @@
reader_(data) {}
Status PrepareRead() final {
- // reader_.Seek(0);
+ reader_.Seek(0);
set_reader(reader_);
prepare_read_called = true;
return OkStatus();
@@ -342,7 +342,7 @@
writer_(data) {}
Status PrepareWrite() final {
- // writer_.Seek(0);
+ writer_.Seek(0);
set_writer(writer_);
prepare_write_called = true;
return OkStatus();
@@ -584,6 +584,51 @@
EXPECT_EQ(std::memcmp(buffer.data(), data.data(), data.size()), 0);
}
+TEST(Transfer, Write_TooMuchData) {
+ constexpr auto data = bytes::Initialized<32>([](size_t i) { return i; });
+ std::array<std::byte, sizeof(data)> buffer = {};
+ SimpleWriteTransfer handler(7, buffer);
+
+ PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx(64, 16);
+ ctx.service().RegisterHandler(handler);
+
+ EXPECT_FALSE(handler.prepare_write_called);
+ EXPECT_FALSE(handler.finalize_write_called);
+
+ ctx.call();
+ ctx.SendClientStream(EncodeChunk({.transfer_id = 7}));
+
+ EXPECT_TRUE(handler.prepare_write_called);
+ EXPECT_FALSE(handler.finalize_write_called);
+
+ ASSERT_EQ(ctx.total_responses(), 1u);
+ Chunk chunk = DecodeChunk(ctx.responses()[0]);
+ EXPECT_EQ(chunk.transfer_id, 7u);
+ ASSERT_TRUE(chunk.pending_bytes.has_value());
+ EXPECT_EQ(chunk.pending_bytes.value(), 16u);
+
+ // pending_bytes = 16
+ ctx.SendClientStream<64>(EncodeChunk(
+ {.transfer_id = 7, .offset = 0, .data = std::span(data).first(8)}));
+ ASSERT_EQ(ctx.total_responses(), 1u);
+
+ // pending_bytes = 8
+ ctx.SendClientStream<64>(EncodeChunk(
+ {.transfer_id = 7, .offset = 8, .data = std::span(data).subspan(8, 4)}));
+ ASSERT_EQ(ctx.total_responses(), 1u);
+
+ // pending_bytes = 4 but send 8 instead
+ ctx.SendClientStream<64>(
+ EncodeChunk({.transfer_id = 7,
+ .offset = 12,
+ .data = std::span(data).subspan(12, 8)}));
+ ASSERT_EQ(ctx.total_responses(), 2u);
+ chunk = DecodeChunk(ctx.responses()[1]);
+ EXPECT_EQ(chunk.transfer_id, 7u);
+ ASSERT_TRUE(chunk.status.has_value());
+ EXPECT_EQ(chunk.status.value(), Status::Internal());
+}
+
TEST(Transfer, Write_UnregisteredHandler) {
PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx(64, 64);