blob: a620610d8c9b6bbc5de1e812d64547fda05793c8 [file] [log] [blame]
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <atomic>
#include <cstddef>
#include "pw_bytes/span.h"
#include "pw_result/result.h"
#include "pw_rpc/channel.h"
#include "pw_rpc_transport/internal/packet_buffer_queue.h"
#include "pw_rpc_transport/rpc_transport.h"
#include "pw_status/status.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/thread_core.h"
#include "rpc_transport.h"
namespace pw::rpc {
namespace internal {
void LogNoRpcServiceRegistryError();
void LogPacketSizeTooLarge(size_t packet_size, size_t max_packet_size);
void LogEgressThreadNotRunningError();
void LogFailedToProcessPacket(Status status);
void LogFailedToAccessPacket(Status status);
void LogNoPacketAvailable(Status status);
} // namespace internal
// Handles RPC packets destined for the local receiver.
template <size_t kPacketQueueSize, size_t kMaxPacketSize>
class LocalRpcEgress : public RpcEgressHandler,
public ChannelOutput,
public thread::ThreadCore {
using PacketBuffer =
typename internal::PacketBufferQueue<kMaxPacketSize>::PacketBuffer;
public:
LocalRpcEgress() : ChannelOutput("RPC local egress") {}
~LocalRpcEgress() override { Stop(); }
// Packet processor cannot be passed as a construction dependency as it would
// create a circular dependency in the RPC transport configuration.
void set_packet_processor(RpcPacketProcessor& packet_processor) {
packet_processor_ = &packet_processor;
}
// Adds the packet to the transmit queue. The queue is continuously processed
// by another thread. Implements RpcEgressHandler.
Status SendRpcPacket(ConstByteSpan rpc_packet) override;
// Implements ChannelOutput.
Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); }
// Once stopped, LocalRpcEgress will no longer process data and
// will report errors on SendPacket().
void Stop() {
if (stopped_) {
return;
}
stopped_ = true;
// Unblock the processing thread and let it finish gracefully.
process_queue_.release();
}
private:
void Run() override;
sync::ThreadNotification process_queue_;
RpcPacketProcessor* packet_processor_ = nullptr;
std::array<PacketBuffer, kPacketQueueSize> packet_storage_;
internal::PacketBufferQueue<kMaxPacketSize> packet_queue_{packet_storage_};
internal::PacketBufferQueue<kMaxPacketSize> transmit_queue_ = {};
std::atomic<bool> stopped_ = false;
};
template <size_t kPacketQueueSize, size_t kMaxPacketSize>
Status LocalRpcEgress<kPacketQueueSize, kMaxPacketSize>::SendRpcPacket(
ConstByteSpan packet) {
if (!packet_processor_) {
internal::LogNoRpcServiceRegistryError();
return Status::FailedPrecondition();
}
if (packet.size() > kMaxPacketSize) {
internal::LogPacketSizeTooLarge(packet.size(), kMaxPacketSize);
return Status::InvalidArgument();
}
if (stopped_) {
internal::LogEgressThreadNotRunningError();
return Status::FailedPrecondition();
}
// Grab a free packet from the egress' pool, copy incoming frame and
// push it the queue for processing.
auto packet_buffer = packet_queue_.Pop();
if (!packet_buffer.ok()) {
internal::LogNoPacketAvailable(packet_buffer.status());
return packet_buffer.status();
}
PW_TRY(packet_buffer.value()->CopyPacket(packet));
transmit_queue_.Push(**packet_buffer);
process_queue_.release();
if (stopped_) {
internal::LogEgressThreadNotRunningError();
return Status::DataLoss();
}
return OkStatus();
}
template <size_t kPacketQueueSize, size_t kMaxPacketSize>
void LocalRpcEgress<kPacketQueueSize, kMaxPacketSize>::Run() {
while (!stopped_) {
// Wait until a client has signaled that there is data in the packet queue.
process_queue_.acquire();
while (true) {
Result<PacketBuffer*> packet_buffer = transmit_queue_.Pop();
if (!packet_buffer.ok()) {
break;
}
Result<ConstByteSpan> packet = (*packet_buffer)->GetPacket();
if (packet.ok()) {
if (const auto status = packet_processor_->ProcessRpcPacket(*packet);
!status.ok()) {
internal::LogFailedToProcessPacket(status);
}
} else {
internal::LogFailedToAccessPacket(packet.status());
}
packet_queue_.Push(**packet_buffer);
}
}
}
} // namespace pw::rpc