| // Copyright 2020 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| #include "pw_rpc/internal/call.h" |
| |
| #include "pw_assert/check.h" |
| #include "pw_log/log.h" |
| #include "pw_preprocessor/util.h" |
| #include "pw_rpc/client.h" |
| #include "pw_rpc/internal/endpoint.h" |
| #include "pw_rpc/internal/method.h" |
| #include "pw_rpc/server.h" |
| |
// If the callback timeout is enabled, count the number of iterations of the
// waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
//
// The macro must be expanded inside a Call member function where an int named
// `iterations` is in scope; it also reads channel_id_, service_id_, and
// method_id_ to build the crash message. `timeout_source` must be a string
// literal (e.g. "destroy" or "move") since it is concatenated into the message.
//
// The enabled variant is wrapped in do { ... } while (0) so that the increment
// and the check always expand to a single statement, even when the macro is
// used as the body of an unbraced `if` or loop.
#if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source) \
  do { \
    iterations += 1; \
    PW_CHECK( \
        iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS, \
        "A callback for RPC %u:%08x/%08x has not finished after " \
        PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS) \
        " ticks. This may indicate that an RPC callback attempted to " \
        timeout_source \
        " its own call object, which is not permitted. Fix this condition or " \
        "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this " \
        "crash. See https://pigweed.dev/pw_rpc" \
        "#destructors-moves-wait-for-callbacks-to-complete for details.", \
        static_cast<unsigned>(channel_id_), \
        static_cast<unsigned>(service_id_), \
        static_cast<unsigned>(method_id_)); \
  } while (0)
