pw_transfer: Python client and C++ service fixes

- Client: respect the pending_bytes field from the transfer parameters
  when sending a chunk.
- Client: Notify the server if a timeout occurred so that it can cancel
  the transfer.
- Client: Don't invoke the end_transfer callback when killing all
  transfers due to an error to avoid modifying the transfer dictionary
  while iterating through it.
- Client: Accept string as well as bytes data for write transfers.
- Server: Terminate a transfer if the client sends more data than
  requested.
- Server: Log and clean up if write transfer parameters fail to send.

Change-Id: I89144c5f7e64d8b8e9ed4bb2718d8f73ff31a2e5
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/62143
Commit-Queue: Alexei Frolov <frolv@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_transfer/py/pw_transfer/transfer.py b/pw_transfer/py/pw_transfer/transfer.py
index 144940c..4be04f7 100644
--- a/pw_transfer/py/pw_transfer/transfer.py
+++ b/pw_transfer/py/pw_transfer/transfer.py
@@ -18,7 +18,7 @@
 from dataclasses import dataclass
 import logging
 import threading
-from typing import Any, Callable, Dict, Optional
+from typing import Any, Callable, Dict, Optional, Union
 
 from pw_rpc.callback_client import BidirectionalStreamingCall
 from pw_status import Status
@@ -109,13 +109,15 @@
     def progress_stats(self) -> ProgressStats:
         """Returns the current progress of the transfer."""
 
-    def finish(self, status: Status) -> None:
+    def finish(self, status: Status, skip_callback: bool = False) -> None:
         """Ends the transfer with the specified status."""
         self._response_timer.stop()
         self.status = status
         self._invoke_progress_callback()
         self.done.set()
-        self._end_transfer(self)
+
+        if not skip_callback:
+            self._end_transfer(self)
 
     def _invoke_progress_callback(self) -> None:
         """Invokes the provided progress callback, if any, with the progress."""
@@ -143,9 +145,10 @@
         response_timeout_s: float,
         progress_callback: ProgressCallback = None,
     ):
-        super().__init__(transfer_id, data, send_chunk, end_transfer,
-                         _Timer(lambda: self.finish(Status.DEADLINE_EXCEEDED)),
-                         progress_callback)
+        super().__init__(
+            transfer_id, data, send_chunk, end_transfer,
+            _Timer(lambda: self._send_error(Status.DEADLINE_EXCEEDED)),
+            progress_callback)
 
         self._offset = 0
         self._bytes_acknowledged = 0
@@ -228,14 +231,13 @@
     def _next_chunk(self) -> Chunk:
         """Returns the next Chunk message to send in the data transfer."""
         chunk = Chunk(transfer_id=self.id, offset=self._offset)
+        max_bytes_in_chunk = min(self._max_chunk_size, self._max_bytes_to_send)
 
-        if len(self.data) - self._offset <= self._max_chunk_size:
-            # Final chunk of the transfer.
-            chunk.data = self.data[self._offset:]
+        chunk.data = self.data[self._offset:self._offset + max_bytes_in_chunk]
+
+        # Mark the final chunk of the transfer.
+        if len(self.data) - self._offset <= max_bytes_in_chunk:
             chunk.remaining_bytes = 0
-        else:
-            chunk.data = self.data[self._offset:self._offset +
-                                   self._max_chunk_size]
 
         return chunk
 
@@ -439,7 +441,7 @@
 
     def write(self,
               transfer_id: int,
-              data: bytes,
+              data: Union[bytes, str],
               progress_callback: ProgressCallback = None) -> None:
         """Transmits ("uploads") data to the server.
 
@@ -454,6 +456,9 @@
           Error: the transfer failed to complete
         """
 
+        if isinstance(data, str):
+            data = data.encode()
+
         if transfer_id in self._write_transfers:
             raise ValueError(f'Write transfer {transfer_id} already exists')
 
@@ -576,7 +581,7 @@
             self._read_stream = None
 
             for _, transfer in self._read_transfers.items():
-                transfer.finish(Status.INTERNAL)
+                transfer.finish(Status.INTERNAL, skip_callback=True)
             self._read_transfers = {}
 
             _LOG.error('Read stream shut down: %s', status)
@@ -603,7 +608,7 @@
             self._write_stream = None
 
             for _, transfer in self._write_transfers.items():
-                transfer.finish(Status.INTERNAL)
+                transfer.finish(Status.INTERNAL, skip_callback=True)
             self._write_transfers = {}
 
             _LOG.error('Write stream shut down: %s', status)
diff --git a/pw_transfer/py/tests/python_cpp_transfer_test.py b/pw_transfer/py/tests/python_cpp_transfer_test.py
index d43de66..74f50cb 100755
--- a/pw_transfer/py/tests/python_cpp_transfer_test.py
+++ b/pw_transfer/py/tests/python_cpp_transfer_test.py
@@ -110,6 +110,22 @@
             self.manager.write(31, b'*' * 512)
             self.assertEqual(self.get_content(31), '*' * 512)
 
+    def test_write_very_large_amount_of_data(self) -> None:
+        self.set_content(31, 'junk')
+
+        for _ in range(ITERATIONS):
+            # Larger than the transfer service's configured pending_bytes.
+            self.manager.write(31, b'*' * 4096)
+            self.assertEqual(self.get_content(31), '*' * 4096)
+
+    def test_write_string(self) -> None:
+        self.set_content(32, 'junk')
+
+        for _ in range(ITERATIONS):
+            # Write a string instead of bytes.
+            self.manager.write(32, 'hello world')
+            self.assertEqual(self.get_content(32), 'hello world')
+
 
 def _main(test_server_command: List[str], port: int, unittest_args: List[str],
           directory: str) -> None:
