pw_rpc: Fix callback synchronization issues
- Move the on_next callback to a local variable while the lock is
held, then invoke it. Restore on_next when done, if the call has not
been closed or on_next changed since.
- Track whether callbacks are executing in the call object.
- If a new stream packet arrives while another callback is running, drop
it since the on_next callback is not available.
- Hold the lock while invoking on_next or on_completed callback wrappers
that decode to a Nanopb or pwpb struct.
- Guard the serde objects with the RPC lock.
- Wait until callbacks complete before moving or destroying a call
object. This helps prevent dangling references in RPC callbacks.
Bug: b/234876851, b/262774186
Change-Id: I9d853001dc27abef0c1fade0057d1da635f622cb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126919
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Khalil Estell <kammce@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_rpc/BUILD.bazel b/pw_rpc/BUILD.bazel
index 004c60d..303c5ed 100644
--- a/pw_rpc/BUILD.bazel
+++ b/pw_rpc/BUILD.bazel
@@ -117,6 +117,7 @@
"//pw_containers:intrusive_list",
"//pw_function",
"//pw_log",
+ "//pw_preprocessor",
"//pw_result",
"//pw_span",
"//pw_status",
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index 1c7bbdb..1b2fced 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -110,6 +110,7 @@
deps = [
":log_config",
dir_pw_log,
+ dir_pw_preprocessor,
]
public = [
"public/pw_rpc/client.h",
diff --git a/pw_rpc/CMakeLists.txt b/pw_rpc/CMakeLists.txt
index 2d55a68..f4496cc 100644
--- a/pw_rpc/CMakeLists.txt
+++ b/pw_rpc/CMakeLists.txt
@@ -154,6 +154,7 @@
packet_meta.cc
PRIVATE_DEPS
pw_log
+ pw_preprocessor
pw_rpc.log_config
)
if(NOT "${pw_sync.mutex_BACKEND}" STREQUAL "")
diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc
index e96fd79..db93311 100644
--- a/pw_rpc/call.cc
+++ b/pw_rpc/call.cc
@@ -15,11 +15,35 @@
#include "pw_rpc/internal/call.h"
#include "pw_assert/check.h"
+#include "pw_log/log.h"
+#include "pw_preprocessor/util.h"
#include "pw_rpc/client.h"
#include "pw_rpc/internal/endpoint.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/server.h"
+// If the callback timeout is enabled, count the number of iterations of the
+// waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
+#if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
+#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source) \
+ iterations += 1; \
+ PW_CHECK( \
+ iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS, \
+ "A callback for RPC %u:%08x/%08x has not finished after " \
+ PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS) \
+ " ticks. This may indicate that an RPC callback attempted to " \
+ timeout_source \
+ " its own call object, which is not permitted. Fix this condition or " \
+ "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this " \
+ "crash. See https://pigweed.dev/pw_rpc" \
+ "#destructors-moves-wait-for-callbacks-to-complete for details.", \
+ static_cast<unsigned>(channel_id_), \
+ static_cast<unsigned>(service_id_), \
+ static_cast<unsigned>(method_id_))
+#else
+#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source) static_cast<void>(iterations)
+#endif // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
+
namespace pw::rpc::internal {
using pwpb::PacketType;
@@ -61,6 +85,7 @@
client_stream_state_(HasClientStream(properties.method_type())
? kClientStreamActive
: kClientStreamInactive),
+ callbacks_executing_(0),
properties_(properties) {
PW_CHECK_UINT_NE(channel_id,
Channel::kUnassignedChannelId,
@@ -82,6 +107,14 @@
if (active_locked()) {
endpoint().UnregisterCall(*this);
}
+
+ // Help prevent dangling references in callbacks by waiting for callbacks to
+ // complete before deleting this call.
+ int iterations = 0;
+ while (CallbacksAreRunning()) {
+ PW_RPC_CHECK_FOR_DEADLOCK("destroy");
+ YieldRpcLock();
+ }
}
void Call::MoveFrom(Call& other) {
@@ -91,6 +124,10 @@
return; // Nothing else to do; this call is already closed.
}
+ // An active call with an executing callback cannot be moved. Derived call
+ // classes must wait for callbacks to finish before calling MoveFrom.
+ PW_DCHECK(!other.CallbacksAreRunning());
+
// Copy all members from the other call.
endpoint_ = other.endpoint_;
channel_id_ = other.channel_id_;
@@ -102,6 +139,9 @@
client_stream_state_ = other.client_stream_state_;
properties_ = other.properties_;
+ // callbacks_executing_ is not moved since it is associated with the object in
+ // memory, not the call.
+
on_error_ = std::move(other.on_error_);
on_next_ = std::move(other.on_next_);
@@ -112,6 +152,29 @@
endpoint().RegisterUniqueCall(*this);
}
+void Call::WaitUntilReadyToBeMoved() const {
+ int iterations = 0;
+ while (CallbacksAreRunning() && active_locked()) {
+ PW_RPC_CHECK_FOR_DEADLOCK("move");
+ YieldRpcLock();
+ }
+}
+
+void Call::CallOnError(Status error) {
+ auto on_error_local = std::move(on_error_);
+
+ CallbackStarted();
+
+ rpc_lock().unlock();
+ if (on_error_local) {
+ on_error_local(error);
+ }
+
+ // This mutex lock could be avoided by making callbacks_executing_ atomic.
+ LockGuard lock(rpc_lock());
+ CallbackFinished();
+}
+
Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
if (!active_locked()) {
return Status::FailedPrecondition();
@@ -139,6 +202,53 @@
payload);
}
+void Call::HandlePayload(ConstByteSpan payload) {
+ // pw_rpc only supports handling packets for a particular RPC one at a time.
+ // Check if any callbacks are running and drop the packet if they are.
+ //
+ // The on_next callback cannot support multiple packets at once since it is
+ // moved before it is invoked. on_error and on_completed are only called
+ // after the call is closed.
+ if (CallbacksAreRunning()) {
+ PW_LOG_WARN(
+ "Received stream packet for %u:%08x/%08x before the callback for a "
+ "previous packet completed! This packet will be dropped. This can be "
+ "avoided by handling packets for a particular RPC on only one thread.",
+ static_cast<unsigned>(channel_id_),
+ static_cast<unsigned>(service_id_),
+ static_cast<unsigned>(method_id_));
+ rpc_lock().unlock();
+ return;
+ }
+
+ if (on_next_ == nullptr) {
+ rpc_lock().unlock();
+ return;
+ }
+
+ const uint32_t original_id = id();
+ auto on_next_local = std::move(on_next_);
+ CallbackStarted();
+
+ if (hold_lock_while_invoking_callback_with_payload()) {
+ on_next_local(payload);
+ } else {
+ rpc_lock().unlock();
+ on_next_local(payload);
+ rpc_lock().lock();
+ }
+
+ CallbackFinished();
+
+ // Restore the original callback if the original call is still active and
+ // the callback has not been replaced.
+ // NOLINTNEXTLINE(bugprone-use-after-move)
+ if (active_locked() && id() == original_id && on_next_ == nullptr) {
+ on_next_ = std::move(on_next_local);
+ }
+ rpc_lock().unlock();
+}
+
void Call::UnregisterAndMarkClosed() {
if (active_locked()) {
endpoint().UnregisterCall(*this);
diff --git a/pw_rpc/callback_test.cc b/pw_rpc/callback_test.cc
index 0ff915e..6adf040 100644
--- a/pw_rpc/callback_test.cc
+++ b/pw_rpc/callback_test.cc
@@ -46,7 +46,7 @@
~CallbacksTest() override {
EXPECT_FALSE(callback_thread_.joinable()); // Tests must join the thread!
- EXPECT_TRUE(callback_executed_);
+ EXPECT_GT(callback_executed_, 0);
}
void RespondToCall(const RawClientReaderWriter& call) {
@@ -60,7 +60,7 @@
thread::Thread callback_thread_;
// Must be set to true by the RPC callback in each test.
- volatile bool callback_executed_ = false;
+ volatile int callback_executed_ = 0;
// Variables optionally used by tests. These are in this object so lambads
// only need to capture [this] to access them.
@@ -81,7 +81,7 @@
const RawClientReaderWriter* respond_to_call_ = &call_1_;
};
-TEST_F(CallbacksTest, DISABLED_DestructorWaitsUntilCallbacksComplete) {
+TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
{
RawClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
context_.client(), context_.channel().id());
@@ -99,7 +99,7 @@
// block in the call's destructor until this callback completes.
EXPECT_TRUE(call_is_in_scope_);
- callback_executed_ = true;
+ callback_executed_ = callback_executed_ + 1;
});
// Start the callback thread so it can invoke the callback.
@@ -116,17 +116,17 @@
// Wait for the callback thread to finish.
callback_thread_.join();
- EXPECT_TRUE(callback_executed_);
+ EXPECT_EQ(callback_executed_, 1);
}
-TEST_F(CallbacksTest, DISABLED_MoveActiveCall_WaitsForCallbackToComplete) {
+TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
call_1_ = TestService::TestBidirectionalStreamRpc(
context_.client(), context_.channel().id(), [this](ConstByteSpan) {
main_thread_sem_.release(); // Confirm that this thread started
YieldToOtherThread();
- callback_executed_ = true;
+ callback_executed_ = callback_executed_ + 1;
});
// Start the callback thread so it can invoke the callback.
@@ -142,7 +142,7 @@
// The callback should already have finished. This thread should have waited
// for it to finish during the move.
- EXPECT_TRUE(callback_executed_);
+ EXPECT_EQ(callback_executed_, 1);
EXPECT_FALSE(call_1_.active());
EXPECT_TRUE(call_2_.active());
@@ -156,7 +156,7 @@
call_1_ = std::move(call_2_);
- callback_executed_ = true;
+ callback_executed_ = callback_executed_ + 1;
});
call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
@@ -184,7 +184,7 @@
EXPECT_EQ(OkStatus(), call_1_.Cancel());
call_2_ = std::move(call_1_);
- callback_executed_ = true;
+ callback_executed_ = callback_executed_ + 1;
});
call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
@@ -201,5 +201,33 @@
EXPECT_FALSE(call_2_.active());
}
+TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
+ call_1_ = TestService::TestBidirectionalStreamRpc(
+ context_.client(), context_.channel().id(), [this](ConstByteSpan) {
+ main_thread_sem_.release(); // Confirm that this thread started
+
+ callback_thread_sem_.acquire(); // Wait for the main thread to release
+
+ callback_executed_ = callback_executed_ + 1;
+ });
+
+ // Start the callback thread.
+ callback_thread_sem_.release();
+
+ main_thread_sem_.acquire(); // Confirm that the callback is running
+
+ // Handle a few packets for this call, which should be dropped.
+ for (int i = 0; i < 5; ++i) {
+ context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
+ {}, call_1_.id());
+ }
+
+ // Wait for the callback thread to finish.
+ callback_thread_sem_.release();
+ callback_thread_.join();
+
+ EXPECT_EQ(callback_executed_, 1);
+}
+
} // namespace
} // namespace pw::rpc
diff --git a/pw_rpc/client_call.cc b/pw_rpc/client_call.cc
index d8e57e8..ad5fd21 100644
--- a/pw_rpc/client_call.cc
+++ b/pw_rpc/client_call.cc
@@ -23,4 +23,49 @@
UnregisterAndMarkClosed();
}
+void ClientCall::MoveClientCallFrom(ClientCall& other)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ other.WaitUntilReadyToBeMoved();
+ CloseClientCall();
+ MoveFrom(other);
+}
+
+void UnaryResponseClientCall::HandleCompleted(
+ ConstByteSpan response, Status status) PW_NO_LOCK_SAFETY_ANALYSIS {
+ UnregisterAndMarkClosed();
+
+ auto on_completed_local = std::move(on_completed_);
+ CallbackStarted();
+
+ // The lock is only released when calling into user code. If the callback is
+ // wrapped, this on_completed is an internal function that expects the lock to
+ // be held, and releases it before invoking user code.
+ if (!hold_lock_while_invoking_callback_with_payload()) {
+ rpc_lock().unlock();
+ }
+
+ if (on_completed_local) {
+ on_completed_local(response, status);
+ }
+
+ // This mutex lock could be avoided by making callbacks_executing_ atomic.
+ LockGuard lock(rpc_lock());
+ CallbackFinished();
+}
+
+void StreamResponseClientCall::HandleCompleted(Status status) {
+ UnregisterAndMarkClosed();
+ auto on_completed_local = std::move(on_completed_);
+ CallbackStarted();
+ rpc_lock().unlock();
+
+ if (on_completed_local) {
+ on_completed_local(status);
+ }
+
+ // This mutex lock could be avoided by making callbacks_executing_ atomic.
+ LockGuard lock(rpc_lock());
+ CallbackFinished();
+}
+
} // namespace pw::rpc::internal
diff --git a/pw_rpc/docs.rst b/pw_rpc/docs.rst
index 133f413..b8beb25 100644
--- a/pw_rpc/docs.rst
+++ b/pw_rpc/docs.rst
@@ -1569,7 +1569,8 @@
- ``PW_RPC_YIELD_MODE_BUSY_LOOP``. Do nothing. Release and reacquire the RPC
lock in a busy loop. :c:macro:`PW_RPC_USE_GLOBAL_MUTEX` must be 0 as well.
- - ``PW_RPC_YIELD_MODE_SLEEP``. Yield with 1-tick calls to
+ - ``PW_RPC_YIELD_MODE_SLEEP``. Yield with repeated
+ :c:macro:`PW_RPC_YIELD_SLEEP_DURATION`-length calls to
:cpp:func:`pw::this_thread::sleep_for`. A backend must be configured for
pw_thread:sleep.
- ``PW_RPC_YIELD_MODE_YIELD``. Yield with :cpp:func:`pw::this_thread::yield`.
@@ -1577,6 +1578,13 @@
platforms, :cpp:func:`pw::this_thread::yield` does not yield to lower
priority tasks and should not be used here.
+.. c:macro:: PW_RPC_YIELD_SLEEP_DURATION
+
+ If :c:macro:`PW_RPC_YIELD_MODE` is :c:macro:`PW_RPC_YIELD_MODE_SLEEP`,
+ ``PW_RPC_YIELD_SLEEP_DURATION`` sets how long to sleep during each iteration
+ of the yield loop. The value must be a constant expression that converts to a
+ :cpp:type:`pw::chrono::SystemClock::duration`.
+
.. c:macro:: PW_RPC_CALLBACK_TIMEOUT_TICKS
pw_rpc call objects wait for their callbacks to complete before they are moved
diff --git a/pw_rpc/endpoint.cc b/pw_rpc/endpoint.cc
index a3d922c..7fbccc9 100644
--- a/pw_rpc/endpoint.cc
+++ b/pw_rpc/endpoint.cc
@@ -18,6 +18,8 @@
#include "pw_rpc/internal/endpoint.h"
// clang-format on
+#include <chrono>
+
#include "pw_log/log.h"
#include "pw_rpc/internal/lock.h"
#include "pw_toolchain/no_destructor.h"
@@ -80,7 +82,9 @@
void YieldRpcLock() {
rpc_lock().unlock();
#if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
- this_thread::sleep_for(chrono::SystemClock::duration(1));
+ static constexpr chrono::SystemClock::duration kSleepDuration =
+ PW_RPC_YIELD_SLEEP_DURATION;
+ this_thread::sleep_for(kSleepDuration);
#elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
this_thread::yield();
#endif // PW_RPC_YIELD_MODE
@@ -126,9 +130,9 @@
calls_.push_front(call);
if (existing_call != nullptr) {
- // TODO(b/234876851): Ensure call object is locked when calling callback.
- // For on_error, could potentially move the callback and call it after
- // the lock is released.
+ // TODO(b/260922913): The HandleError() call needs to be deferred to avoid
+ // releasing the lock before finishing state updates. Could move the call
+ // to the planned calls_to_abort list and clean up later.
existing_call->HandleError(Status::Cancelled());
rpc_lock().lock();
}
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
index 8eb7afd..4216420 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h
@@ -101,24 +101,14 @@
nanopb_on_completed_ = std::move(on_completed);
UnaryResponseClientCall::set_on_completed_locked(
- [this](ConstByteSpan payload, Status status) {
- rpc_lock().lock();
- auto nanopb_on_completed_local = std::move(nanopb_on_completed_);
- rpc_lock().unlock();
-
- if (nanopb_on_completed_local) {
- Response response_struct{};
- if (serde_->response().Decode(payload, &response_struct).ok()) {
- nanopb_on_completed_local(response_struct, status);
- } else {
- rpc_lock().lock();
- HandleError(Status::DataLoss());
- }
- }
- });
+ [this](ConstByteSpan payload, Status status)
+ PW_NO_LOCK_SAFETY_ANALYSIS {
+ DecodeToStructAndInvokeOnCompleted(
+ payload, serde_->response(), nanopb_on_completed_, status);
+ });
}
- const NanopbMethodSerde* serde_;
+ const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
Function<void(const Response&, Status)> nanopb_on_completed_
PW_GUARDED_BY(rpc_lock());
};
@@ -199,23 +189,15 @@
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
nanopb_on_next_ = std::move(on_next);
- internal::Call::set_on_next_locked([this](ConstByteSpan payload) {
- if (nanopb_on_next_) {
- Response response_struct{};
- if (serde_->response().Decode(payload, &response_struct).ok()) {
- nanopb_on_next_(response_struct);
- } else {
- // TODO(hepler): This should send a DATA_LOSS error and call the
- // error callback.
- rpc_lock().lock();
- HandleError(Status::DataLoss());
- }
- }
- });
+ Call::set_on_next_locked(
+ [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+ DecodeToStructAndInvokeOnNext(
+ payload, serde_->response(), nanopb_on_next_);
+ });
}
- const NanopbMethodSerde* serde_;
- Function<void(const Response&)> nanopb_on_next_;
+ const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
+ Function<void(const Response&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock());
};
} // namespace internal
diff --git a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
index 02cd8df..a9e6994 100644
--- a/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
+++ b/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
@@ -51,7 +51,10 @@
return SendFinalResponse(*this, payload, status);
}
- const NanopbMethodSerde& serde() const { return *serde_; }
+ const NanopbMethodSerde& serde() const
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ return *serde_;
+ }
protected:
NanopbServerCall(NanopbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
@@ -77,7 +80,7 @@
}
private:
- const NanopbMethodSerde* serde_;
+ const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
};
// The BaseNanopbServerReader serves as the base for the ServerReader and
@@ -119,17 +122,14 @@
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
nanopb_on_next_ = std::move(on_next);
- internal::Call::set_on_next_locked([this](ConstByteSpan payload) {
- if (nanopb_on_next_) {
- Request request_struct{};
- if (serde().request().Decode(payload, &request_struct).ok()) {
- nanopb_on_next_(request_struct);
- }
- }
- });
+ Call::set_on_next_locked(
+ [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+ DecodeToStructAndInvokeOnNext(
+ payload, serde().request(), nanopb_on_next_);
+ });
}
- Function<void(const Request&)> nanopb_on_next_;
+ Function<void(const Request&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock());
};
} // namespace internal
diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h
index 2a3b79a..fef9fa8 100644
--- a/pw_rpc/public/pw_rpc/internal/call.h
+++ b/pw_rpc/public/pw_rpc/internal/call.h
@@ -214,16 +214,7 @@
// Whenever a payload arrives (in a server/client stream or in a response),
// call the on_next_ callback.
// Precondition: rpc_lock() must be held.
- void HandlePayload(ConstByteSpan message) const
- PW_UNLOCK_FUNCTION(rpc_lock()) {
- const bool invoke = on_next_ != nullptr;
- // TODO(b/234876851): Ensure on_next_ is properly guarded.
- rpc_lock().unlock();
-
- if (invoke) {
- on_next_(message);
- }
- }
+ void HandlePayload(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock());
// Handles an error condition for the call. This closes the call and calls the
// on_error callback, if set.
@@ -268,6 +259,7 @@
method_id_{},
rpc_state_{},
client_stream_state_{},
+ callbacks_executing_{},
properties_{} {}
// Creates an active server-side Call.
@@ -281,6 +273,14 @@
uint32_t method_id,
CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
+ void CallbackStarted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ callbacks_executing_ += 1;
+ }
+
+ void CallbackFinished() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ callbacks_executing_ -= 1;
+ }
+
// This call must be in a closed state when this is called.
void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
@@ -340,6 +340,85 @@
constexpr operator Writer&();
constexpr operator const Writer&() const;
+ // Indicates if the on_next and unary on_completed callbacks are internal
+ // wrappers that decode the raw proto before invoking the user's callback. If
+ // they are, the lock must be held when they are invoked.
+ bool hold_lock_while_invoking_callback_with_payload() const
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ return properties_.callback_proto_type() == kProtoStruct;
+ }
+
+ // Decodes a raw protobuf into a proto struct (pwpb or Nanopb) and invokes the
+ // pwpb or Nanopb version of the on_next callback.
+ //
+  // This must ONLY be called from derived classes that wrap the on_next
+  // callback. These classes MUST indicate in their constructor (via
+  // CallProperties) that their callbacks decode the payload to a proto struct.
+ template <typename Decoder, typename ProtoStruct>
+ void DecodeToStructAndInvokeOnNext(
+ ConstByteSpan payload,
+ const Decoder& decoder,
+ Function<void(const ProtoStruct&)>& proto_on_next)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ if (proto_on_next == nullptr) {
+ return;
+ }
+
+ ProtoStruct proto_struct{};
+
+ if (!decoder.Decode(payload, proto_struct).ok()) {
+ HandleError(Status::DataLoss());
+ rpc_lock().lock();
+ return;
+ }
+
+ const uint32_t original_id = id();
+ auto proto_on_next_local = std::move(proto_on_next);
+
+ rpc_lock().unlock();
+ proto_on_next_local(proto_struct);
+ rpc_lock().lock();
+
+ // Restore the original callback if the original call is still active and
+ // the callback has not been replaced.
+ // NOLINTNEXTLINE(bugprone-use-after-move)
+ if (active_locked() && id() == original_id && proto_on_next == nullptr) {
+ proto_on_next = std::move(proto_on_next_local);
+ }
+ }
+
+ // The call is already unregistered and closed.
+ template <typename Decoder, typename ProtoStruct>
+ void DecodeToStructAndInvokeOnCompleted(
+ ConstByteSpan payload,
+ const Decoder& decoder,
+ Function<void(const ProtoStruct&, Status)>& proto_on_completed,
+ Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
+ // Always move proto_on_completed so it goes out of scope in this function.
+ auto proto_on_completed_local = std::move(proto_on_completed);
+
+ // Move on_error in case an error occurs.
+ auto on_error_local = std::move(on_error_);
+
+ // Release the lock before decoding, since decoder is a global.
+ rpc_lock().unlock();
+
+ if (proto_on_completed_local == nullptr) {
+ return;
+ }
+
+ ProtoStruct proto_struct{};
+ if (decoder.Decode(payload, proto_struct).ok()) {
+ proto_on_completed_local(proto_struct, status);
+ } else if (on_error_local != nullptr) {
+ on_error_local(Status::DataLoss());
+ }
+ }
+
+ // An active call cannot be moved if its callbacks are running. This function
+ // must be called on the call being moved before updating any state.
+ void WaitUntilReadyToBeMoved() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
+
private:
// Common constructor for server & client calls.
Call(LockedEndpoint& endpoint,
@@ -371,14 +450,7 @@
// Calls the on_error callback without closing the RPC. This is used when the
// call has already completed.
- void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock()) {
- auto on_error_local = std::move(on_error_);
-
- rpc_lock().unlock();
- if (on_error_local) {
- on_error_local(error);
- }
- }
+ void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock());
// Sends a payload with the specified type. The payload may either be in a
// previously acquired buffer or in a standalone buffer.
@@ -394,6 +466,10 @@
Status status)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
+ bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ return callbacks_executing_ != 0u;
+ }
+
internal::Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
uint32_t id_ PW_GUARDED_BY(rpc_lock());
@@ -405,6 +481,11 @@
kClientStreamInactive,
kClientStreamActive,
} client_stream_state_ PW_GUARDED_BY(rpc_lock());
+
+ // Tracks how many of this call's callbacks are running. Must be 0 for the
+ // call to be destroyed.
+ uint8_t callbacks_executing_ PW_GUARDED_BY(rpc_lock());
+
CallProperties properties_ PW_GUARDED_BY(rpc_lock());
// Called when the RPC is terminated due to an error.
@@ -412,7 +493,7 @@
// Called when a request is received. Only used for RPCs with client streams.
// The raw payload buffer is passed to the callback.
- Function<void(ConstByteSpan payload)> on_next_;
+ Function<void(ConstByteSpan payload)> on_next_ PW_GUARDED_BY(rpc_lock());
};
} // namespace internal
diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h
index 3efe98e..1f671e1 100644
--- a/pw_rpc/public/pw_rpc/internal/client_call.h
+++ b/pw_rpc/public/pw_rpc/internal/client_call.h
@@ -64,10 +64,7 @@
void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
void MoveClientCallFrom(ClientCall& other)
- PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
- CloseClientCall();
- MoveFrom(other);
- }
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
};
// Unary response client calls receive both a payload and the status in their
@@ -92,15 +89,7 @@
}
void HandleCompleted(ConstByteSpan response, Status status)
- PW_UNLOCK_FUNCTION(rpc_lock()) {
- UnregisterAndMarkClosed();
- auto on_completed_local = std::move(on_completed_);
- rpc_lock().unlock();
-
- if (on_completed_local) {
- on_completed_local(response, status);
- }
- }
+ PW_UNLOCK_FUNCTION(rpc_lock());
protected:
constexpr UnaryResponseClientCall() = default;
@@ -172,15 +161,7 @@
return call;
}
- void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
- UnregisterAndMarkClosed();
- auto on_completed_local = std::move(on_completed_);
- rpc_lock().unlock();
-
- if (on_completed_local) {
- on_completed_local(status);
- }
- }
+ void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock());
protected:
constexpr StreamResponseClientCall() = default;
diff --git a/pw_rpc/public/pw_rpc/internal/config.h b/pw_rpc/public/pw_rpc/internal/config.h
index 8b4f6ba..b047e4c 100644
--- a/pw_rpc/public/pw_rpc/internal/config.h
+++ b/pw_rpc/public/pw_rpc/internal/config.h
@@ -71,6 +71,45 @@
#define PW_RPC_YIELD_MODE PW_RPC_YIELD_MODE_SLEEP
#endif // PW_RPC_YIELD_MODE
+// If PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP, PW_RPC_YIELD_SLEEP_DURATION
+// sets how long to sleep during each iteration of the yield loop. The value
+// must be a constant expression that converts to a
+// pw::chrono::SystemClock::duration.
+#ifndef PW_RPC_YIELD_SLEEP_DURATION
+
+// When building for a desktop operating system, use a 1ms sleep by default.
+// 1-tick duration sleeps can result in spurious timeouts.
+#if defined(_WIN32) || defined(__APPLE__) || defined(__linux__)
+#define PW_RPC_YIELD_SLEEP_DURATION std::chrono::milliseconds(1)
+#else
+#define PW_RPC_YIELD_SLEEP_DURATION pw::chrono::SystemClock::duration(1)
+#endif // defined(_WIN32) || defined(__APPLE__) || defined(__linux__)
+
+#endif // PW_RPC_YIELD_SLEEP_DURATION
+
+// PW_RPC_YIELD_SLEEP_DURATION is not needed for non-sleep yield modes.
+#if PW_RPC_YIELD_MODE != PW_RPC_YIELD_MODE_SLEEP
+#undef PW_RPC_YIELD_SLEEP_DURATION
+#endif // PW_RPC_YIELD_MODE != PW_RPC_YIELD_MODE_SLEEP
+
+// pw_rpc call objects wait for their callbacks to complete before they are
+// moved or destroyed. Deadlocks occur if a callback:
+//
+// - attempts to destroy its call object,
+// - attempts to move its call object while the call is still active, or
+// - never returns.
+//
+// If PW_RPC_CALLBACK_TIMEOUT_TICKS is greater than 0, then PW_CRASH is invoked
+// if a thread waits for an RPC callback to complete for more than the specified
+// tick count.
+//
+// A "tick" in this context is one iteration of a loop that releases the RPC
+// lock and yields the thread according to PW_RPC_YIELD_MODE. By default, each
+// yield is a PW_RPC_YIELD_SLEEP_DURATION-long pw::this_thread::sleep_for call.
+#ifndef PW_RPC_CALLBACK_TIMEOUT_TICKS
+#define PW_RPC_CALLBACK_TIMEOUT_TICKS 10000
+#endif // PW_RPC_CALLBACK_TIMEOUT_TICKS
+
// Whether pw_rpc should use dynamic memory allocation internally. If enabled,
// pw_rpc dynamically allocates channels and its encoding buffers. RPC users may
// use dynamic allocation independently of this option (e.g. to allocate pw_rpc
diff --git a/pw_rpc/public/pw_rpc/internal/server_call.h b/pw_rpc/public/pw_rpc/internal/server_call.h
index 2e29b40..409d4a7 100644
--- a/pw_rpc/public/pw_rpc/internal/server_call.h
+++ b/pw_rpc/public/pw_rpc/internal/server_call.h
@@ -28,13 +28,17 @@
#if PW_RPC_CLIENT_STREAM_END_CALLBACK
auto on_client_stream_end_local = std::move(on_client_stream_end_);
+ CallbackStarted();
rpc_lock().unlock();
+
if (on_client_stream_end_local) {
on_client_stream_end_local();
}
-#else
- rpc_lock().unlock();
+
+ rpc_lock().lock();
+ CallbackFinished();
#endif // PW_RPC_CLIENT_STREAM_END_CALLBACK
+ rpc_lock().unlock();
}
protected:
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
index 0d39ecd..002e498 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
@@ -128,26 +128,14 @@
pwpb_on_completed_ = std::move(on_completed);
UnaryResponseClientCall::set_on_completed_locked(
- [this](ConstByteSpan payload, Status status) {
- rpc_lock().lock();
- auto pwpb_on_completed_local = std::move(pwpb_on_completed_);
- rpc_lock().unlock();
-
- if (pwpb_on_completed_local) {
- Response response{};
- const Status decode_status =
- serde_->response().Decode(payload, response);
- if (decode_status.ok()) {
- pwpb_on_completed_local(response, status);
- } else {
- rpc_lock().lock();
- HandleError(Status::DataLoss());
- }
- }
- });
+ [this](ConstByteSpan payload, Status status)
+ PW_NO_LOCK_SAFETY_ANALYSIS {
+ DecodeToStructAndInvokeOnCompleted(
+ payload, serde_->response(), pwpb_on_completed_, status);
+ });
}
- const PwpbMethodSerde* serde_;
+ const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
Function<void(const Response&, Status)> pwpb_on_completed_
PW_GUARDED_BY(rpc_lock());
};
@@ -253,22 +241,15 @@
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
pwpb_on_next_ = std::move(on_next);
- Call::set_on_next_locked([this](ConstByteSpan payload) {
- if (pwpb_on_next_) {
- Response response{};
- const Status status = serde_->response().Decode(payload, response);
- if (status.ok()) {
- pwpb_on_next_(response);
- } else {
- rpc_lock().lock();
- HandleError(Status::DataLoss());
- }
- }
- });
+ Call::set_on_next_locked(
+ [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+ DecodeToStructAndInvokeOnNext(
+ payload, serde_->response(), pwpb_on_next_);
+ });
}
- const PwpbMethodSerde* serde_;
- Function<void(const Response&)> pwpb_on_next_;
+ const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
+ Function<void(const Response&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock());
};
} // namespace internal
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h
index 1bc1b59..c3acb7f 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h
@@ -81,7 +81,9 @@
// Give access to the serializer/deserializer object for converting requests
// and responses between the wire format and pw_protobuf structs.
- const PwpbMethodSerde& serde() const { return *serde_; }
+ const PwpbMethodSerde& serde() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
+ return *serde_;
+ }
// Allow derived classes to be constructed moving another instance.
PwpbServerCall(PwpbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
@@ -120,7 +122,7 @@
}
private:
- const PwpbMethodSerde* serde_;
+ const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
};
// internal::BasePwpbServerReader extends internal::PwpbServerCall further by
@@ -169,18 +171,14 @@
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
pwpb_on_next_ = std::move(on_next);
- Call::set_on_next_locked([this](ConstByteSpan payload) {
- if (pwpb_on_next_) {
- Request request{};
- const Status status = serde().request().Decode(payload, request);
- if (status.ok()) {
- pwpb_on_next_(request);
- }
- }
- });
+ Call::set_on_next_locked(
+ [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
+ DecodeToStructAndInvokeOnNext(
+ payload, serde().request(), pwpb_on_next_);
+ });
}
- Function<void(const Request&)> pwpb_on_next_;
+ Function<void(const Request&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock());
};
} // namespace internal
diff --git a/pw_rpc/server_call.cc b/pw_rpc/server_call.cc
index 602ad83..747fae4 100644
--- a/pw_rpc/server_call.cc
+++ b/pw_rpc/server_call.cc
@@ -17,6 +17,8 @@
namespace pw::rpc::internal {
void ServerCall::MoveServerCallFrom(ServerCall& other) {
+ other.WaitUntilReadyToBeMoved();
+
// If this call is active, finish it first.
if (active_locked()) {
CloseAndSendResponseLocked(OkStatus()).IgnoreError();