pw_rpc: Fix callback synchronization issues

- Move the on_next callback to a local variable while the lock is
  held, then invoke it. Restore on_next when done, if the call has not
  been closed or on_next changed since.
- Track whether callbacks are executing in the call object.
- If a new stream packet arrives while another callback is running, drop
  it since the on_next callback is not available.
- Hold the lock while invoking on_next or on_completed callback wrappers
  that decode to a Nanopb or pwpb struct.
- Guard the serde objects with the RPC lock.
- Wait until callbacks complete before moving or destroying a call
  object. This helps prevent dangling references in RPC callbacks.

Bug: b/234876851, b/262774186
Change-Id: I9d853001dc27abef0c1fade0057d1da635f622cb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126919
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Khalil Estell <kammce@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index 004c60d..303c5ed 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -117,6 +117,7 @@
         "//pw_containers:intrusive_list",
         "//pw_function",
         "//pw_log",
+        "//pw_preprocessor",
         "//pw_result",
         "//pw_span",
         "//pw_status",
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index 1c7bbdb..1b2fced 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -110,6 +110,7 @@
   deps = [
     ":log_config",
     dir_pw_log,
+    dir_pw_preprocessor,
   ]
   public = [
     "public/pw_rpc/client.h",
diff --git a/pw_rpc/CMakeLists.txt b/pw_rpc/CMakeLists.txt
index 2d55a68..f4496cc 100644
--- a/pw_rpc/CMakeLists.txt
+++ b/pw_rpc/CMakeLists.txt
@@ -154,6 +154,7 @@
     packet_meta.cc
   PRIVATE_DEPS
     pw_log
+    pw_preprocessor
     pw_rpc.log_config
 )
 if(NOT "${pw_sync.mutex_BACKEND}" STREQUAL "")
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index e96fd79..db93311 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -15,11 +15,35 @@
 #include "pw_rpc/internal/call.h"
 
 #include "pw_assert/check.h"
+#include "pw_log/log.h"
+#include "pw_preprocessor/util.h"
 #include "pw_rpc/client.h"
 #include "pw_rpc/internal/endpoint.h"
 #include "pw_rpc/internal/method.h"
 #include "pw_rpc/server.h"
 
+// If the callback timeout is enabled, count the number of iterations of the
+// waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
+#if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
+#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source) \
+  iterations += 1;                                \
+  PW_CHECK(                                                                  \
+      iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS,                            \
+      "A callback for RPC %u:%08x/%08x has not finished after "              \
+      PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS)                            \
+      " ticks. This may indicate that an RPC callback attempted to "         \
+      timeout_source                                                         \
+      " its own call object, which is not permitted. Fix this condition or " \
+      "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this "     \
+      "crash. See https://pigweed.dev/pw_rpc"                                \
+      "#destructors-moves-wait-for-callbacks-to-complete for details.",      \
+      static_cast<unsigned>(channel_id_),                                    \
+      static_cast<unsigned>(service_id_),                                    \
+      static_cast<unsigned>(method_id_))
+#else
+#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source) static_cast<void>(iterations)
+#endif  // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
+
 namespace pw::rpc::internal {
 
 using pwpb::PacketType;
@@ -61,6 +85,7 @@
       client_stream_state_(HasClientStream(properties.method_type())
                                ? kClientStreamActive
                                : kClientStreamInactive),
+      callbacks_executing_(0),
       properties_(properties) {
   PW_CHECK_UINT_NE(channel_id,
                    Channel::kUnassignedChannelId,
@@ -82,6 +107,14 @@
   if (active_locked()) {
     endpoint().UnregisterCall(*this);
   }
+
+  // Help prevent dangling references in callbacks by waiting for callbacks to
+  // complete before deleting this call.
+  int iterations = 0;
+  while (CallbacksAreRunning()) {
+    PW_RPC_CHECK_FOR_DEADLOCK("destroy");
+    YieldRpcLock();
+  }
 }
 
 void Call::MoveFrom(Call& other) {
@@ -91,6 +124,10 @@
     return;  // Nothing else to do; this call is already closed.
   }
 
+  // An active call with an executing callback cannot be moved. Derived call
+  // classes must wait for callbacks to finish before calling MoveFrom.
+  PW_DCHECK(!other.CallbacksAreRunning());
+
   // Copy all members from the other call.
   endpoint_ = other.endpoint_;
   channel_id_ = other.channel_id_;
@@ -102,6 +139,9 @@
   client_stream_state_ = other.client_stream_state_;
   properties_ = other.properties_;
 
+  // callbacks_executing_ is not moved since it is associated with the object in
+  // memory, not the call.
+
   on_error_ = std::move(other.on_error_);
   on_next_ = std::move(other.on_next_);
 
@@ -112,6 +152,29 @@
   endpoint().RegisterUniqueCall(*this);
 }
 
+void Call::WaitUntilReadyToBeMoved() const {
+  int iterations = 0;
+  while (CallbacksAreRunning() && active_locked()) {
+    PW_RPC_CHECK_FOR_DEADLOCK("move");
+    YieldRpcLock();
+  }
+}
+
+void Call::CallOnError(Status error) {
+  auto on_error_local = std::move(on_error_);
+
+  CallbackStarted();
+
+  rpc_lock().unlock();
+  if (on_error_local) {
+    on_error_local(error);
+  }
+
+  // This mutex lock could be avoided by making callbacks_executing_ atomic.
+  LockGuard lock(rpc_lock());
+  CallbackFinished();
+}
+
 Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
   if (!active_locked()) {
     return Status::FailedPrecondition();
@@ -139,6 +202,53 @@
                     payload);
 }
 
