pw_multisink: Send notifications to listeners

Listeners can now be attached to multisinks, which are notified when
entries are pushed into the ring buffer or dropped on ingress. Listener
implementations might use this to schedule tasks to fetch entries from
the corresponding drains.

No-Docs-Update-Reason: Doc updates coming in follow-up change.
Change-Id: I16be7bf5792c829bcda38edce5a5be7b5a0f373d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/39463
Commit-Queue: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Ewout van Bekkum <ewout@google.com>
diff --git a/pw_log_sink/log_sink_test.cc b/pw_log_sink/log_sink_test.cc
index 52f8bbe..05ee925 100644
--- a/pw_log_sink/log_sink_test.cc
+++ b/pw_log_sink/log_sink_test.cc
@@ -20,7 +20,6 @@
 #include "pw_log/levels.h"
 #include "pw_log_proto/log.pwpb.h"
 #include "pw_log_sink/multisink_adapter.h"
-#include "pw_multisink/drain.h"
 #include "pw_multisink/multisink.h"
 #include "pw_protobuf/decoder.h"
 
@@ -138,7 +137,7 @@
   std::byte buffer[kMultiSinkBufferSize];
   std::byte entry_buffer[kMultiSinkBufferSize];
   pw::multisink::MultiSink multisink(buffer);
-  pw::multisink::Drain drain;
+  pw::multisink::MultiSink::Drain drain;
   MultiSinkAdapter multisink_adapter(multisink);
 
   multisink.AttachDrain(drain);