#else
// Timeout disabled: only reference `iterations` so it is not flagged as unused.
#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source) static_cast<void>(iterations)
#endif  // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
| |
| namespace pw::rpc::internal { |
| |
| using pwpb::PacketType; |
| |
| // Creates an active server-side Call. |
| Call::Call(const LockedCallContext& context, CallProperties properties) |
| : Call(context.server().ClaimLocked(), |
| context.call_id(), |
| context.channel_id(), |
| UnwrapServiceId(context.service().service_id()), |
| context.method().id(), |
| properties) {} |
| |
| // Creates an active client-side call, assigning it a new ID. |
| Call::Call(LockedEndpoint& client, |
| uint32_t channel_id, |
| uint32_t service_id, |
| uint32_t method_id, |
| CallProperties properties) |
| : Call(client, |
| client.NewCallId(), |
| channel_id, |
| service_id, |
| method_id, |
| properties) {} |
| |
| Call::Call(LockedEndpoint& endpoint_ref, |
| uint32_t call_id, |
| uint32_t channel_id, |
| uint32_t service_id, |
| uint32_t method_id, |
| CallProperties properties) |
| : endpoint_(&endpoint_ref), |
| channel_id_(channel_id), |
| id_(call_id), |
| service_id_(service_id), |
| method_id_(method_id), |
| rpc_state_(kActive), |
| client_stream_state_(HasClientStream(properties.method_type()) |
| ? kClientStreamActive |
| : kClientStreamInactive), |
| callbacks_executing_(0), |
| properties_(properties) { |
| PW_CHECK_UINT_NE(channel_id, |
| Channel::kUnassignedChannelId, |
| "Calls cannot be created with channel ID 0 " |
| "(Channel::kUnassignedChannelId)"); |
| endpoint().RegisterCall(*this); |
| } |
| |
| Call::~Call() { |
| // Note: this explicit deregistration is necessary to ensure that |
| // modifications to the endpoint call list occur while holding rpc_lock. |
| // Removing this explicit registration would result in unsynchronized |
| // modification of the endpoint call list via the destructor of the |
| // superclass `IntrusiveList<Call>::Item`. |
| LockGuard lock(rpc_lock()); |
| |
| // This `active_locked()` guard is necessary to ensure that `endpoint()` is |
| // still valid. |
| if (active_locked()) { |
| endpoint().UnregisterCall(*this); |
| } |
| |
| // Help prevent dangling references in callbacks by waiting for callbacks to |
| // complete before deleting this call. |
| int iterations = 0; |
| while (CallbacksAreRunning()) { |
| PW_RPC_CHECK_FOR_DEADLOCK("destroy"); |
| YieldRpcLock(); |
| } |
| } |
| |
| void Call::MoveFrom(Call& other) { |
| PW_DCHECK(!active_locked()); |
| |
| if (!other.active_locked()) { |
| return; // Nothing else to do; this call is already closed. |
| } |
| |
| // An active call with an executing callback cannot be moved. Derived call |
| // classes must wait for callbacks to finish before calling MoveFrom. |
| PW_DCHECK(!other.CallbacksAreRunning()); |
| |
| // Copy all members from the other call. |
| endpoint_ = other.endpoint_; |
| channel_id_ = other.channel_id_; |
| id_ = other.id_; |
| service_id_ = other.service_id_; |
| method_id_ = other.method_id_; |
| |
| rpc_state_ = other.rpc_state_; |
| client_stream_state_ = other.client_stream_state_; |
| properties_ = other.properties_; |
| |
| // callbacks_executing_ is not moved since it is associated with the object in |
| // memory, not the call. |
| |
| on_error_ = std::move(other.on_error_); |
| on_next_ = std::move(other.on_next_); |
| |
| // Mark the other call inactive, unregister it, and register this one. |
| other.MarkClosed(); |
| |
| endpoint().UnregisterCall(other); |
| endpoint().RegisterUniqueCall(*this); |
| } |
| |
| void Call::WaitUntilReadyToBeMoved() const { |
| int iterations = 0; |
| while (CallbacksAreRunning() && active_locked()) { |
| PW_RPC_CHECK_FOR_DEADLOCK("move"); |
| YieldRpcLock(); |
| } |
| } |
| |
| void Call::CallOnError(Status error) { |
| auto on_error_local = std::move(on_error_); |
| |
| CallbackStarted(); |
| |
| rpc_lock().unlock(); |
| if (on_error_local) { |
| on_error_local(error); |
| } |
| |
| // This mutex lock could be avoided by making callbacks_executing_ atomic. |
| LockGuard lock(rpc_lock()); |
| CallbackFinished(); |
| } |
| |
| Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) { |
| if (!active_locked()) { |
| return Status::FailedPrecondition(); |
| } |
| |
| Channel* channel = endpoint_->GetInternalChannel(channel_id_); |
| if (channel == nullptr) { |
| return Status::Unavailable(); |
| } |
| return channel->Send(MakePacket(type, payload, status)); |
| } |
| |
| Status Call::CloseAndSendFinalPacketLocked(PacketType type, |
| ConstByteSpan response, |
| Status status) { |
| const Status send_status = SendPacket(type, response, status); |
| UnregisterAndMarkClosed(); |
| return send_status; |
| } |
| |
| Status Call::WriteLocked(ConstByteSpan payload) { |
| return SendPacket(properties_.call_type() == kServerCall |
| ? PacketType::SERVER_STREAM |
| : PacketType::CLIENT_STREAM, |
| payload); |
| } |
| |
| void Call::HandlePayload(ConstByteSpan payload) { |
| // pw_rpc only supports handling packets for a particular RPC one at a time. |
| // Check if any callbacks are running and drop the packet if they are. |
| // |
| // The on_next callback cannot support multiple packets at once since it is |
| // moved before it is invoked. on_error and on_completed are only called |
| // after the call is closed. |
| if (CallbacksAreRunning()) { |
| PW_LOG_WARN( |
| "Received stream packet for %u:%08x/%08x before the callback for a " |
| "previous packet completed! This packet will be dropped. This can be " |
| "avoided by handling packets for a particular RPC on only one thread.", |
| static_cast<unsigned>(channel_id_), |
| static_cast<unsigned>(service_id_), |
| static_cast<unsigned>(method_id_)); |
| rpc_lock().unlock(); |
| return; |
| } |
| |
| if (on_next_ == nullptr) { |
| rpc_lock().unlock(); |
| return; |
| } |
| |
| const uint32_t original_id = id(); |
| auto on_next_local = std::move(on_next_); |
| CallbackStarted(); |
| |
| if (hold_lock_while_invoking_callback_with_payload()) { |
| on_next_local(payload); |
| } else { |
| rpc_lock().unlock(); |
| on_next_local(payload); |
| rpc_lock().lock(); |
| } |
| |
| CallbackFinished(); |
| |
| // Restore the original callback if the original call is still active and |
| // the callback has not been replaced. |
| // NOLINTNEXTLINE(bugprone-use-after-move) |
| if (active_locked() && id() == original_id && on_next_ == nullptr) { |
| on_next_ = std::move(on_next_local); |
| } |
| rpc_lock().unlock(); |
| } |
| |
| void Call::UnregisterAndMarkClosed() { |
| if (active_locked()) { |
| endpoint().UnregisterCall(*this); |
| MarkClosed(); |
| } |
| } |
| |
| } // namespace pw::rpc::internal |