+void Call::HandlePayload(ConstByteSpan payload) {
+  // pw_rpc only supports handling packets for a particular RPC one at a time.
+  // Check if any callbacks are running and drop the packet if they are.
+  //
+  // The on_next callback cannot support multiple packets at once since it is
+  // moved before it is invoked. on_error and on_completed are only called
+  // after the call is closed.
+  if (CallbacksAreRunning()) {
+    PW_LOG_WARN(
+        "Received stream packet for %u:%08x/%08x before the callback for a "
+        "previous packet completed! This packet will be dropped. This can be "
+        "avoided by handling packets for a particular RPC on only one thread.",
+        static_cast<unsigned>(channel_id_),
+        static_cast<unsigned>(service_id_),
+        static_cast<unsigned>(method_id_));
+    rpc_lock().unlock();
+    return;
+  }
+
+  if (on_next_ == nullptr) {
+    rpc_lock().unlock();
+    return;
+  }
+
+  const uint32_t original_id = id();
+  auto on_next_local = std::move(on_next_);
+  CallbackStarted();
+
+  if (hold_lock_while_invoking_callback_with_payload()) {
+    on_next_local(payload);
+  } else {
+    rpc_lock().unlock();
+    on_next_local(payload);
+    rpc_lock().lock();
+  }
+
+  CallbackFinished();
+
+  // Restore the original callback if the original call is still active and
+  // the callback has not been replaced.
+  // NOLINTNEXTLINE(bugprone-use-after-move)
+  if (active_locked() && id() == original_id && on_next_ == nullptr) {
+    on_next_ = std::move(on_next_local);
+  }
+  rpc_lock().unlock();
+}
+
 void Call::UnregisterAndMarkClosed() {
   if (active_locked()) {
     endpoint().UnregisterCall(*this);
diff --git a/pw_rpc/callback_test.cc b/pw_rpc/callback_test.cc
index 0ff915e..6adf040 100644
--- a/pw_rpc/callback_test.cc
+++ b/pw_rpc/callback_test.cc
@@ -46,7 +46,7 @@
 
   ~CallbacksTest() override {
     EXPECT_FALSE(callback_thread_.joinable());  // Tests must join the thread!
-    EXPECT_TRUE(callback_executed_);
+    EXPECT_GT(callback_executed_, 0);
   }
 
   void RespondToCall(const RawClientReaderWriter& call) {
@@ -60,7 +60,7 @@
   thread::Thread callback_thread_;
 
   // Must be set to true by the RPC callback in each test.
-  volatile bool callback_executed_ = false;
+  volatile int callback_executed_ = 0;
 
   // Variables optionally used by tests. These are in this object so lambads
   // only need to capture [this] to access them.
@@ -81,7 +81,7 @@
   const RawClientReaderWriter* respond_to_call_ = &call_1_;
 };
 
-TEST_F(CallbacksTest, DISABLED_DestructorWaitsUntilCallbacksComplete) {
+TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
   {
     RawClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
         context_.client(), context_.channel().id());
@@ -99,7 +99,7 @@
       // block in the call's destructor until this callback completes.
       EXPECT_TRUE(call_is_in_scope_);
 
-      callback_executed_ = true;
+      callback_executed_ = callback_executed_ + 1;
     });
 
     // Start the callback thread so it can invoke the callback.
@@ -116,17 +116,17 @@
   // Wait for the callback thread to finish.
   callback_thread_.join();
 
-  EXPECT_TRUE(callback_executed_);
+  EXPECT_EQ(callback_executed_, 1);
 }
 
