pw_ring_buffer: Support multi-reader ring buffer

Add support for single-writer multi-reader ring buffers.

Change-Id: I57510836639c36d010612850fd55047c5766f80e
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/32321
Pigweed-Auto-Submit: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Armando Montanez <amontanez@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_ring_buffer/BUILD.gn b/pw_ring_buffer/BUILD.gn
index 2fd0f1e..f5ac741 100644
--- a/pw_ring_buffer/BUILD.gn
+++ b/pw_ring_buffer/BUILD.gn
@@ -30,7 +30,10 @@
   ]
   sources = [ "prefixed_entry_ring_buffer.cc" ]
   public = [ "public/pw_ring_buffer/prefixed_entry_ring_buffer.h" ]
-  deps = [ "$dir_pw_varint" ]
+  deps = [
+    "$dir_pw_assert:pw_assert",
+    "$dir_pw_varint",
+  ]
 }
 
 pw_test_group("tests") {
diff --git a/pw_ring_buffer/prefixed_entry_ring_buffer.cc b/pw_ring_buffer/prefixed_entry_ring_buffer.cc
index 82f76d0..400e7d0 100644
--- a/pw_ring_buffer/prefixed_entry_ring_buffer.cc
+++ b/pw_ring_buffer/prefixed_entry_ring_buffer.cc
@@ -14,22 +14,27 @@
 
 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
 
+#include <algorithm>
 #include <cstring>
 
+#include "pw_assert/light.h"
 #include "pw_varint/varint.h"
 
 namespace pw {
 namespace ring_buffer {
 
 using std::byte;
+using Reader = PrefixedEntryRingBufferMulti::Reader;
 
-void PrefixedEntryRingBuffer::Clear() {
-  read_idx_ = 0;
+void PrefixedEntryRingBufferMulti::Clear() {
   write_idx_ = 0;
-  entry_count_ = 0;
+  for (Reader& reader : readers_) {
+    reader.read_idx = 0;
+    reader.entry_count = 0;
+  }
 }
 
-Status PrefixedEntryRingBuffer::SetBuffer(std::span<byte> buffer) {
+Status PrefixedEntryRingBufferMulti::SetBuffer(std::span<byte> buffer) {
   if ((buffer.data() == nullptr) ||  //
       (buffer.size_bytes() == 0) ||  //
       (buffer.size_bytes() > kMaxBufferBytes)) {
@@ -43,9 +48,35 @@
   return OkStatus();
 }
 
-Status PrefixedEntryRingBuffer::InternalPushBack(std::span<const byte> data,
-                                                 byte user_preamble_data,
-                                                 bool drop_elements_if_needed) {
+Status PrefixedEntryRingBufferMulti::AttachReader(Reader& reader) {
+  if (reader.buffer != nullptr) {
+    return Status::InvalidArgument();
+  }
+  reader.buffer = this;
+
+  // Note that a newly attached reader sees the buffer as empty,
+  // and is not privy to entries pushed before being attached.
+  reader.read_idx = write_idx_;
+  reader.entry_count = 0;
+  readers_.push_back(reader);
+  return OkStatus();
+}
+
+Status PrefixedEntryRingBufferMulti::DetachReader(Reader& reader) {
+  if (reader.buffer != this) {
+    return Status::InvalidArgument();
+  }
+  reader.buffer = nullptr;
+  reader.read_idx = 0;
+  reader.entry_count = 0;
+  readers_.remove(reader);
+  return OkStatus();
+}
+
+Status PrefixedEntryRingBufferMulti::InternalPushBack(
+    std::span<const byte> data,
+    byte user_preamble_data,
+    bool drop_elements_if_needed) {
   if (buffer_ == nullptr) {
     return Status::FailedPrecondition();
   }
@@ -66,7 +97,7 @@
     // PushBack() case: evict items as needed.
     // Drop old entries until we have space for the new entry.
     while (RawAvailableBytes() < total_write_bytes) {
-      PopFront();
+      InternalPopFrontAll();
     }
   } else if (RawAvailableBytes() < total_write_bytes) {
     // TryPushBack() case: don't evict items.
@@ -79,7 +110,11 @@
   }
   RawWrite(std::span(varint_buf, varint_bytes));
   RawWrite(data);
-  entry_count_++;
+
+  // Update all readers of the new count.
+  for (Reader& reader : readers_) {
+    reader.entry_count++;
+  }
   return OkStatus();
 }
 
@@ -95,40 +130,45 @@
   };
 }
 
-Status PrefixedEntryRingBuffer::PeekFront(std::span<byte> data,
-                                          size_t* bytes_read) {
+Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
+                                                       std::span<byte> data,
+                                                       size_t* bytes_read) {
   *bytes_read = 0;
-  return InternalRead(GetOutput(data, bytes_read), false);
+  return InternalRead(reader, GetOutput(data, bytes_read), false);
 }
 
-Status PrefixedEntryRingBuffer::PeekFront(ReadOutput output) {
-  return InternalRead(output, false);
+Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
+                                                       ReadOutput output) {
+  return InternalRead(reader, output, false);
 }
 
-Status PrefixedEntryRingBuffer::PeekFrontWithPreamble(std::span<byte> data,
-                                                      size_t* bytes_read) {
+Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
+    Reader& reader, std::span<byte> data, size_t* bytes_read) {
   *bytes_read = 0;
-  return InternalRead(GetOutput(data, bytes_read), true);
+  return InternalRead(reader, GetOutput(data, bytes_read), true);
 }
 
-Status PrefixedEntryRingBuffer::PeekFrontWithPreamble(ReadOutput output) {
-  return InternalRead(output, true);
+Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
+    Reader& reader, ReadOutput output) {
+  return InternalRead(reader, output, true);
 }
 
 // T should be similar to Status (*read_output)(std::span<const byte>)
 template <typename T>
-Status PrefixedEntryRingBuffer::InternalRead(T read_output, bool get_preamble) {
+Status PrefixedEntryRingBufferMulti::InternalRead(Reader& reader,
+                                                  T read_output,
+                                                  bool get_preamble) {
   if (buffer_ == nullptr) {
     return Status::FailedPrecondition();
   }
-  if (EntryCount() == 0) {
+  if (reader.entry_count == 0) {
     return Status::OutOfRange();
   }
 
   // Figure out where to start reading (wrapped); accounting for preamble.
-  EntryInfo info = FrontEntryInfo();
+  EntryInfo info = FrontEntryInfo(reader);
   size_t read_bytes = info.data_bytes;
-  size_t data_read_idx = read_idx_;
+  size_t data_read_idx = reader.read_idx;
   if (get_preamble) {
     read_bytes += info.preamble_bytes;
   } else {
@@ -148,67 +188,133 @@
   return status;
 }
 
-Status PrefixedEntryRingBuffer::PopFront() {
-  if (buffer_ == nullptr) {
-    return Status::FailedPrecondition();
+void PrefixedEntryRingBufferMulti::InternalPopFrontAll() {
+  // Forcefully pop all readers. Find the slowest reader, which must have
+  // the highest entry count, then pop all readers that have the same count.
+  size_t entry_count = GetSlowestReader().entry_count;
+  // If no readers have any entries left to read, return immediately.
+  PW_DASSERT(entry_count != 0);
+  // Otherwise, pop the readers that have the largest value.
+  for (Reader& reader : readers_) {
+    if (reader.entry_count == entry_count) {
+      reader.PopFront();
+    }
   }
-  if (EntryCount() == 0) {
-    return Status::OutOfRange();
-  }
-
-  // Advance the read pointer past the front entry to the next one.
-  EntryInfo info = FrontEntryInfo();
-  size_t entry_bytes = info.preamble_bytes + info.data_bytes;
-  read_idx_ = IncrementIndex(read_idx_, entry_bytes);
-  entry_count_--;
-  return OkStatus();
 }
 
-Status PrefixedEntryRingBuffer::Dering() {
-  if (buffer_ == nullptr) {
+Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() {
+  // Readers are guaranteed to be before the writer pointer (the class enforces
+  // this on every read/write operation that forces the write pointer ahead of
+  // an existing reader). To determine the slowest reader, we consider three
+  // scenarios:
+  //
+  // In all below cases, R1 is the slowest reader:
+  // [[R1 R2 R3 WH]] => Right-hand writer, slowest reader is left-most reader.
+  // [[WH R1 R2 R3]] => Left-hand writer, slowest reader is left-most reader.
+  // [[R3 WH R1 R2]] => Middle-writer, slowest reader is left-most reader after
+  // writer.
+  //
+  // Formally, choose the left-most reader after the writer (ex.2,3), but if
+  // that doesn't exist, choose the left-most reader before the writer (ex.1).
+  PW_DASSERT(readers_.size() > 0);
+  Reader* slowest_reader_after_writer = nullptr;
+  Reader* slowest_reader_before_writer = nullptr;
+  for (Reader& reader : readers_) {
+    if (reader.read_idx < write_idx_) {
+      if (!slowest_reader_before_writer ||
+          reader.read_idx < slowest_reader_before_writer->read_idx) {
+        slowest_reader_before_writer = &reader;
+      }
+    } else {
+      if (!slowest_reader_after_writer ||
+          reader.read_idx < slowest_reader_after_writer->read_idx) {
+        slowest_reader_after_writer = &reader;
+      }
+    }
+  }
+  return *(slowest_reader_after_writer ? slowest_reader_after_writer
+                                       : slowest_reader_before_writer);
+}
+
+Status PrefixedEntryRingBufferMulti::Dering() {
+  if (buffer_ == nullptr || readers_.size() == 0) {
     return Status::FailedPrecondition();
   }
+
   // Check if by luck we're already deringed.
-  if (read_idx_ == 0) {
+  Reader* slowest_reader = &GetSlowestReader();
+  if (slowest_reader->read_idx == 0) {
     return OkStatus();
   }
 
   auto buffer_span = std::span(buffer_, buffer_bytes_);
-  std::rotate(
-      buffer_span.begin(), buffer_span.begin() + read_idx_, buffer_span.end());
+  std::rotate(buffer_span.begin(),
+              buffer_span.begin() + slowest_reader->read_idx,
+              buffer_span.end());
 
   // If the new index is past the end of the buffer,
   // alias it back (wrap) to the start of the buffer.
-  if (write_idx_ < read_idx_) {
+  if (write_idx_ < slowest_reader->read_idx) {
     write_idx_ += buffer_bytes_;
   }
-  write_idx_ -= read_idx_;
-  read_idx_ = 0;
+  write_idx_ -= slowest_reader->read_idx;
+
+  for (Reader& reader : readers_) {
+    if (&reader == slowest_reader) {
+      continue;
+    }
+    if (reader.read_idx < slowest_reader->read_idx) {
+      reader.read_idx += buffer_bytes_;
+    }
+    reader.read_idx -= slowest_reader->read_idx;
+  }
+
+  slowest_reader->read_idx = 0;
   return OkStatus();
 }
 
-size_t PrefixedEntryRingBuffer::FrontEntryDataSizeBytes() {
-  if (EntryCount() == 0) {
-    return 0;
+Status PrefixedEntryRingBufferMulti::InternalPopFront(Reader& reader) {
+  if (buffer_ == nullptr) {
+    return Status::FailedPrecondition();
   }
-  return FrontEntryInfo().data_bytes;
+  if (reader.entry_count == 0) {
+    return Status::OutOfRange();
+  }
+
+  // Advance the read pointer past the front entry to the next one.
+  EntryInfo info = FrontEntryInfo(reader);
+  size_t entry_bytes = info.preamble_bytes + info.data_bytes;
+  size_t prev_read_idx = reader.read_idx;
+  reader.read_idx = IncrementIndex(prev_read_idx, entry_bytes);
+  reader.entry_count--;
+  return OkStatus();
 }
 
-size_t PrefixedEntryRingBuffer::FrontEntryTotalSizeBytes() {
-  if (EntryCount() == 0) {
+size_t PrefixedEntryRingBufferMulti::InternalFrontEntryDataSizeBytes(
+    Reader& reader) {
+  if (reader.entry_count == 0) {
     return 0;
   }
-  EntryInfo info = FrontEntryInfo();
+  return FrontEntryInfo(reader).data_bytes;
+}
+
+size_t PrefixedEntryRingBufferMulti::InternalFrontEntryTotalSizeBytes(
+    Reader& reader) {
+  if (reader.entry_count == 0) {
+    return 0;
+  }
+  EntryInfo info = FrontEntryInfo(reader);
   return info.preamble_bytes + info.data_bytes;
 }
 
-PrefixedEntryRingBuffer::EntryInfo PrefixedEntryRingBuffer::FrontEntryInfo() {
+PrefixedEntryRingBufferMulti::EntryInfo
+PrefixedEntryRingBufferMulti::FrontEntryInfo(Reader& reader) {
   // Entry headers consists of: (optional prefix byte, varint size, data...)
 
   // Read the entry header; extract the varint and it's size in bytes.
   byte varint_buf[kMaxEntryPreambleBytes];
   RawRead(varint_buf,
-          IncrementIndex(read_idx_, user_preamble_ ? 1 : 0),
+          IncrementIndex(reader.read_idx, user_preamble_ ? 1 : 0),
           kMaxEntryPreambleBytes);
   uint64_t entry_size;
   size_t varint_size = varint::Decode(varint_buf, &entry_size);
@@ -221,20 +327,33 @@
 
 // Comparisons ordered for more probable early exits, assuming the reader is
 // not far behind the writer compared to the size of the ring.
-size_t PrefixedEntryRingBuffer::RawAvailableBytes() {
+size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() {
+  // Compute slowest reader.
+  // TODO: Alternatively, the slowest reader could be actively mantained on
+  // every read operation, but reads are more likely than writes.
+  if (readers_.size() == 0) {
+    return buffer_bytes_;
+  }
+
+  size_t read_idx = GetSlowestReader().read_idx;
   // Case: Not wrapped.
-  if (read_idx_ < write_idx_) {
-    return buffer_bytes_ - (write_idx_ - read_idx_);
+  if (read_idx < write_idx_) {
+    return buffer_bytes_ - (write_idx_ - read_idx);
   }
   // Case: Wrapped
-  if (read_idx_ > write_idx_) {
-    return read_idx_ - write_idx_;
+  if (read_idx > write_idx_) {
+    return read_idx - write_idx_;
   }
   // Case: Matched read and write heads; empty or full.
-  return entry_count_ ? 0 : buffer_bytes_;
+  for (Reader& reader : readers_) {
+    if (reader.read_idx == read_idx && reader.entry_count != 0) {
+      return 0;
+    }
+  }
+  return buffer_bytes_;
 }
 
-void PrefixedEntryRingBuffer::RawWrite(std::span<const std::byte> source) {
+void PrefixedEntryRingBufferMulti::RawWrite(std::span<const std::byte> source) {
   // Write until the end of the source or the backing buffer.
   size_t bytes_until_wrap = buffer_bytes_ - write_idx_;
   size_t bytes_to_copy = std::min(source.size(), bytes_until_wrap);
@@ -248,9 +367,9 @@
   write_idx_ = IncrementIndex(write_idx_, source.size());
 }
 
-void PrefixedEntryRingBuffer::RawRead(byte* destination,
-                                      size_t source_idx,
-                                      size_t length_bytes) {
+void PrefixedEntryRingBufferMulti::RawRead(byte* destination,
+                                           size_t source_idx,
+                                           size_t length_bytes) {
   // Read the pre-wrap bytes.
   size_t bytes_until_wrap = buffer_bytes_ - source_idx;
   size_t bytes_to_copy = std::min(length_bytes, bytes_until_wrap);
@@ -262,7 +381,8 @@
   }
 }
 
-size_t PrefixedEntryRingBuffer::IncrementIndex(size_t index, size_t count) {
+size_t PrefixedEntryRingBufferMulti::IncrementIndex(size_t index,
+                                                    size_t count) {
   // Note: This doesn't use modulus (%) since the branch is cheaper, and we
   // guarantee that count will never be greater than buffer_bytes_.
   index += count;
diff --git a/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc b/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc
index 18382d3..9b3f15f 100644
--- a/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc
+++ b/pw_ring_buffer/prefixed_entry_ring_buffer_test.cc
@@ -370,7 +370,7 @@
 TEST(PrefixedEntryRingBuffer, DeringNoPreload) { DeringTest(false); }
 
 template <typename T>
-Status PushBack(PrefixedEntryRingBuffer& ring, T element) {
+Status PushBack(PrefixedEntryRingBufferMulti& ring, T element) {
   union {
     std::array<byte, sizeof(element)> buffer;
     T item;
@@ -380,7 +380,7 @@
 }
 
 template <typename T>
-Status TryPushBack(PrefixedEntryRingBuffer& ring, T element) {
+Status TryPushBack(PrefixedEntryRingBufferMulti& ring, T element) {
   union {
     std::array<byte, sizeof(element)> buffer;
     T item;
@@ -390,13 +390,13 @@
 }
 
 template <typename T>
-T PeekFront(PrefixedEntryRingBuffer& ring) {
+T PeekFront(PrefixedEntryRingBufferMulti::Reader& reader) {
   union {
     std::array<byte, sizeof(T)> buffer;
     T item;
   } aliased;
   size_t bytes_read = 0;
-  PW_CHECK_OK(ring.PeekFront(aliased.buffer, &bytes_read));
+  PW_CHECK_OK(reader.PeekFront(aliased.buffer, &bytes_read));
   PW_CHECK_INT_EQ(bytes_read, sizeof(T));
   return aliased.item;
 }
@@ -432,6 +432,160 @@
   EXPECT_EQ(PeekFront<int>(ring), 100);
 }
 
+TEST(PrefixedEntryRingBufferMulti, TryPushBack) {
+  PrefixedEntryRingBufferMulti ring;
+  byte test_buffer[kTestBufferSize];
+  EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+  PrefixedEntryRingBufferMulti::Reader fast_reader;
+  PrefixedEntryRingBufferMulti::Reader slow_reader;
+
+  EXPECT_EQ(ring.AttachReader(fast_reader), OkStatus());
+  EXPECT_EQ(ring.AttachReader(slow_reader), OkStatus());
+
+  // Fill up the ring buffer with an increasing count.
+  int total_items = 0;
+  while (true) {
+    Status status = TryPushBack<int>(ring, total_items);
+    if (status.ok()) {
+      total_items++;
+    } else {
+      EXPECT_EQ(status, Status::ResourceExhausted());
+      break;
+    }
+  }
+
+  // Run fast reader twice as fast as the slow reader.
+  for (int i = 0; i < total_items; ++i) {
+    if (i % 2 == 0) {
+      EXPECT_EQ(PeekFront<int>(slow_reader), i / 2);
+      EXPECT_EQ(slow_reader.PopFront(), OkStatus());
+    }
+    EXPECT_EQ(PeekFront<int>(fast_reader), i);
+    EXPECT_EQ(fast_reader.PopFront(), OkStatus());
+  }
+  EXPECT_EQ(fast_reader.PopFront(), Status::OutOfRange());
+
+  // Fill the buffer again, expect that the fast reader
+  // only sees half the entries as the slow reader.
+  size_t max_items = total_items;
+  while (true) {
+    Status status = TryPushBack<int>(ring, total_items);
+    if (status.ok()) {
+      total_items++;
+    } else {
+      EXPECT_EQ(status, Status::ResourceExhausted());
+      break;
+    }
+  }
+  EXPECT_EQ(slow_reader.EntryCount(), max_items);
+  EXPECT_EQ(fast_reader.EntryCount(), total_items - max_items);
+
+  for (int i = total_items - max_items; i < total_items; ++i) {
+    EXPECT_EQ(PeekFront<int>(slow_reader), i);
+    EXPECT_EQ(slow_reader.PopFront(), OkStatus());
+    if (static_cast<size_t>(i) >= max_items) {
+      EXPECT_EQ(PeekFront<int>(fast_reader), i);
+      EXPECT_EQ(fast_reader.PopFront(), OkStatus());
+    }
+  }
+  EXPECT_EQ(slow_reader.PopFront(), Status::OutOfRange());
+  EXPECT_EQ(fast_reader.PopFront(), Status::OutOfRange());
+}
+
+TEST(PrefixedEntryRingBufferMulti, PushBack) {
+  PrefixedEntryRingBufferMulti ring;
+  byte test_buffer[kTestBufferSize];
+  EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+  PrefixedEntryRingBufferMulti::Reader fast_reader;
+  PrefixedEntryRingBufferMulti::Reader slow_reader;
+
+  EXPECT_EQ(ring.AttachReader(fast_reader), OkStatus());
+  EXPECT_EQ(ring.AttachReader(slow_reader), OkStatus());
+
+  // Fill up the ring buffer with an increasing count.
+  size_t total_items = 0;
+  while (true) {
+    Status status = TryPushBack<uint32_t>(ring, total_items);
+    if (status.ok()) {
+      total_items++;
+    } else {
+      EXPECT_EQ(status, Status::ResourceExhausted());
+      break;
+    }
+  }
+  EXPECT_EQ(slow_reader.EntryCount(), total_items);
+
+  // The following test:
+  //  - Moves the fast reader forward by one entry.
+  //  - Writes a single entry that is guaranteed to be larger than the size of a
+  //    single entry in the buffer (uint64_t entry > uint32_t entry).
+  //  - Checks to see that both readers were moved forward.
+  EXPECT_EQ(fast_reader.PopFront(), OkStatus());
+  EXPECT_EQ(PushBack<uint64_t>(ring, 5u), OkStatus());
+  // The readers have moved past values 0 and 1.
+  EXPECT_EQ(PeekFront<uint32_t>(slow_reader), 2u);
+  EXPECT_EQ(PeekFront<uint32_t>(fast_reader), 2u);
+  // The readers have lost two entries, but gained an entry.
+  EXPECT_EQ(slow_reader.EntryCount(), total_items - 1);
+  EXPECT_EQ(fast_reader.EntryCount(), total_items - 1);
+}
+
+TEST(PrefixedEntryRingBufferMulti, ReaderAddRemove) {
+  PrefixedEntryRingBufferMulti ring;
+  byte test_buffer[kTestBufferSize];
+  EXPECT_EQ(ring.SetBuffer(test_buffer), OkStatus());
+
+  PrefixedEntryRingBufferMulti::Reader reader;
+  PrefixedEntryRingBufferMulti::Reader transient_reader;
+
+  EXPECT_EQ(ring.AttachReader(reader), OkStatus());
+
+  // Fill up the ring buffer with a constant value.
+  int total_items = 0;
+  while (true) {
+    Status status = TryPushBack<int>(ring, 5);
+    if (status.ok()) {
+      total_items++;
+    } else {
+      EXPECT_EQ(status, Status::ResourceExhausted());
+      break;
+    }
+  }
+  EXPECT_EQ(reader.EntryCount(), static_cast<size_t>(total_items));
+
+  // Add new reader after filling the buffer.
+  EXPECT_EQ(ring.AttachReader(transient_reader), OkStatus());
+  EXPECT_EQ(transient_reader.EntryCount(), 0u);
+
+  // Push a value into the buffer and confirm the transient reader
+  // sees that value, and only that value.
+  EXPECT_EQ(PushBack<int>(ring, 1), OkStatus());
+  EXPECT_EQ(PeekFront<int>(transient_reader), 1);
+  EXPECT_EQ(transient_reader.EntryCount(), 1u);
+
+  // Confirm that detaching and attaching a reader resets its state.
+  EXPECT_EQ(ring.DetachReader(transient_reader), OkStatus());
+  EXPECT_EQ(ring.AttachReader(transient_reader), OkStatus());
+  EXPECT_EQ(transient_reader.EntryCount(), 0u);
+}
+
+TEST(PrefixedEntryRingBufferMulti, SingleBufferPerReader) {
+  PrefixedEntryRingBufferMulti ring_one;
+  PrefixedEntryRingBufferMulti ring_two;
+  byte test_buffer[kTestBufferSize];
+  EXPECT_EQ(ring_one.SetBuffer(test_buffer), OkStatus());
+
+  PrefixedEntryRingBufferMulti::Reader reader;
+  EXPECT_EQ(ring_one.AttachReader(reader), OkStatus());
+  EXPECT_EQ(ring_two.AttachReader(reader), Status::InvalidArgument());
+
+  EXPECT_EQ(ring_one.DetachReader(reader), OkStatus());
+  EXPECT_EQ(ring_two.AttachReader(reader), OkStatus());
+  EXPECT_EQ(ring_one.AttachReader(reader), Status::InvalidArgument());
+}
+
 }  // namespace
 }  // namespace ring_buffer
 }  // namespace pw
diff --git a/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h b/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h
index e164834..a627690 100644
--- a/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h
+++ b/pw_ring_buffer/public/pw_ring_buffer/prefixed_entry_ring_buffer.h
@@ -16,6 +16,7 @@
 #include <cstddef>
 #include <span>
 
+#include "pw_containers/intrusive_list.h"
 #include "pw_status/status.h"
 
 namespace pw {
@@ -25,21 +26,114 @@
 // produces a buffer entry. Each entry consists of a preamble followed by an
 // arbitrary length data chunk. The preamble is comprised of an optional user
 // preamble byte and an always present varint. The varint encodes the number of
-// bytes in the data chunk.
+// bytes in the data chunk. This is a FIFO queue, with the oldest entries at
+// the 'front' (to be processed by readers) and the newest entries at the 'back'
+// (where the writer pushes to).
 //
-// The ring buffer holds the most recent entries stored in the buffer. Once
-// filled to capacity, incoming entries bump out the oldest entries to make
-// room. Entries are internally wrapped around as needed.
-class PrefixedEntryRingBuffer {
+// The ring buffer supports multiple readers, which can be attached/detached
+// from the buffer. Each reader has its own read pointer and can peek and pop
+// the entry at the head. Entries are not bumped out from the buffer until all
+// readers have moved past that entry, or if the buffer is at capacity and space
+// is needed to push a new entry. When making space, the buffer will push slow
+// readers forward to the new oldest entry. Entries are internally wrapped
+// around as needed.
+class PrefixedEntryRingBufferMulti {
  public:
   typedef Status (*ReadOutput)(std::span<const std::byte>);
 
-  PrefixedEntryRingBuffer(bool user_preamble = false)
+  // A reader that provides a single-reader interface into the multi-reader ring
+  // buffer it has been attached to via AttachReader(). Readers maintain their
+  // read position in the ring buffer as well as the remaining count of entries
+  // from that position. Readers are only able to consume entries that were
+  // pushed after the attach operation.
+  //
+  // Readers can peek and pop entries similar to the single-reader interface.
+  // When popping entries, although the reader moves forward and drops the
+  // entry, the entry is not removed from the ring buffer until all other
+  // attached readers have moved past that entry.
+  //
+  // When the attached ring buffer needs to make space, it may push the reader
+  // index forward. Users of this class should consider the possibility of data
+  // loss if they read slower than the writer.
+  class Reader : public IntrusiveList<Reader>::Item {
+   public:
+    Reader() : buffer(nullptr), read_idx(0), entry_count(0) {}
+
+    // TODO: Add locking to the internal functions. Who owns the lock? This
+    // class? Does this class need a lock if it's not a multi-reader? (One
+    // doesn't exist today but presumably nothing prevents push + pop operations
+    // from happening on two different threads).
+
+    // Read the oldest stored data chunk of data from the ring buffer to
+    // the provided destination std::span. The number of bytes read is written
+    // to bytes_read
+    //
+    // Return values:
+    // OK - Data successfully read from the ring buffer.
+    // FAILED_PRECONDITION - Buffer not initialized.
+    // OUT_OF_RANGE - No entries in ring buffer to read.
+    // RESOURCE_EXHAUSTED - Destination data std::span was smaller number of
+    // bytes than the data size of the data chunk being read.  Available
+    // destination bytes were filled, remaining bytes of the data chunk were
+    // ignored.
+    Status PeekFront(std::span<std::byte> data, size_t* bytes_read) {
+      return buffer->InternalPeekFront(*this, data, bytes_read);
+    }
+
+    Status PeekFront(ReadOutput output) {
+      return buffer->InternalPeekFront(*this, output);
+    }
+
+    // Same as PeekFront but includes the entry's preamble of optional user
+    // value and the varint of the data size.
+    Status PeekFrontWithPreamble(std::span<std::byte> data,
+                                 size_t* bytes_read) {
+      return buffer->InternalPeekFrontWithPreamble(*this, data, bytes_read);
+    }
+
+    Status PeekFrontWithPreamble(ReadOutput output) {
+      return buffer->InternalPeekFrontWithPreamble(*this, output);
+    }
+
+    // Pop and discard the oldest stored data chunk of data from the ring
+    // buffer.
+    //
+    // Return values:
+    // OK - Data successfully read from the ring buffer.
+    // FAILED_PRECONDITION - Buffer not initialized.
+    // OUT_OF_RANGE - No entries in ring buffer to pop.
+    Status PopFront() { return buffer->InternalPopFront(*this); }
+
+    // Get the size in bytes of the next chunk, not including preamble, to be
+    // read.
+    size_t FrontEntryDataSizeBytes() {
+      return buffer->InternalFrontEntryDataSizeBytes(*this);
+    }
+
+    // Get the size in bytes of the next chunk, including preamble and data
+    // chunk, to be read.
+    size_t FrontEntryTotalSizeBytes() {
+      return buffer->InternalFrontEntryTotalSizeBytes(*this);
+    }
+
+    // Get the number of variable-length entries currently in the ring buffer.
+    //
+    // Return value:
+    // Entry count.
+    size_t EntryCount() { return entry_count; }
+
+   protected:
+    friend PrefixedEntryRingBufferMulti;
+
+    PrefixedEntryRingBufferMulti* buffer;
+    size_t read_idx;
+    size_t entry_count;
+  };
+
+  PrefixedEntryRingBufferMulti(bool user_preamble = false)
       : buffer_(nullptr),
         buffer_bytes_(0),
         write_idx_(0),
-        read_idx_(0),
-        entry_count_(0),
         user_preamble_(user_preamble) {}
 
   // Set the raw buffer to be used by the ring buffer.
@@ -49,6 +143,23 @@
   // INVALID_ARGUMENT - Argument was nullptr, size zero, or too large.
   Status SetBuffer(std::span<std::byte> buffer);
 
+  // Attach reader to the ring buffer. Readers can only be attached to one
+  // ring buffer at a time.
+  //
+  // Return values:
+  // OK - Successfully configured reader for ring buffer.
+  // INVALID_ARGUMENT - Argument was already attached to another ring buffer.
+  Status AttachReader(Reader& reader);
+
+  // Detach reader from the ring buffer. Readers can only be detached if they
+  // were previously attached.
+  //
+  // Return values:
+  // OK - Successfully removed reader for ring buffer.
+  // INVALID_ARGUMENT - Argument was not previously attached to this ring
+  // buffer.
+  Status DetachReader(Reader& reader);
+
   // Removes all data from the ring buffer.
   void Clear();
 
@@ -88,6 +199,20 @@
     return InternalPushBack(data, user_preamble_data, false);
   }
 
+  // Get the size in bytes of all the current entries in the ring buffer,
+  // including preamble and data chunk.
+  size_t TotalUsedBytes() { return buffer_bytes_ - RawAvailableBytes(); }
+
+  // Dering the buffer by reordering entries internally in the buffer by
+  // rotating to have the oldest entry is at the lowest address/index with
+  // newest entry at the highest address.
+  //
+  // Return values:
+  // OK - Buffer data successfully deringed.
+  // FAILED_PRECONDITION - Buffer not initialized, or no readers attached.
+  Status Dering();
+
+ protected:
   // Read the oldest stored data chunk of data from the ring buffer to
   // the provided destination std::span. The number of bytes read is written to
   // bytes_read
@@ -99,15 +224,17 @@
   // RESOURCE_EXHAUSTED - Destination data std::span was smaller number of bytes
   // than the data size of the data chunk being read.  Available destination
   // bytes were filled, remaining bytes of the data chunk were ignored.
-  Status PeekFront(std::span<std::byte> data, size_t* bytes_read);
-
-  Status PeekFront(ReadOutput output);
+  Status InternalPeekFront(Reader& reader,
+                           std::span<std::byte> data,
+                           size_t* bytes_read);
+  Status InternalPeekFront(Reader& reader, ReadOutput output);
 
   // Same as Read but includes the entry's preamble of optional user value and
   // the varint of the data size
-  Status PeekFrontWithPreamble(std::span<std::byte> data, size_t* bytes_read);
-
-  Status PeekFrontWithPreamble(ReadOutput output);
+  Status InternalPeekFrontWithPreamble(Reader& reader,
+                                       std::span<std::byte> data,
+                                       size_t* bytes_read);
+  Status InternalPeekFrontWithPreamble(Reader& reader, ReadOutput output);
 
   // Pop and discard the oldest stored data chunk of data from the ring buffer.
   //
@@ -115,34 +242,20 @@
   // OK - Data successfully read from the ring buffer.
   // FAILED_PRECONDITION - Buffer not initialized.
   // OUT_OF_RANGE - No entries in ring buffer to pop.
-  Status PopFront();
-
-  // Dering the buffer by reordering entries internally in the buffer by
-  // rotating to have the oldest entry is at the lowest address/index with
-  // newest entry at the highest address.
-  //
-  // Return values:
-  // OK - Buffer data successfully deringed.
-  // FAILED_PRECONDITION - Buffer not initialized.
-  Status Dering();
-
-  // Get the number of variable-length entries currently in the ring buffer.
-  //
-  // Return value:
-  // Entry count.
-  size_t EntryCount() { return entry_count_; }
-
-  // Get the size in bytes of all the current entries in the ring buffer,
-  // including preamble and data chunk.
-  size_t TotalUsedBytes() { return buffer_bytes_ - RawAvailableBytes(); }
+  Status InternalPopFront(Reader& reader);
 
   // Get the size in bytes of the next chunk, not including preamble, to be
   // read.
-  size_t FrontEntryDataSizeBytes();
+  size_t InternalFrontEntryDataSizeBytes(Reader& reader);
 
   // Get the size in bytes of the next chunk, including preamble and data
   // chunk, to be read.
-  size_t FrontEntryTotalSizeBytes();
+  size_t InternalFrontEntryTotalSizeBytes(Reader& reader);
+
+  // Internal version of Read used by all the public interface versions. T
+  // should be of type ReadOutput.
+  template <typename T>
+  Status InternalRead(Reader& reader, T read_output, bool get_preamble);
 
  private:
   struct EntryInfo {
@@ -150,20 +263,23 @@
     size_t data_bytes;
   };
 
-  // Internal version of Read used by all the public interface versions. T
-  // should be of type ReadOutput.
-  template <typename T>
-  Status InternalRead(T read_output, bool get_preamble);
-
   // Push back implementation, which optionally discards front elements to fit
   // the incoming element.
   Status InternalPushBack(std::span<const std::byte> data,
                           std::byte user_preamble_data,
                           bool pop_front_if_needed);
 
+  // Internal function to pop all of the slowest readers. This function may pop
+  // multiple readers if multiple are slow.
+  void InternalPopFrontAll();
+
+  // Returns the slowest reader in the list. This function requires that at
+  // least one reader is attached.
+  Reader& GetSlowestReader();
+
   // Get info struct with the size of the preamble and data chunk for the next
   // entry to be read.
-  EntryInfo FrontEntryInfo();
+  EntryInfo FrontEntryInfo(Reader& reader);
 
   // Get the raw number of available bytes free in the ring buffer. This is
   // not available bytes for data, since there is a variable size preamble for
@@ -186,10 +302,11 @@
   size_t buffer_bytes_;
 
   size_t write_idx_;
-  size_t read_idx_;
-  size_t entry_count_;
   const bool user_preamble_;
 
+  // List of attached readers.
+  IntrusiveList<Reader> readers_;
+
   // Worst case size for the variable-sized preable that is prepended to
   // each entry.
   static constexpr size_t kMaxEntryPreambleBytes = sizeof(size_t) + 1;
@@ -200,5 +317,14 @@
       std::numeric_limits<size_t>::max() / 2;
 };
 
+class PrefixedEntryRingBuffer : public PrefixedEntryRingBufferMulti,
+                                public PrefixedEntryRingBufferMulti::Reader {
+ public:
+  PrefixedEntryRingBuffer(bool user_preamble = false)
+      : PrefixedEntryRingBufferMulti(user_preamble) {
+    AttachReader(*this);
+  }
+};
+
 }  // namespace ring_buffer
 }  // namespace pw