modules/pubsub: Use queue of events

This updates the pubsub system to have a queue of N events instead of
just one, and removes the blocking Publish operation in favor of a
fallible, non-blocking API.

Change-Id: I0f79b2218fa52b6a568a230455e9fd2e22419dec
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/showcase/rp2/+/220779
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Lint: Lint 🤖 <android-build-ayeaye@system.gserviceaccount.com>
diff --git a/modules/pubsub/BUILD.bazel b/modules/pubsub/BUILD.bazel
index d23c891..147dfea 100644
--- a/modules/pubsub/BUILD.bazel
+++ b/modules/pubsub/BUILD.bazel
@@ -23,9 +23,10 @@
     name = "pubsub",
     hdrs = ["pubsub.h"],
     deps = [
+        "@pigweed//pw_containers:inline_deque",
         "@pigweed//pw_function",
-        "@pigweed//pw_sync:binary_semaphore",
         "@pigweed//pw_sync:interrupt_spin_lock",
+        "@pigweed//pw_sync:lock_annotations",
         "@pigweed//pw_work_queue",
     ],
 )
diff --git a/modules/pubsub/pubsub.h b/modules/pubsub/pubsub.h
index 01523e7..d4aa104 100644
--- a/modules/pubsub/pubsub.h
+++ b/modules/pubsub/pubsub.h
@@ -18,9 +18,10 @@
 #include <optional>
 #include <type_traits>
 
+#include "pw_containers/inline_deque.h"
 #include "pw_function/function.h"
-#include "pw_sync/binary_semaphore.h"
 #include "pw_sync/interrupt_spin_lock.h"
