modules/pubsub: Use queue of events
This updates the pubsub system to have a queue of N events instead of
just one, and removes the blocking Publish operation in favor of a
fallible, non-blocking API.
Change-Id: I0f79b2218fa52b6a568a230455e9fd2e22419dec
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/showcase/rp2/+/220779
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Lint: Lint 🤖 <android-build-ayeaye@system.gserviceaccount.com>
diff --git a/modules/pubsub/BUILD.bazel b/modules/pubsub/BUILD.bazel
index d23c891..147dfea 100644
--- a/modules/pubsub/BUILD.bazel
+++ b/modules/pubsub/BUILD.bazel
@@ -23,9 +23,10 @@
name = "pubsub",
hdrs = ["pubsub.h"],
deps = [
+ "@pigweed//pw_containers:inline_deque",
"@pigweed//pw_function",
- "@pigweed//pw_sync:binary_semaphore",
"@pigweed//pw_sync:interrupt_spin_lock",
+ "@pigweed//pw_sync:lock_annotations",
"@pigweed//pw_work_queue",
],
)
diff --git a/modules/pubsub/pubsub.h b/modules/pubsub/pubsub.h
index 01523e7..d4aa104 100644
--- a/modules/pubsub/pubsub.h
+++ b/modules/pubsub/pubsub.h
@@ -18,9 +18,10 @@
#include <optional>
#include <type_traits>
+#include "pw_containers/inline_deque.h"
#include "pw_function/function.h"
-#include "pw_sync/binary_semaphore.h"
#include "pw_sync/interrupt_spin_lock.h"
+#include "pw_sync/lock_annotations.h"
#include "pw_work_queue/work_queue.h"
namespace am {
@@ -36,31 +37,22 @@
typename = std::enable_if_t<std::is_trivially_copyable_v<Event> &&
std::is_trivially_destructible_v<Event>>>
PubSubBase(pw::work_queue::WorkQueue& work_queue,
+ pw::InlineDeque<Event>& event_queue,
pw::span<SubscribeCallback> subscribers)
: work_queue_(&work_queue),
+ event_queue_(&event_queue),
subscribers_(subscribers),
- subscriber_count_(0) {
- event_lock_.release();
- }
-
- /// Pushes an event to the event queue, blocking until the event is
- /// accepted. This is **NOT** interrupt safe.
- void Publish(Event event) {
- // Wait for the current event to be finished processing.
- event_lock_.acquire();
- PublishLocked(event);
- }
+ subscriber_count_(0) {}
/// Attempts to push an event to the event queue, returning whether it was
- /// successfully published.
- /// The `PubSub` takes ownership of the event.
- bool TryPublish(Event event) {
- if (!event_lock_.try_acquire()) {
- return false;
+ /// successfully published. This is both thread safe and interrupt safe.
+ bool Publish(Event event) {
+ if (event_lock_.try_lock()) {
+ bool result = PublishLocked(event);
+ event_lock_.unlock();
+ return result;
}
-
- PublishLocked(event);
- return true;
+ return false;
}
/// Registers a callback to be run when events are received.
@@ -68,12 +60,10 @@
/// unsubscribe.
///
/// All subscribed callbacks are invoked from the context of the work queue
- /// provided to the constructor, while a lock is held.
- /// As a result, certain operations such as publishing to the work queue are
- /// disallowed, and generally, callbacks should avoid long blocking operations
- /// to not starve other callbacks or work queue tasks.
+ /// provided to the constructor. Callbacks should avoid long blocking
+ /// operations to not starve other callbacks or work queue tasks.
std::optional<SubscribeToken> Subscribe(SubscribeCallback&& callback) {
- std::lock_guard lock(subscriber_lock_);
+ std::lock_guard lock(subscribers_lock_);
auto slot = std::find_if(subscribers_.begin(),
subscribers_.end(),
@@ -89,7 +79,7 @@
/// Unregisters a previously registered subscriber.
bool Unsubscribe(SubscribeToken token) {
- std::lock_guard lock(subscriber_lock_);
+ std::lock_guard lock(subscribers_lock_);
const size_t index = static_cast<size_t>(token);
if (index >= subscribers_.size()) {
@@ -101,37 +91,52 @@
return true;
}
- constexpr size_t max_subscribers() const { return subscribers_.size(); }
+ constexpr size_t max_subscribers() const PW_NO_LOCK_SAFETY_ANALYSIS {
+ return subscribers_.size();
+ }
constexpr size_t subscriber_count() const { return subscriber_count_; }
private:
- void PublishLocked(Event event) {
- queued_event_ = event;
+ bool PublishLocked(Event event) PW_EXCLUSIVE_LOCKS_REQUIRED(event_lock_) {
+ if (event_queue_->full()) {
+ return false;
+ }
+
+ event_queue_->push_back(event);
work_queue_->PushWork([this]() { NotifySubscribers(); });
+ return true;
}
void NotifySubscribers() {
- subscriber_lock_.lock();
+ event_lock_.lock();
+
+ if (event_queue_->empty()) {
+ event_lock_.unlock();
+ return;
+ }
+
+ // Copy the event out of the queue so that the lock does not have to be held
+ // while running subscriber callbacks.
+ Event event = event_queue_->front();
+ event_queue_->pop_front();
+ event_lock_.unlock();
+
+ subscribers_lock_.lock();
for (auto& callback : subscribers_) {
if (callback != nullptr) {
- callback(queued_event_);
+ callback(event);
}
}
- subscriber_lock_.unlock();
-
- // Allow new events to be processed.
- event_lock_.release();
+ subscribers_lock_.unlock();
}
pw::work_queue::WorkQueue* work_queue_;
- // Lock protecting the event queue. Uses a semaphore as it is acquired and
- // released by different threads.
- pw::sync::BinarySemaphore event_lock_;
- Event queued_event_;
+ pw::sync::InterruptSpinLock event_lock_;
+ pw::InlineDeque<Event>* event_queue_ PW_GUARDED_BY(event_lock_);
- pw::sync::InterruptSpinLock subscriber_lock_;
- pw::span<SubscribeCallback> subscribers_;
+ pw::sync::InterruptSpinLock subscribers_lock_;
+ pw::span<SubscribeCallback> subscribers_ PW_GUARDED_BY(subscribers_lock_);
size_t subscriber_count_;
};
@@ -140,16 +145,17 @@
// TODO
struct Event {};
-template <size_t kMaxSubscribers>
+template <size_t kMaxEvents, size_t kMaxSubscribers>
class PubSub : public internal::PubSubBase<Event> {
public:
using PubSubBase::SubscribeCallback;
using PubSubBase::SubscribeToken;
constexpr PubSub(pw::work_queue::WorkQueue& work_queue)
- : PubSubBase(work_queue, subscribers_) {}
+ : PubSubBase(work_queue, event_queue_, subscribers_) {}
private:
+ pw::InlineDeque<Event, kMaxEvents> event_queue_;
std::array<SubscribeCallback, kMaxSubscribers> subscribers_;
};
diff --git a/modules/pubsub/pubsub_test.cc b/modules/pubsub/pubsub_test.cc
index 1cf867b..92c4b55 100644
--- a/modules/pubsub/pubsub_test.cc
+++ b/modules/pubsub/pubsub_test.cc
@@ -34,27 +34,30 @@
void TearDown() { StopWorkQueue(); }
protected:
+ pw::InlineDeque<TestEvent, 4> event_queue_;
std::array<PubSub::SubscribeCallback, 4> subscribers_buffer_;
int result_ = 0;
+ int events_processed_ = 0;
pw::sync::TimedThreadNotification notification_;
+ pw::sync::TimedThreadNotification work_queue_start_notification_;
};
TEST_F(PubSubTest, Publish_OneSubscriber) {
- PubSub pubsub(work_queue(), subscribers_buffer_);
+ PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
pubsub.Subscribe([this](TestEvent event) {
result_ = event.value;
notification_.release();
});
- pubsub.Publish({.value = 42});
+ EXPECT_TRUE(pubsub.Publish({.value = 42}));
- notification_.try_acquire_for(500ms);
+ EXPECT_TRUE(notification_.try_acquire_for(50ms));
EXPECT_EQ(result_, 42);
}
TEST_F(PubSubTest, Publish_MultipleSubscribers) {
- PubSub pubsub(work_queue(), subscribers_buffer_);
+ PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
for (size_t i = 0; i < subscribers_buffer_.size(); ++i) {
struct {
@@ -74,14 +77,75 @@
});
}
- pubsub.Publish({.value = 4});
+ EXPECT_TRUE(pubsub.Publish({.value = 4}));
- notification_.try_acquire_for(500ms);
+ EXPECT_TRUE(notification_.try_acquire_for(50ms));
EXPECT_EQ(result_, static_cast<int>(4 * subscribers_buffer_.size()));
}
+TEST_F(PubSubTest, Publish_MultipleEvents) {
+ PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
+
+ pubsub.Subscribe([this](TestEvent event) {
+ result_ += event.value;
+ events_processed_++;
+
+ if (events_processed_ % 4 == 0) {
+ notification_.release();
+ }
+ });
+
+ EXPECT_TRUE(pubsub.Publish({.value = 1}));
+ EXPECT_TRUE(pubsub.Publish({.value = 2}));
+ EXPECT_TRUE(pubsub.Publish({.value = 3}));
+ EXPECT_TRUE(pubsub.Publish({.value = 4}));
+
+ EXPECT_TRUE(notification_.try_acquire_for(50ms));
+ EXPECT_EQ(result_, 10);
+ EXPECT_EQ(events_processed_, 4);
+
+ EXPECT_TRUE(pubsub.Publish({.value = 5}));
+ EXPECT_TRUE(pubsub.Publish({.value = 6}));
+ EXPECT_TRUE(pubsub.Publish({.value = 7}));
+ EXPECT_TRUE(pubsub.Publish({.value = 8}));
+
+ EXPECT_TRUE(notification_.try_acquire_for(50ms));
+ EXPECT_EQ(result_, 36);
+ EXPECT_EQ(events_processed_, 8);
+}
+
+TEST_F(PubSubTest, Publish_MultipleEvents_QueueFull) {
+ PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
+
+ work_queue().PushWork([this]() {
+ // Block the work queue until all events are published.
+ PW_ASSERT(work_queue_start_notification_.try_acquire_for(1s));
+ });
+
+ pubsub.Subscribe([this](TestEvent event) {
+ result_ += event.value;
+ events_processed_++;
+
+ if (events_processed_ == 5) {
+ notification_.release();
+ }
+ });
+
+ EXPECT_TRUE(pubsub.Publish({.value = 10}));
+ EXPECT_TRUE(pubsub.Publish({.value = 11}));
+ EXPECT_TRUE(pubsub.Publish({.value = 12}));
+ EXPECT_TRUE(pubsub.Publish({.value = 13}));
+ EXPECT_FALSE(pubsub.Publish({.value = 14}));
+ work_queue_start_notification_.release();
+
+ // This should time out as the fifth event never gets sent.
+ EXPECT_FALSE(notification_.try_acquire_for(50ms));
+ EXPECT_EQ(events_processed_, 4);
+ EXPECT_EQ(result_, 46);
+}
+
TEST_F(PubSubTest, Subscribe_Full) {
- PubSub pubsub(work_queue(), subscribers_buffer_);
+ PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
EXPECT_TRUE(pubsub.Subscribe([this](TestEvent) { notification_.release(); })
.has_value());
@@ -102,7 +166,7 @@
}
TEST_F(PubSubTest, Subscribe_Unsubscribe) {
- PubSub pubsub(work_queue(), subscribers_buffer_);
+ PubSub pubsub(work_queue(), event_queue_, subscribers_buffer_);
auto token1 =
pubsub.Subscribe([this](TestEvent) { notification_.release(); });