-TEST_F(CallbacksTest, DISABLED_MoveActiveCall_WaitsForCallbackToComplete) {
+TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
   call_1_ = TestService::TestBidirectionalStreamRpc(
       context_.client(), context_.channel().id(), [this](ConstByteSpan) {
         main_thread_sem_.release();  // Confirm that this thread started
 
         YieldToOtherThread();
 
-        callback_executed_ = true;
+        callback_executed_ = callback_executed_ + 1;
       });
 
   // Start the callback thread so it can invoke the callback.
@@ -142,7 +142,7 @@
 
   // The callback should already have finished. This thread should have waited
   // for it to finish during the move.
-  EXPECT_TRUE(callback_executed_);
+  EXPECT_EQ(callback_executed_, 1);
   EXPECT_FALSE(call_1_.active());
   EXPECT_TRUE(call_2_.active());
 
@@ -156,7 +156,7 @@
 
         call_1_ = std::move(call_2_);
 
-        callback_executed_ = true;
+        callback_executed_ = callback_executed_ + 1;
       });
 
   call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
@@ -184,7 +184,7 @@
         EXPECT_EQ(OkStatus(), call_1_.Cancel());
         call_2_ = std::move(call_1_);
 
-        callback_executed_ = true;
+        callback_executed_ = callback_executed_ + 1;
       });
 
   call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
@@ -201,5 +201,33 @@
   EXPECT_FALSE(call_2_.active());
 }
 
+TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
+  call_1_ = TestService::TestBidirectionalStreamRpc(
+      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+        main_thread_sem_.release();  // Confirm that this thread started
+
+        callback_thread_sem_.acquire();  // Wait for the main thread to release
+
+        callback_executed_ = callback_executed_ + 1;
+      });
+
+  // Start the callback thread.
+  callback_thread_sem_.release();
+
+  main_thread_sem_.acquire();  // Confirm that the callback is running
+
+  // Handle a few packets for this call, which should be dropped.
+  for (int i = 0; i < 5; ++i) {
+    context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
+        {}, call_1_.id());
+  }
+
+  // Wait for the callback thread to finish.
+  callback_thread_sem_.release();
+  callback_thread_.join();
+
+  EXPECT_EQ(callback_executed_, 1);
+}
+
 }  // namespace
 }  // namespace pw::rpc
diff --git a/pw_rpc/client_call.cc b/pw_rpc/client_call.cc
index d8e57e8..ad5fd21 100644
--- a/pw_rpc/client_call.cc
+++ b/pw_rpc/client_call.cc
@@ -23,4 +23,49 @@
   UnregisterAndMarkClosed();
 }
 
+void ClientCall::MoveClientCallFrom(ClientCall& other)
+    PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+  other.WaitUntilReadyToBeMoved();
+  CloseClientCall();
+  MoveFrom(other);
+}
+
+void UnaryResponseClientCall::HandleCompleted(
+    ConstByteSpan response, Status status) PW_NO_LOCK_SAFETY_ANALYSIS {
+  UnregisterAndMarkClosed();
+
+  auto on_completed_local = std::move(on_completed_);
+  CallbackStarted();
+
+  // The lock is only released when calling into user code. If the callback is
+  // wrapped, this on_completed is an internal function that expects the lock to
+  // be held, and releases it before invoking user code.
+  if (!hold_lock_while_invoking_callback_with_payload()) {
+    rpc_lock().unlock();
+  }
+
+  if (on_completed_local) {
+    on_completed_local(response, status);
+  }
+
+  // This mutex lock could be avoided by making callbacks_executing_ atomic.
+  LockGuard lock(rpc_lock());
+  CallbackFinished();
+}
+
+void StreamResponseClientCall::HandleCompleted(Status status) {
+  UnregisterAndMarkClosed();
+  auto on_completed_local = std::move(on_completed_);
+  CallbackStarted();
+  rpc_lock().unlock();
+
+  if (on_completed_local) {
+    on_completed_local(status);
+  }
+
+  // This mutex lock could be avoided by making callbacks_executing_ atomic.
+  LockGuard lock(rpc_lock());
+  CallbackFinished();
+}
+
 }  // namespace pw::rpc::internal
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 133f413..b8beb25 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -1569,7 +1569,8 @@
 
   - ``PW_RPC_YIELD_MODE_BUSY_LOOP``. Do nothing. Release and reacquire the RPC
     lock in a busy loop. :c:macro:`PW_RPC_USE_GLOBAL_MUTEX` must be 0 as well.
