blob: d263081c685b3a6d441c9dc3daac6d5833104cb6 [file] [log] [blame]
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <sys/types.h>
#include <mutex>
#include "pw_bytes/span.h"
#include "pw_metric/metric.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/packet_meta.h"
#include "pw_rpc_transport/hdlc_framing.h"
#include "pw_rpc_transport/rpc_transport.h"
#include "pw_rpc_transport/simple_framing.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/mutex.h"
#include "rpc_transport.h"
namespace pw::rpc {
namespace internal {
void LogBadPacket();
void LogChannelIdOverflow(uint32_t channel_id, uint32_t max_channel_id);
void LogMissingEgressForChannel(uint32_t channel_id);
void LogIngressSendFailure(uint32_t channel_id, Status status);
} // namespace internal
// Ties RPC transport and RPC frame encoder together.
template <typename Encoder>
class RpcEgress : public RpcEgressHandler, public ChannelOutput {
public:
RpcEgress(std::string_view channel_name, RpcFrameSender& transport)
: ChannelOutput(channel_name.data()), transport_(transport) {}
// Implements both rpc::ChannelOutput and RpcEgressHandler. Encodes the
// provided packet using the target transport's MTU as max frame size and
// sends it over that transport.
//
// Sending a packet may result in multiple RpcTransport::Write calls which
// must not be interleaved in order for the packet to be successfully
// reassembled from the transport-level frames by the receiver. RpcEgress
// is using a mutex to ensure this. Technically we could just rely on pw_rpc
// global lock but that would unnecessarily couple transport logic to pw_rpc
// internals.
Status SendRpcPacket(ConstByteSpan rpc_packet) override {
std::lock_guard lock(mutex_);
return encoder_.Encode(rpc_packet,
transport_.MaximumTransmissionUnit(),
[this](RpcFrame& frame) {
// Encoders must call this callback inline so that
// we're still holding `mutex_` here. Unfortunately
// the lock annotations cannot be used on
// `transport_` to enforce this.
return transport_.Send(frame);
});
}
// Implements ChannelOutput.
Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); }
private:
sync::Mutex mutex_;
RpcFrameSender& transport_;
Encoder encoder_ PW_GUARDED_BY(mutex_);
};
// Ties a channel id and the egress that packets on that channel should be sent
// to.
struct ChannelEgress {
ChannelEgress(uint32_t id, RpcEgressHandler& egress_handler)
: channel_id(id), egress(&egress_handler) {}
const uint32_t channel_id;
RpcEgressHandler* const egress = nullptr;
};
// Handler for incoming RPC packets. RpcIngress is not thread-safe and must be
// accessed from a single thread (typically the RPC RX thread).
template <typename Decoder>
class RpcIngress : public RpcIngressHandler {
public:
static constexpr size_t kMaxChannelId = 64;
RpcIngress() = default;
explicit RpcIngress(span<ChannelEgress> channel_egresses) {
for (auto& channel : channel_egresses) {
PW_ASSERT(channel.channel_id <= kMaxChannelId);
channel_egresses_[channel.channel_id] = channel.egress;
}
}
const metric::Group& metrics() const { return metrics_; }
uint32_t num_bad_packets() const { return bad_packets_.value(); }
uint32_t num_overflow_channel_ids() const {
return overflow_channel_ids_.value();
}
uint32_t num_missing_egresses() const { return missing_egresses_.value(); }
uint32_t num_egress_errors() const { return egress_errors_.value(); }
// Finds RPC packets in `buffer`, extracts pw_rpc channel ID from each
// packet and sends the packet to the egress registered for that channel.
Status ProcessIncomingData(ConstByteSpan buffer) override {
return decoder_.Decode(buffer, [this](ConstByteSpan packet) {
const auto packet_meta = rpc::PacketMeta::FromBuffer(packet);
if (!packet_meta.ok()) {
bad_packets_.Increment();
internal::LogBadPacket();
return;
}
if (packet_meta->channel_id() > kMaxChannelId) {
overflow_channel_ids_.Increment();
internal::LogChannelIdOverflow(packet_meta->channel_id(),
kMaxChannelId);
return;
}
auto* egress = channel_egresses_[packet_meta->channel_id()];
if (egress == nullptr) {
missing_egresses_.Increment();
internal::LogMissingEgressForChannel(packet_meta->channel_id());
return;
}
const auto status = egress->SendRpcPacket(packet);
if (!status.ok()) {
egress_errors_.Increment();
internal::LogIngressSendFailure(packet_meta->channel_id(), status);
}
});
}
private:
std::array<RpcEgressHandler*, kMaxChannelId + 1> channel_egresses_{};
Decoder decoder_;
PW_METRIC_GROUP(metrics_, "pw_rpc_transport");
PW_METRIC(metrics_, bad_packets_, "bad_packets", 0u);
PW_METRIC(metrics_, overflow_channel_ids_, "overflow_channel_ids", 0u);
PW_METRIC(metrics_, missing_egresses_, "missing_egresses", 0u);
PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u);
};
template <size_t kMaxPacketSize>
using HdlcRpcEgress = RpcEgress<HdlcRpcPacketEncoder<kMaxPacketSize>>;
template <size_t kMaxPacketSize>
using HdlcRpcIngress = RpcIngress<HdlcRpcPacketDecoder<kMaxPacketSize>>;
template <size_t kMaxPacketSize>
using SimpleRpcEgress = RpcEgress<SimpleRpcPacketEncoder<kMaxPacketSize>>;
template <size_t kMaxPacketSize>
using SimpleRpcIngress = RpcIngress<SimpleRpcPacketDecoder<kMaxPacketSize>>;
} // namespace pw::rpc