blob: 5942304f4c28d3fb5d13925a90210227f8facd9d [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <limits>
#include <mutex>
#include "pw_bytes/span.h"
#include "pw_function/function.h"
#include "pw_multisink/config.h"
#include "pw_result/result.h"
#include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"
namespace pw {
namespace multisink {
// An asynchronous single-writer multi-reader queue that ensures readers can
// poll for dropped message counts, which is useful for logging or similar
// scenarios where readers need to be aware of the input message sequence.
// This class is thread-safe but NOT IRQ-safe when
class MultiSink {
// An asynchronous reader which is attached to a MultiSink via AttachDrain.
// Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away
// entry sequence information for clients when popping.
class Drain {
// Holds the context for a peeked entry, tha the user may pass to `PopEntry`
// to advance the drain.
class PeekedEntry {
// Provides access to the peeked entry's data.
ConstByteSpan entry() const { return entry_; }
friend MultiSink;
friend MultiSink::Drain;
constexpr PeekedEntry(ConstByteSpan entry, uint32_t sequence_id)
: entry_(entry), sequence_id_(sequence_id) {}
uint32_t sequence_id() const { return sequence_id_; }
const ConstByteSpan entry_;
const uint32_t sequence_id_;
constexpr Drain()
: last_handled_sequence_id_(0),
multisink_(nullptr) {}
// Returns the next available entry if it exists and acquires the latest
// drop count in parallel.
// If the read operation was successful or returned OutOfRange (i.e. no
// entries to read) then the `drop_count_out` is set to the number of
// entries that were dropped since the last call to PopEntry due to
// advancing the drain, and `ingress_drop_count_out` is set to the number of
// logs that were dropped before being added to the MultiSink. Otherwise,
// the drop counts are set to zero, so should always be processed.
// Drop counts are internally maintained with a 32-bit counter. If
// UINT32_MAX entries have been handled by the attached multisink between
// subsequent calls to PopEntry, the drop count will overflow and will
// report a lower count erroneously. Users should ensure that sinks call
// PopEntry at least once every UINT32_MAX entries.
// Example Usage:
// void ProcessEntriesFromDrain(Drain& drain) {
// std::array<std::byte, kEntryBufferSize> buffer;
// uint32_t drop_count = 0;
// // Example#1: Request the drain for a new entry.
// {
// const Result<ConstByteSpan> result = drain.PopEntry(buffer,
// drop_count);
// // If a non-zero drop count is received, process them.
// if (drop_count > 0) {
// ProcessDropCount(drop_count);
// }
// // If the call was successful, process the entry that was received.
// if (result.ok()) {
// ProcessEntry(result.value());
// }
// }
// // Example#2: Drain out all messages.
// {
// Result<ConstByteSpan> result = Status::OutOfRange();
// do {
// result = drain.PopEntry(buffer, drop_count);
// if (drop_count > 0) {
// ProcessDropCount(drop_count);
// }
// if (result.ok()) {
// ProcessEntry(result.value());
// }
// // Keep trying until we hit OutOfRange. Note that a new entry may
// // have arrived after the PopEntry call.
// } while (!result.IsOutOfRange());
// }
// }
// Precondition: the buffer data must not be corrupt, otherwise there will
// be a crash.
// Return values:
// OK - An entry was successfully read from the multisink.
// OUT_OF_RANGE - No entries were available.
// FAILED_PRECONDITION - The drain must be attached to a sink.
// RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
// the next available entry, which was discarded.
Result<ConstByteSpan> PopEntry(ByteSpan buffer,
uint32_t& drop_count_out,
uint32_t& ingress_drop_count)
// Overload that combines drop counts.
// TODO(cachinchilla): remove when downstream projects migrated to new API.
[[deprecated("Use PopEntry with different drop count outputs")]] Result<
PopEntry(ByteSpan buffer, uint32_t& drop_count_out)
PW_LOCKS_EXCLUDED(multisink_->lock_) {
uint32_t ingress_drop_count = 0;
Result<ConstByteSpan> result =
PopEntry(buffer, drop_count_out, ingress_drop_count);
drop_count_out += ingress_drop_count;
return result;
// Removes the previously peeked entry from the multisink.
// Example Usage:
// // Peek entry to send it, and remove entry from multisink on success.
// uint32_t drop_count;
// const Result<PeekedEntry> peek_result =
// PeekEntry(out_buffer, drop_count);
// if (!peek_result.ok()) {
// return peek_result.status();
// }
// Status send_status = UserSendFunction(peek_result.value().entry())
// if (!send_status.ok())
// return send_status;
// }
// PW_CHECK_OK(PopEntry(peek_result.value());
// Precondition: the buffer data must not be corrupt, otherwise there will
// be a crash.
// Return values:
// OK - the entry or entries were removed from the multisink succesfully.
// FAILED_PRECONDITION - The drain must be attached to a sink.
Status PopEntry(const PeekedEntry& entry)
// Returns a copy of the next available entry if it exists and acquires the
// latest drop count if the drain was advanced, and the latest ingress drop
// count, without moving the drain forward, except if there is a
// RESOURCE_EXHAUSTED error when peeking, in which case the drain is
// automatically advanced.
// The `drop_count_out` follows the same logic as `PopEntry`. The user must
// call `PopEntry` once the data in peek was used successfully.
// Precondition: the buffer data must not be corrupt, otherwise there will
// be a crash.
// Return values:
// OK - An entry was successfully read from the multisink.
// OUT_OF_RANGE - No entries were available.
// FAILED_PRECONDITION - The drain must be attached to a sink.
// RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
// the next available entry, which was discarded.
Result<PeekedEntry> PeekEntry(ByteSpan buffer,
uint32_t& drop_count_out,
uint32_t& ingress_drop_count)
// Drains are not copyable or movable.
Drain(const Drain&) = delete;
Drain& operator=(const Drain&) = delete;
Drain(Drain&&) = delete;
Drain& operator=(Drain&&) = delete;
friend MultiSink;
// The `reader_` and `last_handled_sequence_id_` are managed by attached
// multisink and are guarded by `multisink_->lock_` when used.
ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
uint32_t last_handled_sequence_id_;
uint32_t last_peek_sequence_id_;
uint32_t last_handled_ingress_drop_count_;
MultiSink* multisink_;
// A pure-virtual listener of a MultiSink, attached via AttachListener.
// MultiSink's invoke listeners when new data arrives, allowing them to
// schedule the draining of messages out of the MultiSink.
class Listener : public IntrusiveList<Listener>::Item {
constexpr Listener() {}
virtual ~Listener() = default;
// Listeners are not copyable or movable.
Listener(const Listener&) = delete;
Listener& operator=(const Drain&) = delete;
Listener(Listener&&) = delete;
Listener& operator=(Drain&&) = delete;
friend MultiSink;
// Invoked by the attached multisink when a new entry or drop count is
// available. The multisink lock is held during this call, so neither the
// multisink nor it's drains can be used during this callback.
virtual void OnNewEntryAvailable() = 0;
class iterator {
iterator& operator++() {
return *this;
iterator operator++(int) {
iterator original = *this;
return original;
ConstByteSpan& operator*() {
entry_ = (*it_).buffer;
return entry_;
ConstByteSpan* operator->() { return &operator*(); }
constexpr bool operator==(const iterator& rhs) const {
return it_ == rhs.it_;
constexpr bool operator!=(const iterator& rhs) const {
return it_ != rhs.it_;
// Returns the status of the last iteration operation. If the iterator
// fails to read an entry, it will move to iterator::end() and indicate
// the failure reason here.
// Return values:
// OK - iteration is successful and iterator points to the next entry.
// DATA_LOSS - Failed to read the metadata at this location.
Status status() const { return it_.status(); }
friend class MultiSink;
iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
: it_(reader) {}
iterator() {}
ring_buffer::PrefixedEntryRingBufferMulti::iterator it_;
ConstByteSpan entry_;
class UnsafeIterationWrapper {
using element_type = ConstByteSpan;
using value_type = std::remove_cv_t<ConstByteSpan>;
using pointer = ConstByteSpan*;
using reference = ConstByteSpan&;
using const_iterator = iterator; // Standard alias for iterable types.
iterator begin() const { return iterator(*reader_); }
iterator end() const { return iterator(); }
const_iterator cbegin() const { return begin(); }
const_iterator cend() const { return end(); }
friend class MultiSink;
ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
: reader_(&reader) {}
ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_;
UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS {
return UnsafeIterationWrapper(oldest_entry_drain_.reader_);
// Constructs a multisink using a ring buffer backed by the provided buffer.
MultiSink(ByteSpan buffer)
: ring_buffer_(true), sequence_id_(0), total_ingress_drops_(0) {
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
// Write an entry to the multisink. If available space is less than the
// size of the entry, the internal ring buffer will push the oldest entries
// out to make space, so long as the entry is not larger than the buffer.
// The sequence ID of the multisink will always increment as a result of
// calling HandleEntry, regardless of whether pushing the entry succeeds.
// Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
// function must not be called from an interrupt context.
// Precondition: entry.size() <= `ring_buffer_` size
void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_);
// Notifies the multisink of messages dropped before ingress. The writer
// may use this to signal to readers that an entry (or entries) failed
// before being sent to the multisink (e.g. the writer failed to encode
// the message). This API increments the sequence ID of the multisink by
// the provided `drop_count`.
void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_);
// Attach a drain to the multisink. Drains may not be associated with more
// than one multisink at a time. Drains can consume entries pushed before
// the drain was attached, so long as they have not yet been evicted from
// the underlying ring buffer.
// Precondition: The drain must not be attached to a multisink.
void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
// Detaches a drain from the multisink. Drains may only be detached if they
// were previously attached to this multisink.
// Precondition: The drain must be attached to this multisink.
void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
// Attach a listener to the multisink. The listener will be notified
// immediately when attached, to allow late drain users to consume existing
// entries. If draining in response to the notification, ensure that the drain
// is attached prior to registering the listener; attempting to drain when
// unattached will crash. Once attached, listeners are invoked on all new
// messages.
// Precondition: The listener must not be attached to a multisink.
void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
// Detaches a listener from the multisink.
// Precondition: The listener must be attached to this multisink.
void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
// Removes all data from the internal buffer. The multisink's sequence ID is
// not modified, so readers may interpret this event as droppping entries.
void Clear() PW_LOCKS_EXCLUDED(lock_);
// Uses MultiSink's unsafe iteration to dump the contents to a user-provided
// callback. max_num_entries can be used to limit the dump to the N most
// recent entries.
// Returns:
// OK - Successfully dumped entire multisink.
// DATA_LOSS - Corruption detected, some entries may have been lost.
Status UnsafeForEachEntry(
const Function<void(ConstByteSpan)>& callback,
size_t max_num_entries = std::numeric_limits<size_t>::max());
friend Drain;
enum class Request { kPop, kPeek };
// Removes the previously peeked entry from the front of the multisink.
Status PopEntry(Drain& drain, const Drain::PeekedEntry& entry)
// Gets a copy of the entry from the provided drain and unpacks sequence ID
// information. The entry is removed from the multisink when `request` is set
// to `Request::kPop`. Drains use this API to strip away sequence ID
// information for drop calculation.
// Precondition: the buffer data must not be corrupt, otherwise there will
// be a crash.
// Returns:
// OK - An entry was successfully read from the multisink. The
// `drop_count_out` is set to the difference between the current sequence ID
// and the last handled ID.
// FAILED_PRECONDITION - The drain is not attached to
// a multisink.
// RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
// the next available entry, which was discarded.
Result<ConstByteSpan> PeekOrPopEntry(Drain& drain,
ByteSpan buffer,
Request request,
uint32_t& drop_count_out,
uint32_t& ingress_drop_count_out,
uint32_t& entry_sequence_id_out)
// Notifies attached listeners of new entries or an updated drop count.
void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_);
ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
Drain oldest_entry_drain_ PW_GUARDED_BY(lock_);
uint32_t sequence_id_ PW_GUARDED_BY(lock_);
uint32_t total_ingress_drops_ PW_GUARDED_BY(lock_);
LockType lock_;
} // namespace multisink
} // namespace pw