diff --git a/pw_transfer/transfer.cc b/pw_transfer/transfer.cc
index d1a4647..edf6845 100644
--- a/pw_transfer/transfer.cc
+++ b/pw_transfer/transfer.cc
@@ -230,6 +230,15 @@
       transfer.set_offset(transfer.offset() + chunk.data.size());
       transfer.set_pending_bytes(transfer.pending_bytes() - chunk.data.size());
       chunk_data_processed = true;
+    } else {
+      // End the transfer, as this indcates a bug with the client implementation
+      // where it doesn't respect pending_bytes. Trying to recover from here
+      // could potentially result in an infinite transfer loop.
+      PW_LOG_ERROR(
+          "Received more data than what was requested; terminating transfer.");
+      SendStatusChunk(write_stream_, chunk.transfer_id, Status::Internal());
+      transfer.Finish(Status::Internal());
+      return;
     }
   } else {
     // Bad offset; reset pending_bytes to send another parameters chunk.
@@ -261,10 +270,24 @@
   parameters.pending_bytes = transfer.pending_bytes();
   parameters.max_chunk_size_bytes = MaxWriteChunkSize(transfer);
 
-  if (auto data =
-          internal::EncodeChunk(parameters, write_stream_.PayloadBuffer());
-      data.ok()) {
-    write_stream_.Write(*data);
+  // If the parameters can't be encoded or sent, it most likely indicates a
+  // transport-layer issue, so there isn't much that can be done by the transfer
+  // service. The client will time out and can try to restart the transfer.
+  Result<ConstByteSpan> data =
+      internal::EncodeChunk(parameters, write_stream_.PayloadBuffer());
+  if (data.ok()) {
+    if (Status status = write_stream_.Write(*data); !status.ok()) {
+      PW_LOG_ERROR("Failed to write parameters for transfer %u: %d",
+                   static_cast<unsigned>(parameters.transfer_id),
+                   status.code());
+      transfer.Finish(Status::Internal());
+    }
+  } else {
+    PW_LOG_ERROR("Failed to encode parameters for transfer %u: %d",
+                 static_cast<unsigned>(parameters.transfer_id),
+                 data.status().code());
+    write_stream_.ReleaseBuffer();
+    transfer.Finish(Status::Internal());
   }
 }
 
diff --git a/pw_transfer/transfer_test.cc b/pw_transfer/transfer_test.cc
index e29fe3c..c1e6fd6 100644
--- a/pw_transfer/transfer_test.cc
+++ b/pw_transfer/transfer_test.cc
@@ -52,7 +52,7 @@
         reader_(data) {}
 
   Status PrepareRead() final {
-    // reader_.Seek(0);
+    reader_.Seek(0);
     set_reader(reader_);
     prepare_read_called = true;
     return OkStatus();
@@ -342,7 +342,7 @@
         writer_(data) {}
 
   Status PrepareWrite() final {
-    // writer_.Seek(0);
+    writer_.Seek(0);
     set_writer(writer_);
     prepare_write_called = true;
     return OkStatus();
@@ -584,6 +584,51 @@
   EXPECT_EQ(std::memcmp(buffer.data(), data.data(), data.size()), 0);
 }
 
+TEST(Transfer, Write_TooMuchData) {
+  constexpr auto data = bytes::Initialized<32>([](size_t i) { return i; });
+  std::array<std::byte, sizeof(data)> buffer = {};
+  SimpleWriteTransfer handler(7, buffer);
+
+  PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx(64, 16);
+  ctx.service().RegisterHandler(handler);
+
+  EXPECT_FALSE(handler.prepare_write_called);
+  EXPECT_FALSE(handler.finalize_write_called);
+
+  ctx.call();
+  ctx.SendClientStream(EncodeChunk({.transfer_id = 7}));
+
+  EXPECT_TRUE(handler.prepare_write_called);
+  EXPECT_FALSE(handler.finalize_write_called);
+
+  ASSERT_EQ(ctx.total_responses(), 1u);
+  Chunk chunk = DecodeChunk(ctx.responses()[0]);
+  EXPECT_EQ(chunk.transfer_id, 7u);
+  ASSERT_TRUE(chunk.pending_bytes.has_value());
+  EXPECT_EQ(chunk.pending_bytes.value(), 16u);
+
+  // pending_bytes = 16
+  ctx.SendClientStream<64>(EncodeChunk(
+      {.transfer_id = 7, .offset = 0, .data = std::span(data).first(8)}));
+  ASSERT_EQ(ctx.total_responses(), 1u);
+
+  // pending_bytes = 8
+  ctx.SendClientStream<64>(EncodeChunk(
+      {.transfer_id = 7, .offset = 8, .data = std::span(data).subspan(8, 4)}));
+  ASSERT_EQ(ctx.total_responses(), 1u);
+
+  // pending_bytes = 4 but send 8 instead
+  ctx.SendClientStream<64>(
+      EncodeChunk({.transfer_id = 7,
+                   .offset = 12,
+                   .data = std::span(data).subspan(12, 8)}));
+  ASSERT_EQ(ctx.total_responses(), 2u);
+  chunk = DecodeChunk(ctx.responses()[1]);
+  EXPECT_EQ(chunk.transfer_id, 7u);
+  ASSERT_TRUE(chunk.status.has_value());
+  EXPECT_EQ(chunk.status.value(), Status::Internal());
+}
+
 TEST(Transfer, Write_UnregisteredHandler) {
   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx(64, 64);