-  - ``PW_RPC_YIELD_MODE_SLEEP``. Yield with 1-tick calls to
+  - ``PW_RPC_YIELD_MODE_SLEEP``. Yield with repeated
+    :c:macro:`PW_RPC_YIELD_SLEEP_DURATION`-length calls to
     :cpp:func:`pw::this_thread::sleep_for`. A backend must be configured for
     pw_thread:sleep.
   - ``PW_RPC_YIELD_MODE_YIELD``. Yield with :cpp:func:`pw::this_thread::yield`.
@@ -1577,6 +1578,13 @@
     platforms, :cpp:func:`pw::this_thread::yield` does not yield to lower
     priority tasks and should not be used here.
 
+.. c:macro:: PW_RPC_YIELD_SLEEP_DURATION
+
+  If :c:macro:`PW_RPC_YIELD_MODE` is :c:macro:`PW_RPC_YIELD_MODE_SLEEP`,
+  ``PW_RPC_YIELD_SLEEP_DURATION`` sets how long to sleep during each iteration
+  of the yield loop. The value must be a constant expression that converts to a
+  :cpp:type:`pw::chrono::SystemClock::duration`.
+
 .. c:macro:: PW_RPC_CALLBACK_TIMEOUT_TICKS
 
   pw_rpc call objects wait for their callbacks to complete before they are moved
diff --git a/pw_rpc/endpoint.cc b/pw_rpc/endpoint.cc
index a3d922c..7fbccc9 100644
--- a/pw_rpc/endpoint.cc
+++ b/pw_rpc/endpoint.cc
@@ -18,6 +18,8 @@
 #include "pw_rpc/internal/endpoint.h"
 // clang-format on
 
+#include <chrono>
+
 #include "pw_log/log.h"
 #include "pw_rpc/internal/lock.h"
 #include "pw_toolchain/no_destructor.h"
@@ -80,7 +82,9 @@
 void YieldRpcLock() {
   rpc_lock().unlock();
 #if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
-  this_thread::sleep_for(chrono::SystemClock::duration(1));
+  static constexpr chrono::SystemClock::duration kSleepDuration =
+      PW_RPC_YIELD_SLEEP_DURATION;
+  this_thread::sleep_for(kSleepDuration);
 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
   this_thread::yield();
 #endif  // PW_RPC_YIELD_MODE
@@ -126,9 +130,9 @@
   calls_.push_front(call);
 
   if (existing_call != nullptr) {
-    // TODO(b/234876851): Ensure call object is locked when calling callback.
-    //     For on_error, could potentially move the callback and call it after
-    //     the lock is released.
+    // TODO(b/260922913): The HandleError() call needs to be deferred to avoid
+    //   releasing the lock before finishing state updates. Could move the call
+    //   to the planned calls_to_abort list and clean up later.
     existing_call->HandleError(Status::Cancelled());
     rpc_lock().lock();
   }
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
index 8eb7afd..4216420 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
@@ -101,24 +101,14 @@
     nanopb_on_completed_ = std::move(on_completed);
 
     UnaryResponseClientCall::set_on_completed_locked(
-        [this](ConstByteSpan payload, Status status) {
-          rpc_lock().lock();
-          auto nanopb_on_completed_local = std::move(nanopb_on_completed_);
-          rpc_lock().unlock();
-
-          if (nanopb_on_completed_local) {
-            Response response_struct{};
-            if (serde_->response().Decode(payload, &response_struct).ok()) {
-              nanopb_on_completed_local(response_struct, status);
-            } else {
-              rpc_lock().lock();
-              HandleError(Status::DataLoss());
-            }
-          }
-        });
+        [this](ConstByteSpan payload, Status status)
+            PW_NO_LOCK_SAFETY_ANALYSIS {
+              DecodeToStructAndInvokeOnCompleted(
+                  payload, serde_->response(), nanopb_on_completed_, status);
+            });
   }
 