diff --git a/pw_multisink/BUILD b/pw_multisink/BUILD
index f7840a2..a27edf1 100644
--- a/pw_multisink/BUILD
+++ b/pw_multisink/BUILD
@@ -25,18 +25,17 @@
 pw_cc_library(
     name = "pw_multisink",
     srcs = [
-        "drain.cc",
         "multisink.cc",
     ],
     hdrs = [
         "public/pw_multisink/config.h",
-        "public/pw_multisink/drain.h",
         "public/pw_multisink/multisink.h",
     ],
     includes = ["public"],
     deps = [
         "//pw_assert",
         "//pw_bytes",
+        "//pw_containers",
         "//pw_sync:interrupt_spin_lock",
         "//pw_sync:lock_annotations",
         "//pw_sync:mutex",
diff --git a/pw_multisink/BUILD.gn b/pw_multisink/BUILD.gn
index dac134c..20d5c24 100644
--- a/pw_multisink/BUILD.gn
+++ b/pw_multisink/BUILD.gn
@@ -27,11 +27,11 @@
   public_configs = [ ":default_config" ]
   public = [
     "public/pw_multisink/config.h",
-    "public/pw_multisink/drain.h",
     "public/pw_multisink/multisink.h",
   ]
   public_deps = [
     "$dir_pw_bytes",
+    "$dir_pw_containers",
     "$dir_pw_result",
     "$dir_pw_ring_buffer",
     "$dir_pw_status",
@@ -43,10 +43,7 @@
     "$dir_pw_assert",
     "$dir_pw_varint",
   ]
-  sources = [
-    "drain.cc",
-    "multisink.cc",
-  ]
+  sources = [ "multisink.cc" ]
 }
 
 pw_doc_group("docs") {
diff --git a/pw_multisink/drain.cc b/pw_multisink/drain.cc
deleted file mode 100644
index cc684f1..0000000
--- a/pw_multisink/drain.cc
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2021 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#include "pw_multisink/drain.h"
-
-#include "pw_assert/check.h"
-
-namespace pw {
-namespace multisink {
-
-Result<ConstByteSpan> Drain::GetEntry(ByteSpan entry,
-                                      uint32_t& drop_count_out) {
-  PW_DCHECK_NOTNULL(multisink_);
-  uint32_t entry_sequence_id = 0;
-  drop_count_out = 0;
-
-  const Result<ConstByteSpan> result =
-      multisink_->GetEntry(*this, entry, entry_sequence_id);
-
-  // Exit immediately if the result isn't OK or OUT_OF_RANGE, as the
-  // entry_sequence_id cannot be used for computation. Later invocations to
-  // GetEntry will permit readers to determine how far the sequence ID moved
-  // forward.
-  if (!result.ok() && !result.status().IsOutOfRange()) {
-    return result;
-  }
-
-  // Compute the drop count delta by comparing this entry's sequence ID with the
-  // last sequence ID this drain successfully read.
-  //
-  // The drop count calculation simply computes the difference between the
-  // current and last sequence IDs. Consecutive successful reads will always
-  // differ by one at least, so it is subtracted out. If the read was not
-  // successful, the difference is not adjusted.
-  drop_count_out =
-      entry_sequence_id - last_handled_sequence_id_ - (result.ok() ? 1 : 0);
-
-  last_handled_sequence_id_ = entry_sequence_id;
-  return result;
-}
-
-}  // namespace multisink
-}  // namespace pw
diff --git a/pw_multisink/multisink.cc b/pw_multisink/multisink.cc
index c1eca3c..b590957 100644
--- a/pw_multisink/multisink.cc
+++ b/pw_multisink/multisink.cc
@@ -16,46 +16,112 @@
 #include <cstring>
 
 #include "pw_assert/check.h"
-#include "pw_multisink/drain.h"
 #include "pw_status/try.h"
 #include "pw_varint/varint.h"
 
 namespace pw {
 namespace multisink {
 
+void MultiSink::HandleEntry(ConstByteSpan entry) {
+  std::lock_guard lock(lock_);
+  PW_DCHECK_OK(ring_buffer_.PushBack(entry, sequence_id_++));
+  NotifyListeners();
+}
+
+void MultiSink::HandleDropped(uint32_t drop_count) {
+  std::lock_guard lock(lock_);
+  sequence_id_ += drop_count;
+  NotifyListeners();
+}
+
 Result<ConstByteSpan> MultiSink::GetEntry(Drain& drain,
                                           ByteSpan buffer,
-                                          uint32_t& sequence_id_out) {
+                                          uint32_t& drop_count_out) {
   size_t bytes_read = 0;
+  uint32_t entry_sequence_id = 0;
+  drop_count_out = 0;
 
   std::lock_guard lock(lock_);
   PW_DCHECK_PTR_EQ(drain.multisink_, this);
 
-  const Status status =
-      drain.reader_.PeekFrontWithPreamble(buffer, sequence_id_out, bytes_read);
-  if (status.IsOutOfRange()) {
+  const Status peek_status = drain.reader_.PeekFrontWithPreamble(
+      buffer, entry_sequence_id, bytes_read);
+  if (peek_status.IsOutOfRange()) {
     // If the drain has caught up, report the last handled sequence ID so that
     // it can still process any dropped entries.
-    sequence_id_out = sequence_id_ - 1;
-    return status;
+    entry_sequence_id = sequence_id_ - 1;
+  } else if (!peek_status.ok()) {
+    // Exit immediately if the result isn't OK or OUT_OF_RANGE, as the
+    // entry_entry_sequence_id cannot be used for computation. Later invocations
+    // to GetEntry will permit readers to determine how far the sequence ID
+    // moved forward.
+    return peek_status;
   }
+
+  // Compute the drop count delta by comparing this entry's sequence ID with the
+  // last sequence ID this drain successfully read.
+  //
+  // The drop count calculation simply computes the difference between the
+  // current and last sequence IDs. Consecutive successful reads will always
+  // differ by one at least, so it is subtracted out. If the read was not
+  // successful, the difference is not adjusted.
+  drop_count_out = entry_sequence_id - drain.last_handled_sequence_id_ -
+                   (peek_status.ok() ? 1 : 0);
+  drain.last_handled_sequence_id_ = entry_sequence_id;
+
+  // The Peek above may have failed due to OutOfRange, now that we've set the
+  // drop count see if we should return before attempting to pop.
+  if (peek_status.IsOutOfRange()) {
+    return peek_status;
+  }
+
+  // Success, pop the oldest entry!
   PW_CHECK(drain.reader_.PopFront().ok());
   return std::as_bytes(buffer.first(bytes_read));
 }
 
-Status MultiSink::AttachDrain(Drain& drain) {
+void MultiSink::AttachDrain(Drain& drain) {
   std::lock_guard lock(lock_);
   PW_DCHECK_PTR_EQ(drain.multisink_, nullptr);
   drain.multisink_ = this;
   drain.last_handled_sequence_id_ = sequence_id_ - 1;
-  return ring_buffer_.AttachReader(drain.reader_);
+  PW_CHECK_OK(ring_buffer_.AttachReader(drain.reader_));
 }
 
-Status MultiSink::DetachDrain(Drain& drain) {
+void MultiSink::DetachDrain(Drain& drain) {
   std::lock_guard lock(lock_);
   PW_DCHECK_PTR_EQ(drain.multisink_, this);
   drain.multisink_ = nullptr;
-  return ring_buffer_.DetachReader(drain.reader_);
+  PW_CHECK_OK(ring_buffer_.DetachReader(drain.reader_),
+              "The drain wasn't already attached.");
+}
+
+void MultiSink::AttachListener(Listener& listener) {
+  std::lock_guard lock(lock_);
+  listeners_.push_back(listener);
+}
+
+void MultiSink::DetachListener(Listener& listener) {
+  std::lock_guard lock(lock_);
+  [[maybe_unused]] bool was_detached = listeners_.remove(listener);
+  PW_DCHECK(was_detached, "The listener was already attached.");
+}
+
+void MultiSink::Clear() {
+  std::lock_guard lock(lock_);
+  ring_buffer_.Clear();
+}
+
+void MultiSink::NotifyListeners() {
+  for (auto& listener : listeners_) {
+    listener.OnNewEntryAvailable();
+  }
+}
+
+Result<ConstByteSpan> MultiSink::Drain::GetEntry(ByteSpan buffer,
+                                                 uint32_t& drop_count_out) {
+  PW_DCHECK_NOTNULL(multisink_);
+  return multisink_->GetEntry(*this, buffer, drop_count_out);
 }
 
 }  // namespace multisink
diff --git a/pw_multisink/multisink_test.cc b/pw_multisink/multisink_test.cc
index 4e0f553..c3d2583 100644
--- a/pw_multisink/multisink_test.cc
+++ b/pw_multisink/multisink_test.cc
@@ -15,15 +15,29 @@
 #include "pw_multisink/multisink.h"
 
 #include "gtest/gtest.h"
-#include "pw_multisink/drain.h"
 
 namespace pw::multisink {
+using Drain = MultiSink::Drain;
+using Listener = MultiSink::Listener;
+
+class CountingListener : public Listener {
+ public:
+  void OnNewEntryAvailable() override { notification_count_++; }
+
+  size_t GetNotificationCount() { return notification_count_; }
+
+  void ResetNotificationCount() { notification_count_ = 0; }
+
+ private:
+  size_t notification_count_ = 0;
+};
 
 class MultiSinkTest : public ::testing::Test {
  protected:
   static constexpr std::byte kMessage[] = {
       (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
   static constexpr size_t kMaxDrains = 3;
+  static constexpr size_t kMaxListeners = 3;
   static constexpr size_t kEntryBufferSize = 1024;
   static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
 
@@ -46,45 +60,61 @@
     EXPECT_EQ(drop_count, expected_drop_count);
   }
 
+  void ExpectNotificationCount(CountingListener& listener,
+                               size_t expected_notification_count) {
+    EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
+    listener.ResetNotificationCount();
+  }
+
   std::byte buffer_[kBufferSize];
   std::byte entry_buffer_[kEntryBufferSize];
+  CountingListener listeners_[kMaxListeners];
   Drain drains_[kMaxDrains];
   MultiSink multisink_;
 };
 
 TEST_F(MultiSinkTest, SingleDrain) {
-  EXPECT_EQ(OkStatus(), multisink_.AttachDrain(drains_[0]));
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.AttachDrain(drains_[0]);
+  multisink_.AttachListener(listeners_[0]);
+  multisink_.HandleEntry(kMessage);
 
   // Single entry push and pop.
+  ExpectNotificationCount(listeners_[0], 1u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
 
   // Multiple entries with intermittent drops.
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.HandleEntry(kMessage);
   multisink_.HandleDropped();
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.HandleEntry(kMessage);
+  ExpectNotificationCount(listeners_[0], 3u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
 
   // Send drops only.
   multisink_.HandleDropped();
+  ExpectNotificationCount(listeners_[0], 1u);
   ExpectMessageAndDropCount(drains_[0], {}, 1u);
 
   // Confirm out-of-range if no entries are expected.
+  ExpectNotificationCount(listeners_[0], 0u);
   ExpectMessageAndDropCount(drains_[0], {}, 0u);
 }
 
 TEST_F(MultiSinkTest, MultipleDrain) {
-  EXPECT_EQ(OkStatus(), multisink_.AttachDrain(drains_[0]));
-  EXPECT_EQ(OkStatus(), multisink_.AttachDrain(drains_[1]));
+  multisink_.AttachDrain(drains_[0]);
+  multisink_.AttachDrain(drains_[1]);
+  multisink_.AttachListener(listeners_[0]);
+  multisink_.AttachListener(listeners_[1]);
 
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.HandleEntry(kMessage);
+  multisink_.HandleEntry(kMessage);
   multisink_.HandleDropped();
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.HandleEntry(kMessage);
   multisink_.HandleDropped();
 
   // Drain one drain entirely.
+  ExpectNotificationCount(listeners_[0], 5u);
+  ExpectNotificationCount(listeners_[1], 5u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
@@ -92,6 +122,8 @@
   ExpectMessageAndDropCount(drains_[0], {}, 0u);
 
   // Confirm the other drain can be drained separately.
+  ExpectNotificationCount(listeners_[0], 0u);
+  ExpectNotificationCount(listeners_[1], 0u);
   ExpectMessageAndDropCount(drains_[1], kMessage, 0u);
   ExpectMessageAndDropCount(drains_[1], kMessage, 0u);
   ExpectMessageAndDropCount(drains_[1], kMessage, 1u);
@@ -99,38 +131,67 @@
   ExpectMessageAndDropCount(drains_[1], {}, 0u);
 }
 
-TEST_F(MultiSinkTest, LateRegistration) {
-  // Confirm that entries pushed before attaching a drain are not seen by the
-  // drain.
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+TEST_F(MultiSinkTest, LateDrainRegistration) {
+  // Confirm that entries pushed before attaching a drain or listener are not
+  // seen by either.
+  multisink_.HandleEntry(kMessage);
 
   // The drain does not observe 'drops' as it did not see entries, and only sees
   // the one entry that was added after attach.
-  EXPECT_EQ(OkStatus(), multisink_.AttachDrain(drains_[0]));
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.AttachDrain(drains_[0]);
+  multisink_.AttachListener(listeners_[0]);
+  ExpectNotificationCount(listeners_[0], 0u);
+
+  multisink_.HandleEntry(kMessage);
+  ExpectNotificationCount(listeners_[0], 1u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
   ExpectMessageAndDropCount(drains_[0], {}, 0u);
 }
 
 TEST_F(MultiSinkTest, DynamicDrainRegistration) {
-  EXPECT_EQ(OkStatus(), multisink_.AttachDrain(drains_[0]));
+  multisink_.AttachDrain(drains_[0]);
+  multisink_.AttachListener(listeners_[0]);
 
   multisink_.HandleDropped();
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.HandleEntry(kMessage);
   multisink_.HandleDropped();
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.HandleEntry(kMessage);
 
   // Drain out one message and detach it.
+  ExpectNotificationCount(listeners_[0], 4u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
-  EXPECT_EQ(OkStatus(), multisink_.DetachDrain(drains_[0]));
+  multisink_.DetachDrain(drains_[0]);
+  multisink_.DetachListener(listeners_[0]);
 
   // Reattach the drain and confirm that you only see events after attaching.
-  EXPECT_EQ(OkStatus(), multisink_.AttachDrain(drains_[0]));
+  multisink_.AttachDrain(drains_[0]);
+  multisink_.AttachListener(listeners_[0]);
+  ExpectNotificationCount(listeners_[0], 0u);
   ExpectMessageAndDropCount(drains_[0], {}, 0u);
 
-  EXPECT_EQ(OkStatus(), multisink_.HandleEntry(kMessage));
+  multisink_.HandleEntry(kMessage);
+  ExpectNotificationCount(listeners_[0], 1u);
   ExpectMessageAndDropCount(drains_[0], kMessage, 0u);
   ExpectMessageAndDropCount(drains_[0], {}, 0u);
 }
 
+TEST_F(MultiSinkTest, TooSmallBuffer) {
+  multisink_.AttachDrain(drains_[0]);
+
+  // Insert an entry and a drop, then try to read into an insufficient buffer.
+  uint32_t drop_count = 0;
+  multisink_.HandleDropped();
+  multisink_.HandleEntry(kMessage);
+
+  // Attempting to acquire an entry should result in RESOURCE_EXHAUSTED.
+  Result<ConstByteSpan> result =
+      drains_[0].GetEntry(std::span(entry_buffer_, 1), drop_count);
+  EXPECT_EQ(result.status(), Status::ResourceExhausted());
+
+  // Verify that the multisink does not move the handled sequence ID counter
+  // forward and provides this data on the next call.
+  ExpectMessageAndDropCount(drains_[0], kMessage, 1u);
+  ExpectMessageAndDropCount(drains_[0], {}, 0u);
+}
+
 }  // namespace pw::multisink
diff --git a/pw_multisink/public/pw_multisink/drain.h b/pw_multisink/public/pw_multisink/drain.h
deleted file mode 100644
index 90aae11..0000000
--- a/pw_multisink/public/pw_multisink/drain.h
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright 2021 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include "pw_multisink/multisink.h"
-#include "pw_status/status.h"
-
-namespace pw {
-namespace multisink {
-
-// An asynchronous reader which is attached to a MultiSink via AttachDrain.
-// Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away
-// entry sequence information for clients.
-class Drain {
- public:
-  constexpr Drain() : last_handled_sequence_id_(0), multisink_(nullptr) {}
-
-  // Returns the next available entry if it exists and acquires the latest drop
-  // count in parallel.
-  //
-  // The `drop_count_out` is set to the number of entries that were dropped
-  // since the last call to GetEntry, if the read operation was successful or
-  // indicated that no entries were available to read. If the read operation
-  // fails otherwise, the `drop_count_out` is set to zero.
-  //
-  // Drop counts are internally maintained with a 32-bit counter. If UINT32_MAX
-  // entries have been handled by the attached multisink between subsequent
-  // calls to GetEntry, the drop count will overflow and will report a lower
-  // count erroneously. Users should ensure that sinks call GetEntry
-  // at least once every UINT32_MAX entries.
-  //
-  // Return values:
-  // Ok - An entry was successfully read from the multisink. The drop_count_out
-  // is set to the count of entries that were dropped since the last call
-  // to GetEntry.
-  // FailedPrecondition - The drain must be attached to a sink.
-  // OutOfRange - No entries were available, the drop_count_out is set to the
-  // number of entries that were dropped since the last call to GetEntry.
-  // ResourceExhausted - The provided buffer was not large enough to store the
-  // next available entry.
-  // DataLoss - An entry was read from the multisink, but did not match the
-  // expected format (i.e. failed to decode).
-  // InvalidArgument - The drain is not currently associated with a multisink.
-  Result<ConstByteSpan> GetEntry(ByteSpan entry, uint32_t& drop_count_out);
-
- protected:
-  friend MultiSink;
-  ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
-  uint32_t last_handled_sequence_id_;
-  MultiSink* multisink_;
-};
-
-}  // namespace multisink
-}  // namespace pw
diff --git a/pw_multisink/public/pw_multisink/multisink.h b/pw_multisink/public/pw_multisink/multisink.h
index d86ad35..be8f585 100644
--- a/pw_multisink/public/pw_multisink/multisink.h
+++ b/pw_multisink/public/pw_multisink/multisink.h
@@ -24,7 +24,6 @@
 
 namespace pw {
 namespace multisink {
-class Drain;
 
 // An asynchronous single-writer multi-reader queue that ensures readers can
 // poll for dropped message counts, which is useful for logging or similar
@@ -32,11 +31,120 @@
 //
 // This class is thread-safe but NOT IRQ-safe when
 // PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled.
-//
-// TODO(pwbug/342): Support notifying readers when the queue is readable,
-// rather than requiring them to poll to check for new entries.
 class MultiSink {
  public:
+  // An asynchronous reader which is attached to a MultiSink via AttachDrain.
+  // Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away
+  // entry sequence information for clients.
+  class Drain {
+   public:
+    constexpr Drain() : last_handled_sequence_id_(0), multisink_(nullptr) {}
+
+    // Returns the next available entry if it exists and acquires the latest
+    // drop count in parallel.
+    //
+    // The `drop_count_out` is set to the number of entries that were dropped
+    // since the last call to GetEntry, if the read operation was successful or
+    // returned OutOfRange (i.e. no entries to read). Otherwise, it is set to
+    // zero, so should always be processed.
+    //
+    // Drop counts are internally maintained with a 32-bit counter. If
+    // UINT32_MAX entries have been handled by the attached multisink between
+    // subsequent calls to GetEntry, the drop count will overflow and will
+    // report a lower count erroneously. Users should ensure that sinks call
+    // GetEntry at least once every UINT32_MAX entries.
+    //
+    // Example Usage:
+    //
+    // void ProcessEntriesFromDrain(Drain& drain) {
+    //   std::array<std::byte, kEntryBufferSize> buffer;
+    //   uint32_t drop_count = 0;
+    //
+    //   // Example#1: Request the drain for a new entry.
+    //   {
+    //     const Result<ConstByteSpan> result = drain.GetEntry(buffer,
+    //                                                         drop_count);
+    //
+    //     // If a non-zero drop count is received, process them.
+    //     if (drop_count > 0) {
+    //       ProcessDropCount(drop_count);
+    //     }
+    //
+    //     // If the call was successful, process the entry that was received.
+    //     if (result.ok()) {
+    //       ProcessEntry(result.value());
+    //     }
+    //   }
+    //
+    //   // Example#2: Drain out all messages.
+    //   {
+    //     Result<ConstByteSpan> result = Status::OutOfRange();
+    //     do {
+    //       result = drain.GetEntry(buffer, drop_count);
+    //
+    //       if (drop_count > 0) {
+    //         ProcessDropCount(drop_count);
+    //       }
+    //
+    //       if (result.ok()) {
+    //         ProcessEntry(result.value());
+    //       }
+    //
+    //       // Keep trying until we hit OutOfRange. Note that a new entry may
+    //       // have arrived after the GetEntry call.
+    //     } while (!result.IsOutOfRange());
+    //   }
+    // }
+    //
+    // Return values:
+    // Ok - An entry was successfully read from the multisink.
+    // OutOfRange - No entries were available.
+    // FailedPrecondition - The drain must be attached to a sink.
+    // ResourceExhausted - The provided buffer was not large enough to store
+    // the next available entry.
+    // DataLoss - An entry was read but did not match the expected format.
+    Result<ConstByteSpan> GetEntry(ByteSpan buffer, uint32_t& drop_count_out)
+        PW_LOCKS_EXCLUDED(multisink_->lock_);
+
+    // Drains are not copyable or movable.
+    Drain(const Drain&) = delete;
+    Drain& operator=(const Drain&) = delete;
+    Drain(Drain&&) = delete;
+    Drain& operator=(Drain&&) = delete;
+
+   protected:
+    friend MultiSink;
+
+    // The `reader_` and `last_handled_sequence_id_` are managed by attached
+    // multisink and are guarded by `multisink_->lock_` when used.
+    ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
+    uint32_t last_handled_sequence_id_;
+    MultiSink* multisink_;
+  };
+
+  // A pure-virtual listener of a MultiSink, attached via AttachListener.
+  // MultiSink's invoke listeners when new data arrives, allowing them to
+  // schedule the draining of messages out of the MultiSink.
+  class Listener : public IntrusiveList<Listener>::Item {
+   public:
+    constexpr Listener() {}
+    virtual ~Listener() = default;
+
+    // Listeners are not copyable or movable.
+    Listener(const Listener&) = delete;
+    Listener& operator=(const Drain&) = delete;
+    Listener(Listener&&) = delete;
+    Listener& operator=(Drain&&) = delete;
+
+   protected:
+    friend MultiSink;
+
+    // Invoked by the attached multisink when a new entry or drop count is
+    // available. The multisink lock is held during this call, so neither the
+    // multisink nor it's drains can be used during this callback.
+    virtual void OnNewEntryAvailable() = 0;
+  };
+
   // Constructs a multisink using a ring buffer backed by the provided buffer.
   MultiSink(ByteSpan buffer) : ring_buffer_(true), sequence_id_(0) {
     ring_buffer_.SetBuffer(buffer);
@@ -50,51 +158,46 @@
   //
   // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
   // function must not be called from an interrupt context.
-  //
-  // Return values:
-  // Ok - Entry was successfully pushed to the ring buffer.
-  // InvalidArgument - Size of data to write is zero bytes.
-  // OutOfRange - Size of data is greater than buffer size.
-  // FailedPrecondition - Buffer was not initialized.
-  Status HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_) {
-    std::lock_guard lock(lock_);
-    return ring_buffer_.PushBack(entry, sequence_id_++);
-  }
+  // Precondition: entry.size() > 0
+  // Precondition: entry.size() <= `ring_buffer_` size
+  void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_);
 
   // Notifies the multisink of messages dropped before ingress. The writer
   // may use this to signal to readers that an entry (or entries) failed
   // before being sent to the multisink (e.g. the writer failed to encode
   // the message). This API increments the sequence ID of the multisink by
   // the provided `drop_count`.
-  void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_) {
-    std::lock_guard lock(lock_);
-    sequence_id_ += drop_count;
-  }
+  void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_);
 
   // Attach a drain to the multisink. Drains may not be associated with more
   // than one multisink at a time. Entries pushed before the drain was attached
   // are not seen by the drain, so drains should be attached before entries
   // are pushed.
   //
-  // Return values:
-  // Ok - Drain was successfully attached.
-  // InvalidArgument - Drain is currently associated with another multisink.
-  Status AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
+  // Precondition: The drain must not be attached to a multisink.
+  void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
 
   // Detaches a drain from the multisink. Drains may only be detached if they
   // were previously attached to this multisink.
   //
-  // Return values:
-  // Ok - Drain was successfully detached.
-  // InvalidArgument - Drain is not currently associated with this multisink.
-  Status DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
+  // Precondition: The drain must be attached to this multisink.
+  void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
+
+  // Attach a listener to the multisink. Entries pushed before the listener was
+  // attached are not seen by the listener, so listeners should be attached
+  // before entries are pushed. Listeners are invoked on all new messages.
+  //
+  // Precondition: The listener must not be attached to a multisink.
+  void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
+
+  // Detaches a listener from the multisink.
+  //
+  // Precondition: The listener must be attached to this multisink.
+  void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
 
   // Removes all data from the internal buffer. The multisink's sequence ID is
   // not modified, so readers may interpret this event as droppping entries.
-  void Clear() PW_LOCKS_EXCLUDED(lock_) {
-    std::lock_guard lock(lock_);
-    ring_buffer_.Clear();
-  }
+  void Clear() PW_LOCKS_EXCLUDED(lock_);
 
  protected:
   friend Drain;
@@ -103,19 +206,22 @@
   // calculation.
   //
   // Returns:
-  // Ok - An entry was successfully read from the multisink. The `sequence_id`
-  // is set to the ID encoded in the oldest entry.
-  // FailedPrecondition - The drain is not attached to a multisink.
-  // ResourceExhausted - The provided buffer was not large enough to store
-  // the next available entry.
-  // DataLoss - An entry was read from the multisink, but did not contains an
-  // encoded sequence ID.
+  // Ok - An entry was successfully read from the multisink. The
+  // `drop_count_out` is set to the difference between the current sequence ID
+  // and the last handled ID. FailedPrecondition - The drain is not attached to
+  // a multisink. ResourceExhausted - The provided buffer was not large enough
+  // to store the next available entry. DataLoss - An entry was read from the
+  // multisink, but did not contains an encoded sequence ID.
   Result<ConstByteSpan> GetEntry(Drain& drain,
                                  ByteSpan buffer,
-                                 uint32_t& sequence_id_out)
+                                 uint32_t& drop_count_out)
       PW_LOCKS_EXCLUDED(lock_);
 
  private:
+  // Notifies attached listeners of new entries or an updated drop count.
+  void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+
+  IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_);
   ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
   uint32_t sequence_id_ PW_GUARDED_BY(lock_);
   LockType lock_;