+#include "pw_sync/lock_annotations.h"
 #include "pw_work_queue/work_queue.h"
 
 namespace am {
@@ -36,31 +37,22 @@
       typename = std::enable_if_t<std::is_trivially_copyable_v<Event> &&
                                   std::is_trivially_destructible_v<Event>>>
   PubSubBase(pw::work_queue::WorkQueue& work_queue,
+             pw::InlineDeque<Event>& event_queue,
              pw::span<SubscribeCallback> subscribers)
       : work_queue_(&work_queue),
+        event_queue_(&event_queue),
         subscribers_(subscribers),
-        subscriber_count_(0) {
-    event_lock_.release();
-  }
-
-  /// Pushes an event to the event queue, blocking until the event is
-  /// accepted. This is **NOT** interrupt safe.
-  void Publish(Event event) {
-    // Wait for the current event to be finished processing.
-    event_lock_.acquire();
-    PublishLocked(event);
-  }
+        subscriber_count_(0) {}
 
   /// Attempts to push an event to the event queue, returning whether it was
-  /// successfully published.
-  /// The `PubSub` takes ownership of the event.
-  bool TryPublish(Event event) {
-    if (!event_lock_.try_acquire()) {
-      return false;
+  /// successfully published. This is both thread safe and interrupt safe.
+  bool Publish(Event event) {
+    if (event_lock_.try_lock()) {
+      bool result = PublishLocked(event);
+      event_lock_.unlock();
+      return result;
     }
-
-    PublishLocked(event);
-    return true;
+    return false;
   }
 
   /// Registers a callback to be run when events are received.
@@ -68,12 +60,10 @@
   /// unsubscribe.
   ///
   /// All subscribed callbacks are invoked from the context of the work queue
-  /// provided to the constructor, while a lock is held.
-  /// As a result, certain operations such as publishing to the work queue are
-  /// disallowed, and generally, callbacks should avoid long blocking operations
-  /// to not starve other callbacks or work queue tasks.
+  /// provided to the constructor. Callbacks should avoid long blocking
+  /// operations to not starve other callbacks or work queue tasks.
   std::optional<SubscribeToken> Subscribe(SubscribeCallback&& callback) {
-    std::lock_guard lock(subscriber_lock_);
+    std::lock_guard lock(subscribers_lock_);
 
     auto slot = std::find_if(subscribers_.begin(),
                              subscribers_.end(),
@@ -89,7 +79,7 @@
 
   /// Unregisters a previously registered subscriber.
   bool Unsubscribe(SubscribeToken token) {
-    std::lock_guard lock(subscriber_lock_);
+    std::lock_guard lock(subscribers_lock_);
 
     const size_t index = static_cast<size_t>(token);
     if (index >= subscribers_.size()) {
@@ -101,37 +91,52 @@
     return true;
   }
 
-  constexpr size_t max_subscribers() const { return subscribers_.size(); }
+  constexpr size_t max_subscribers() const PW_NO_LOCK_SAFETY_ANALYSIS {
+    return subscribers_.size();
+  }
   constexpr size_t subscriber_count() const { return subscriber_count_; }
 
  private:
-  void PublishLocked(Event event) {
-    queued_event_ = event;
+  bool PublishLocked(Event event) PW_EXCLUSIVE_LOCKS_REQUIRED(event_lock_) {
+    if (event_queue_->full()) {
+      return false;
+    }
+
+    event_queue_->push_back(event);
     work_queue_->PushWork([this]() { NotifySubscribers(); });
+    return true;
   }
 
   void NotifySubscribers() {
-    subscriber_lock_.lock();
+    event_lock_.lock();
+
+    if (event_queue_->empty()) {
+      event_lock_.unlock();
+      return;
+    }
+
+    // Copy the event out of the queue so that the lock does not have to be held
+    // while running subscriber callbacks.
+    Event event = event_queue_->front();
+    event_queue_->pop_front();
+    event_lock_.unlock();
+
+    subscribers_lock_.lock();
     for (auto& callback : subscribers_) {
       if (callback != nullptr) {
-        callback(queued_event_);
+        callback(event);
       }
     }
-    subscriber_lock_.unlock();
-
-    // Allow new events to be processed.
-    event_lock_.release();
+    subscribers_lock_.unlock();
   }
 
   pw::work_queue::WorkQueue* work_queue_;
 
-  // Lock protecting the event queue. Uses a semaphore as it is acquired and
-  // released by different threads.
-  pw::sync::BinarySemaphore event_lock_;
-  Event queued_event_;
+  pw::sync::InterruptSpinLock event_lock_;
+  pw::InlineDeque<Event>* event_queue_ PW_GUARDED_BY(event_lock_);
 
-  pw::sync::InterruptSpinLock subscriber_lock_;
-  pw::span<SubscribeCallback> subscribers_;
+  pw::sync::InterruptSpinLock subscribers_lock_;
+  pw::span<SubscribeCallback> subscribers_ PW_GUARDED_BY(subscribers_lock_);
   size_t subscriber_count_;
 };
 
@@ -140,16 +145,17 @@
 // TODO
 struct Event {};
 
-template <size_t kMaxSubscribers>
+template <size_t kMaxEvents, size_t kMaxSubscribers>
 class PubSub : public internal::PubSubBase<Event> {
  public:
   using PubSubBase::SubscribeCallback;
   using PubSubBase::SubscribeToken;
 
   constexpr PubSub(pw::work_queue::WorkQueue& work_queue)
-      : PubSubBase(work_queue, subscribers_) {}
+      : PubSubBase(work_queue, event_queue_, subscribers_) {}
 
  private:
+  pw::InlineDeque<Event, kMaxEvents> event_queue_;
   std::array<SubscribeCallback, kMaxSubscribers> subscribers_;
 };
 
diff --git a/modules/pubsub/pubsub_test.cc b/modules/pubsub/pubsub_test.cc
index 1cf867b..92c4b55 100644
--- a/modules/pubsub/pubsub_test.cc
+++ b/modules/pubsub/pubsub_test.cc
@@ -34,27 +34,30 @@
   void TearDown() { StopWorkQueue(); }
 
  protected:
+  pw::InlineDeque<TestEvent, 4> event_queue_;
   std::array<PubSub::SubscribeCallback, 4> subscribers_buffer_;
   int result_ = 0;
+  int events_processed_ = 0;
   pw::sync::TimedThreadNotification notification_;
+  pw::sync::TimedThreadNotification work_queue_start_notification_;
 };
 
 TEST_F(PubSubTest, Publish_OneSubscriber) {
-  PubSub pubsub(work_queue(), subscribers_buffer_);
+  PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
 
   pubsub.Subscribe([this](TestEvent event) {
     result_ = event.value;
     notification_.release();
   });
 
-  pubsub.Publish({.value = 42});
+  EXPECT_TRUE(pubsub.Publish({.value = 42}));
 
-  notification_.try_acquire_for(500ms);
+  EXPECT_TRUE(notification_.try_acquire_for(50ms));
   EXPECT_EQ(result_, 42);
 }
 
 TEST_F(PubSubTest, Publish_MultipleSubscribers) {
-  PubSub pubsub(work_queue(), subscribers_buffer_);
+  PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
 
   for (size_t i = 0; i < subscribers_buffer_.size(); ++i) {
     struct {
@@ -74,14 +77,75 @@
     });
   }
 
-  pubsub.Publish({.value = 4});
+  EXPECT_TRUE(pubsub.Publish({.value = 4}));
 
-  notification_.try_acquire_for(500ms);
+  EXPECT_TRUE(notification_.try_acquire_for(50ms));
   EXPECT_EQ(result_, static_cast<int>(4 * subscribers_buffer_.size()));
 }
 
+TEST_F(PubSubTest, Publish_MultipleEvents) {
+  PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
+
+  pubsub.Subscribe([this](TestEvent event) {
+    result_ += event.value;
+    events_processed_++;
+
+    if (events_processed_ % 4 == 0) {
+      notification_.release();
+    }
+  });
+
+  EXPECT_TRUE(pubsub.Publish({.value = 1}));
+  EXPECT_TRUE(pubsub.Publish({.value = 2}));
+  EXPECT_TRUE(pubsub.Publish({.value = 3}));
+  EXPECT_TRUE(pubsub.Publish({.value = 4}));
+
+  EXPECT_TRUE(notification_.try_acquire_for(50ms));
+  EXPECT_EQ(result_, 10);
+  EXPECT_EQ(events_processed_, 4);
+
+  EXPECT_TRUE(pubsub.Publish({.value = 5}));
+  EXPECT_TRUE(pubsub.Publish({.value = 6}));
+  EXPECT_TRUE(pubsub.Publish({.value = 7}));
+  EXPECT_TRUE(pubsub.Publish({.value = 8}));
+
+  EXPECT_TRUE(notification_.try_acquire_for(50ms));
+  EXPECT_EQ(result_, 36);
+  EXPECT_EQ(events_processed_, 8);
+}
+
+TEST_F(PubSubTest, Publish_MultipleEvents_QueueFull) {
+  PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
+
+  work_queue().PushWork([this]() {
+    // Block the work queue until all events are published.
+    PW_ASSERT(work_queue_start_notification_.try_acquire_for(1s));
+  });
+
+  pubsub.Subscribe([this](TestEvent event) {
+    result_ += event.value;
+    events_processed_++;
+
+    if (events_processed_ == 5) {
+      notification_.release();
+    }
+  });
+
+  EXPECT_TRUE(pubsub.Publish({.value = 10}));
+  EXPECT_TRUE(pubsub.Publish({.value = 11}));
+  EXPECT_TRUE(pubsub.Publish({.value = 12}));
+  EXPECT_TRUE(pubsub.Publish({.value = 13}));
+  EXPECT_FALSE(pubsub.Publish({.value = 14}));
+  work_queue_start_notification_.release();
+
+  // This should time out as the fifth event never gets sent.
+  EXPECT_FALSE(notification_.try_acquire_for(50ms));
+  EXPECT_EQ(events_processed_, 4);
+  EXPECT_EQ(result_, 46);
+}
+
 TEST_F(PubSubTest, Subscribe_Full) {
-  PubSub pubsub(work_queue(), subscribers_buffer_);
+  PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
 
   EXPECT_TRUE(pubsub.Subscribe([this](TestEvent) { notification_.release(); })
                   .has_value());
@@ -102,7 +166,7 @@
 }
 
 TEST_F(PubSubTest, Subscribe_Unsubscribe) {
-  PubSub pubsub(work_queue(), subscribers_buffer_);
+  PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
 
   auto token1 =
       pubsub.Subscribe([this](TestEvent) { notification_.release(); });