-  const NanopbMethodSerde* serde_;
+  const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
   Function<void(const Response&, Status)> nanopb_on_completed_
       PW_GUARDED_BY(rpc_lock());
 };
@@ -199,23 +189,15 @@
       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     nanopb_on_next_ = std::move(on_next);
 
-    internal::Call::set_on_next_locked([this](ConstByteSpan payload) {
-      if (nanopb_on_next_) {
-        Response response_struct{};
-        if (serde_->response().Decode(payload, &response_struct).ok()) {
-          nanopb_on_next_(response_struct);
-        } else {
-          // TODO(hepler): This should send a DATA_LOSS error and call the
-          //     error callback.
-          rpc_lock().lock();
-          HandleError(Status::DataLoss());
-        }
-      }
-    });
+    Call::set_on_next_locked(
+        [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+          DecodeToStructAndInvokeOnNext(
+              payload, serde_->response(), nanopb_on_next_);
+        });
   }
 
-  const NanopbMethodSerde* serde_;
-  Function<void(const Response&)> nanopb_on_next_;
+  const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
+  Function<void(const Response&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock());
 };
 
 }  // namespace internal
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
index 02cd8df..a9e6994 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
@@ -51,7 +51,10 @@
     return SendFinalResponse(*this, payload, status);
   }
 
-  const NanopbMethodSerde& serde() const { return *serde_; }
+  const NanopbMethodSerde& serde() const
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    return *serde_;
+  }
 
  protected:
   NanopbServerCall(NanopbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
@@ -77,7 +80,7 @@
   }
 
  private:
-  const NanopbMethodSerde* serde_;
+  const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
 };
 
 // The BaseNanopbServerReader serves as the base for the ServerReader and
@@ -119,17 +122,14 @@
       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     nanopb_on_next_ = std::move(on_next);
 
-    internal::Call::set_on_next_locked([this](ConstByteSpan payload) {
-      if (nanopb_on_next_) {
-        Request request_struct{};
-        if (serde().request().Decode(payload, &request_struct).ok()) {
-          nanopb_on_next_(request_struct);
-        }
-      }
-    });
+    Call::set_on_next_locked(
+        [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+          DecodeToStructAndInvokeOnNext(
+              payload, serde().request(), nanopb_on_next_);
+        });
   }
 
-  Function<void(const Request&)> nanopb_on_next_;
+  Function<void(const Request&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock());
 };
 
 }  // namespace internal
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 2a3b79a..fef9fa8 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -214,16 +214,7 @@
   // Whenever a payload arrives (in a server/client stream or in a response),
   // call the on_next_ callback.
   // Precondition: rpc_lock() must be held.
-  void HandlePayload(ConstByteSpan message) const
-      PW_UNLOCK_FUNCTION(rpc_lock()) {
-    const bool invoke = on_next_ != nullptr;
-    // TODO(b/234876851): Ensure on_next_ is properly guarded.
-    rpc_lock().unlock();
-
-    if (invoke) {
-      on_next_(message);
-    }
-  }
+  void HandlePayload(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock());
 
   // Handles an error condition for the call. This closes the call and calls the
   // on_error callback, if set.
@@ -268,6 +259,7 @@
         method_id_{},
         rpc_state_{},
         client_stream_state_{},
+        callbacks_executing_{},
         properties_{} {}
 
   // Creates an active server-side Call.
@@ -281,6 +273,14 @@
        uint32_t method_id,
        CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 
+  void CallbackStarted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    callbacks_executing_ += 1;
+  }
+
+  void CallbackFinished() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    callbacks_executing_ -= 1;
+  }
+
   // This call must be in a closed state when this is called.
   void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 
@@ -340,6 +340,85 @@
   constexpr operator Writer&();
   constexpr operator const Writer&() const;
 
+  // Indicates if the on_next and unary on_completed callbacks are internal
+  // wrappers that decode the raw proto before invoking the user's callback. If
+  // they are, the lock must be held when they are invoked.
+  bool hold_lock_while_invoking_callback_with_payload() const
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    return properties_.callback_proto_type() == kProtoStruct;
+  }
+
+  // Decodes a raw protobuf into a proto struct (pwpb or Nanopb) and invokes the
+  // pwpb or Nanopb version of the on_next callback.
+  //
+  // This must ONLY be called from derived classes that wrap the on_next
+  // callback. These classes MUST indicate in their constructors that their
+  // callbacks decode to proto structs.
+  template <typename Decoder, typename ProtoStruct>
+  void DecodeToStructAndInvokeOnNext(
+      ConstByteSpan payload,
+      const Decoder& decoder,
+      Function<void(const ProtoStruct&)>& proto_on_next)
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    if (proto_on_next == nullptr) {
+      return;
+    }
+
+    ProtoStruct proto_struct{};
+
+    if (!decoder.Decode(payload, proto_struct).ok()) {
+      HandleError(Status::DataLoss());
+      rpc_lock().lock();
+      return;
+    }
+
+    const uint32_t original_id = id();
+    auto proto_on_next_local = std::move(proto_on_next);
+
+    rpc_lock().unlock();
+    proto_on_next_local(proto_struct);
+    rpc_lock().lock();
+
+    // Restore the original callback if the original call is still active and
+    // the callback has not been replaced.
+    // NOLINTNEXTLINE(bugprone-use-after-move)
+    if (active_locked() && id() == original_id && proto_on_next == nullptr) {
+      proto_on_next = std::move(proto_on_next_local);
+    }
+  }
+
+  // The call is already unregistered and closed.
+  template <typename Decoder, typename ProtoStruct>
+  void DecodeToStructAndInvokeOnCompleted(
+      ConstByteSpan payload,
+      const Decoder& decoder,
+      Function<void(const ProtoStruct&, Status)>& proto_on_completed,
+      Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
+    // Always move proto_on_completed so it goes out of scope in this function.
+    auto proto_on_completed_local = std::move(proto_on_completed);
+
+    // Move on_error in case an error occurs.
+    auto on_error_local = std::move(on_error_);
+
+    // Release the lock before decoding, since decoder is a global.
+    rpc_lock().unlock();
+
+    if (proto_on_completed_local == nullptr) {
+      return;
+    }
+
+    ProtoStruct proto_struct{};
+    if (decoder.Decode(payload, proto_struct).ok()) {
+      proto_on_completed_local(proto_struct, status);
+    } else if (on_error_local != nullptr) {
+      on_error_local(Status::DataLoss());
+    }
+  }
+
+  // An active call cannot be moved if its callbacks are running. This function
+  // must be called on the call being moved before updating any state.
+  void WaitUntilReadyToBeMoved() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
+
  private:
   // Common constructor for server & client calls.
   Call(LockedEndpoint& endpoint,
@@ -371,14 +450,7 @@
 
   // Calls the on_error callback without closing the RPC. This is used when the
   // call has already completed.
-  void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock()) {
-    auto on_error_local = std::move(on_error_);
-
-    rpc_lock().unlock();
-    if (on_error_local) {
-      on_error_local(error);
-    }
-  }
+  void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock());
 
   // Sends a payload with the specified type. The payload may either be in a
   // previously acquired buffer or in a standalone buffer.
@@ -394,6 +466,10 @@
                                        Status status)
       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 
+  bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    return callbacks_executing_ != 0u;
+  }
+
   internal::Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
   uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
   uint32_t id_ PW_GUARDED_BY(rpc_lock());
@@ -405,6 +481,11 @@
     kClientStreamInactive,
     kClientStreamActive,
   } client_stream_state_ PW_GUARDED_BY(rpc_lock());
+
+  // Tracks how many of this call's callbacks are running. Must be 0 for the
+  // call to be destroyed.
+  uint8_t callbacks_executing_ PW_GUARDED_BY(rpc_lock());
+
   CallProperties properties_ PW_GUARDED_BY(rpc_lock());
 
   // Called when the RPC is terminated due to an error.
@@ -412,7 +493,7 @@
 
   // Called when a request is received. Only used for RPCs with client streams.
   // The raw payload buffer is passed to the callback.
-  Function<void(ConstByteSpan payload)> on_next_;
+  Function<void(ConstByteSpan payload)> on_next_ PW_GUARDED_BY(rpc_lock());
 };
 
 }  // namespace internal
diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h
index 3efe98e..1f671e1 100644
--- a/pw_rpc/public/pw_rpc/internal/client_call.h
+++ b/pw_rpc/public/pw_rpc/internal/client_call.h
@@ -64,10 +64,7 @@
   void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 
   void MoveClientCallFrom(ClientCall& other)
-      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
-    CloseClientCall();
-    MoveFrom(other);
-  }
+      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
 };
 
 // Unary response client calls receive both a payload and the status in their
@@ -92,15 +89,7 @@
   }
 
   void HandleCompleted(ConstByteSpan response, Status status)
-      PW_UNLOCK_FUNCTION(rpc_lock()) {
-    UnregisterAndMarkClosed();
-    auto on_completed_local = std::move(on_completed_);
-    rpc_lock().unlock();
-
-    if (on_completed_local) {
-      on_completed_local(response, status);
-    }
-  }
+      PW_UNLOCK_FUNCTION(rpc_lock());
 
  protected:
   constexpr UnaryResponseClientCall() = default;
@@ -172,15 +161,7 @@
     return call;
   }
 
-  void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
-    UnregisterAndMarkClosed();
-    auto on_completed_local = std::move(on_completed_);
-    rpc_lock().unlock();
-
-    if (on_completed_local) {
-      on_completed_local(status);
-    }
-  }
+  void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock());
 
  protected:
   constexpr StreamResponseClientCall() = default;
diff --git a/pw_rpc/public/pw_rpc/internal/config.h b/pw_rpc/public/pw_rpc/internal/config.h
index 8b4f6ba..b047e4c 100644
--- a/pw_rpc/public/pw_rpc/internal/config.h
+++ b/pw_rpc/public/pw_rpc/internal/config.h
@@ -71,6 +71,45 @@
 #define PW_RPC_YIELD_MODE PW_RPC_YIELD_MODE_SLEEP
 #endif  // PW_RPC_YIELD_MODE
 
+// If PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP, PW_RPC_YIELD_SLEEP_DURATION
+// sets how long to sleep during each iteration of the yield loop. The value
+// must be a constant expression that converts to a
+// pw::chrono::SystemClock::duration.
+#ifndef PW_RPC_YIELD_SLEEP_DURATION
+
+// When building for a desktop operating system, use a 1ms sleep by default.
+// 1-tick duration sleeps can result in spurious timeouts.
+#if defined(_WIN32) || defined(__APPLE__) || defined(__linux__)
+#define PW_RPC_YIELD_SLEEP_DURATION std::chrono::milliseconds(1)
+#else
+#define PW_RPC_YIELD_SLEEP_DURATION pw::chrono::SystemClock::duration(1)
+#endif  // defined(_WIN32) || defined(__APPLE__) || defined(__linux__)
+
+#endif  // PW_RPC_YIELD_SLEEP_DURATION
+
+// PW_RPC_YIELD_SLEEP_DURATION is not needed for non-sleep yield modes.
+#if PW_RPC_YIELD_MODE != PW_RPC_YIELD_MODE_SLEEP
+#undef PW_RPC_YIELD_SLEEP_DURATION
+#endif  // PW_RPC_YIELD_MODE != PW_RPC_YIELD_MODE_SLEEP
+
+// pw_rpc call objects wait for their callbacks to complete before they are
+// moved or destroyed. Deadlocks occur if a callback:
+//
+//   - attempts to destroy its call object,
+//   - attempts to move its call object while the call is still active, or
+//   - never returns.
+//
+// If PW_RPC_CALLBACK_TIMEOUT_TICKS is greater than 0, then PW_CRASH is invoked
+// if a thread waits for an RPC callback to complete for more than the specified
+// tick count.
+//
+// A "tick" in this context is one iteration of a loop that releases the RPC
+// lock and yields the thread according to PW_RPC_YIELD_MODE. By default, the
+// thread sleeps for PW_RPC_YIELD_SLEEP_DURATION during each iteration.
+#ifndef PW_RPC_CALLBACK_TIMEOUT_TICKS
+#define PW_RPC_CALLBACK_TIMEOUT_TICKS 10000
+#endif  // PW_RPC_CALLBACK_TIMEOUT_TICKS
+
 // Whether pw_rpc should use dynamic memory allocation internally. If enabled,
 // pw_rpc dynamically allocates channels and its encoding buffers. RPC users may
 // use dynamic allocation independently of this option (e.g. to allocate pw_rpc
diff --git a/pw_rpc/public/pw_rpc/internal/server_call.h b/pw_rpc/public/pw_rpc/internal/server_call.h
index 2e29b40..409d4a7 100644
--- a/pw_rpc/public/pw_rpc/internal/server_call.h
+++ b/pw_rpc/public/pw_rpc/internal/server_call.h
@@ -28,13 +28,17 @@
 
 #if PW_RPC_CLIENT_STREAM_END_CALLBACK
     auto on_client_stream_end_local = std::move(on_client_stream_end_);
+    CallbackStarted();
     rpc_lock().unlock();
+
     if (on_client_stream_end_local) {
       on_client_stream_end_local();
     }
-#else
-    rpc_lock().unlock();
+
+    rpc_lock().lock();
+    CallbackFinished();
 #endif  // PW_RPC_CLIENT_STREAM_END_CALLBACK
+    rpc_lock().unlock();
   }
 
  protected:
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
index 0d39ecd..002e498 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
@@ -128,26 +128,14 @@
     pwpb_on_completed_ = std::move(on_completed);
 
     UnaryResponseClientCall::set_on_completed_locked(
-        [this](ConstByteSpan payload, Status status) {
-          rpc_lock().lock();
-          auto pwpb_on_completed_local = std::move(pwpb_on_completed_);
-          rpc_lock().unlock();
-
-          if (pwpb_on_completed_local) {
-            Response response{};
-            const Status decode_status =
-                serde_->response().Decode(payload, response);
-            if (decode_status.ok()) {
-              pwpb_on_completed_local(response, status);
-            } else {
-              rpc_lock().lock();
-              HandleError(Status::DataLoss());
-            }
-          }
-        });
+        [this](ConstByteSpan payload, Status status)
+            PW_NO_LOCK_SAFETY_ANALYSIS {
+              DecodeToStructAndInvokeOnCompleted(
+                  payload, serde_->response(), pwpb_on_completed_, status);
+            });
   }
 
-  const PwpbMethodSerde* serde_;
+  const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
   Function<void(const Response&, Status)> pwpb_on_completed_
       PW_GUARDED_BY(rpc_lock());
 };
@@ -253,22 +241,15 @@
       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     pwpb_on_next_ = std::move(on_next);
 
-    Call::set_on_next_locked([this](ConstByteSpan payload) {
-      if (pwpb_on_next_) {
-        Response response{};
-        const Status status = serde_->response().Decode(payload, response);
-        if (status.ok()) {
-          pwpb_on_next_(response);
-        } else {
-          rpc_lock().lock();
-          HandleError(Status::DataLoss());
-        }
-      }
-    });
+    Call::set_on_next_locked(
+        [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+          DecodeToStructAndInvokeOnNext(
+              payload, serde_->response(), pwpb_on_next_);
+        });
   }
 
-  const PwpbMethodSerde* serde_;
-  Function<void(const Response&)> pwpb_on_next_;
+  const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
+  Function<void(const Response&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock());
 };
 
 }  // namespace internal
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h
index 1bc1b59..c3acb7f 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h
@@ -81,7 +81,9 @@
 
   // Give access to the serializer/deserializer object for converting requests
   // and responses between the wire format and pw_protobuf structs.
-  const PwpbMethodSerde& serde() const { return *serde_; }
+  const PwpbMethodSerde& serde() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+    return *serde_;
+  }
 
   // Allow derived classes to be constructed moving another instance.
   PwpbServerCall(PwpbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
@@ -120,7 +122,7 @@
   }
 
  private:
-  const PwpbMethodSerde* serde_;
+  const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
 };
 
 // internal::BasePwpbServerReader extends internal::PwpbServerCall further by
@@ -169,18 +171,14 @@
       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
     pwpb_on_next_ = std::move(on_next);
 
-    Call::set_on_next_locked([this](ConstByteSpan payload) {
-      if (pwpb_on_next_) {
-        Request request{};
-        const Status status = serde().request().Decode(payload, request);
-        if (status.ok()) {
-          pwpb_on_next_(request);
-        }
-      }
-    });
+    Call::set_on_next_locked(
+        [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+          DecodeToStructAndInvokeOnNext(
+              payload, serde().request(), pwpb_on_next_);
+        });
   }
 
-  Function<void(const Request&)> pwpb_on_next_;
+  Function<void(const Request&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock());
 };
 
 }  // namespace internal
diff --git a/pw_rpc/server_call.cc b/pw_rpc/server_call.cc
index 602ad83..747fae4 100644
--- a/pw_rpc/server_call.cc
+++ b/pw_rpc/server_call.cc
@@ -17,6 +17,8 @@
 namespace pw::rpc::internal {
 
 void ServerCall::MoveServerCallFrom(ServerCall& other) {
+  other.WaitUntilReadyToBeMoved();
+
   // If this call is active, finish it first.
   if (active_locked()) {
     CloseAndSendResponseLocked(OkStatus()).IgnoreError();