pw_transfer: Set RPC streams directly

Rather than passing the RPC stream to the transfer thread to be moved,
set it directly. pw_rpc call objects are synchronized by pw_rpc, so it
is safe to move them between threads.

With upcoming pw_rpc changes, a thread will wait until an RPC call's
callbacks finish before moving the call object. This could cause
deadlocks in pw_transfer if a packet arrives immediately after the
stream starts, before the transfer thread gets a chance to move the call
object to its final location. The RPC thread would wait for the next
event to be available in the callback, while the transfer thread would
wait to move the new call object out of the next event until the
callback completed, resulting in deadlock. This change avoids this
issue without needing to drop any packets.

Change-Id: I24b088e36b7712ceda042cdbe80e0b05dec480b7
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126924
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_transfer/context.cc b/pw_transfer/context.cc
index 2e2d77b..2268639 100644
--- a/pw_transfer/context.cc
+++ b/pw_transfer/context.cc
@@ -75,7 +75,6 @@
       return;
 
     case EventType::kSendStatusChunk:
-    case EventType::kSetTransferStream:
     case EventType::kAddTransferHandler:
     case EventType::kRemoveTransferHandler:
     case EventType::kTerminate:
diff --git a/pw_transfer/public/pw_transfer/internal/event.h b/pw_transfer/public/pw_transfer/internal/event.h
index 61fc195..79a7853 100644
--- a/pw_transfer/public/pw_transfer/internal/event.h
+++ b/pw_transfer/public/pw_transfer/internal/event.h
@@ -55,9 +55,6 @@
   // transfer context's completion handler; it is for out-of-band termination.
   kSendStatusChunk,
 
-  // Updates one of the transfer thread's RPC streams.
-  kSetTransferStream,
-
   // Manages the list of transfer handlers for a transfer service.
   kAddTransferHandler,
   kRemoveTransferHandler,
@@ -126,7 +123,6 @@
     ChunkEvent chunk;
     EndTransferEvent end_transfer;
     SendStatusChunkEvent send_status_chunk;
-    TransferStream set_transfer_stream;
     Handler* add_transfer_handler;
     Handler* remove_transfer_handler;
   };
diff --git a/pw_transfer/public/pw_transfer/transfer_thread.h b/pw_transfer/public/pw_transfer/transfer_thread.h
index 6663c7d..5904c95 100644
--- a/pw_transfer/public/pw_transfer/transfer_thread.h
+++ b/pw_transfer/public/pw_transfer/transfer_thread.h
@@ -114,20 +114,23 @@
         EventType::kServerEndTransfer, session_id, status, send_status_chunk);
   }
 
+  // Move the read/write streams on this thread instead of the transfer thread.
+  // RPC call objects are synchronized by pw_rpc, so this move will be atomic
+  // with respect to the transfer thread.
   void SetClientReadStream(rpc::RawClientReaderWriter& read_stream) {
-    SetClientStream(TransferStream::kClientRead, read_stream);
+    client_read_stream_ = std::move(read_stream);
   }
 
   void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream) {
-    SetClientStream(TransferStream::kClientWrite, write_stream);
+    client_write_stream_ = std::move(write_stream);
   }
 
   void SetServerReadStream(rpc::RawServerReaderWriter& read_stream) {
-    SetServerStream(TransferStream::kServerRead, read_stream);
+    server_read_stream_ = std::move(read_stream);
   }
 
   void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream) {
-    SetServerStream(TransferStream::kServerWrite, write_stream);
+    server_write_stream_ = std::move(write_stream);
   }
 
   void AddTransferHandler(Handler& handler) {
@@ -260,9 +263,6 @@
                    Status status,
                    bool send_status_chunk);
 
-  void SetClientStream(TransferStream type, rpc::RawClientReaderWriter& stream);
-  void SetServerStream(TransferStream type, rpc::RawServerReaderWriter& stream);
-
   void TransferHandlerEvent(EventType type, Handler& handler);
 
   void HandleEvent(const Event& event);
@@ -275,8 +275,6 @@
 
   Event next_event_;
   Function<void(Status)> staged_on_completion_;
-  rpc::RawClientReaderWriter staged_client_stream_;
-  rpc::RawServerReaderWriter staged_server_stream_;
 
   rpc::RawClientReaderWriter client_read_stream_;
   rpc::RawClientReaderWriter client_write_stream_;
diff --git a/pw_transfer/transfer_thread.cc b/pw_transfer/transfer_thread.cc
index 4f115ab..c0d03ef 100644
--- a/pw_transfer/transfer_thread.cc
+++ b/pw_transfer/transfer_thread.cc
@@ -221,30 +221,6 @@
   event_notification_.release();
 }
 
-void TransferThread::SetClientStream(TransferStream type,
-                                     rpc::RawClientReaderWriter& stream) {
-  // Block until the last event has been processed.
-  next_event_ownership_.acquire();
-
-  next_event_.type = EventType::kSetTransferStream;
-  next_event_.set_transfer_stream = type;
-  staged_client_stream_ = std::move(stream);
-
-  event_notification_.release();
-}
-
-void TransferThread::SetServerStream(TransferStream type,
-                                     rpc::RawServerReaderWriter& stream) {
-  // Block until the last event has been processed.
-  next_event_ownership_.acquire();
-
-  next_event_.type = EventType::kSetTransferStream;
-  next_event_.set_transfer_stream = type;
-  staged_server_stream_ = std::move(stream);
-
-  event_notification_.release();
-}
-
 void TransferThread::TransferHandlerEvent(EventType type, Handler& handler) {
   // Block until the last event has been processed.
   next_event_ownership_.acquire();
@@ -299,26 +275,6 @@
       SendStatusChunk(event.send_status_chunk);
       break;
 
-    case EventType::kSetTransferStream:
-      switch (event.set_transfer_stream) {
-        case TransferStream::kClientRead:
-          client_read_stream_ = std::move(staged_client_stream_);
-          break;
-
-        case TransferStream::kClientWrite:
-          client_write_stream_ = std::move(staged_client_stream_);
-          break;
-
-        case TransferStream::kServerRead:
-          server_read_stream_ = std::move(staged_server_stream_);
-          break;
-
-        case TransferStream::kServerWrite:
-          server_write_stream_ = std::move(staged_server_stream_);
-          break;
-      }
-      return;
-
     case EventType::kAddTransferHandler:
       handlers_.push_front(*event.add_transfer_handler);
       return;
@@ -423,7 +379,6 @@
                                           event.end_transfer.session_id);
 
     case EventType::kSendStatusChunk:
-    case EventType::kSetTransferStream:
     case EventType::kAddTransferHandler:
     case EventType::kRemoveTransferHandler:
     case EventType::kTerminate: