blob: 01523e7835f804c7302343ccd810407e1a086984 [file] [log] [blame]
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <algorithm>
#include <mutex>
#include <optional>
#include <type_traits>
#include "pw_function/function.h"
#include "pw_sync/binary_semaphore.h"
#include "pw_sync/interrupt_spin_lock.h"
#include "pw_work_queue/work_queue.h"
namespace am {
namespace internal {
template <typename Event>
class PubSubBase {
public:
using SubscribeCallback = pw::Function<void(Event)>;
using SubscribeToken = size_t;
template <
typename = std::enable_if_t<std::is_trivially_copyable_v<Event> &&
std::is_trivially_destructible_v<Event>>>
PubSubBase(pw::work_queue::WorkQueue& work_queue,
pw::span<SubscribeCallback> subscribers)
: work_queue_(&work_queue),
subscribers_(subscribers),
subscriber_count_(0) {
event_lock_.release();
}
/// Pushes an event to the event queue, blocking until the event is
/// accepted. This is **NOT** interrupt safe.
void Publish(Event event) {
// Wait for the current event to be finished processing.
event_lock_.acquire();
PublishLocked(event);
}
/// Attempts to push an event to the event queue, returning whether it was
/// successfully published.
/// The `PubSub` takes ownership of the event.
bool TryPublish(Event event) {
if (!event_lock_.try_acquire()) {
return false;
}
PublishLocked(event);
return true;
}
/// Registers a callback to be run when events are received.
/// If registration was successful, returns a token which can be used to
/// unsubscribe.
///
/// All subscribed callbacks are invoked from the context of the work queue
/// provided to the constructor, while a lock is held.
/// As a result, certain operations such as publishing to the work queue are
/// disallowed, and generally, callbacks should avoid long blocking operations
/// to not starve other callbacks or work queue tasks.
std::optional<SubscribeToken> Subscribe(SubscribeCallback&& callback) {
std::lock_guard lock(subscriber_lock_);
auto slot = std::find_if(subscribers_.begin(),
subscribers_.end(),
[](auto& s) { return s == nullptr; });
if (slot == subscribers_.end()) {
return std::nullopt;
}
*slot = std::move(callback);
subscriber_count_++;
return SubscribeToken(slot - subscribers_.begin());
}
/// Unregisters a previously registered subscriber.
bool Unsubscribe(SubscribeToken token) {
std::lock_guard lock(subscriber_lock_);
const size_t index = static_cast<size_t>(token);
if (index >= subscribers_.size()) {
return false;
}
subscribers_[index] = nullptr;
subscriber_count_--;
return true;
}
constexpr size_t max_subscribers() const { return subscribers_.size(); }
constexpr size_t subscriber_count() const { return subscriber_count_; }
private:
void PublishLocked(Event event) {
queued_event_ = event;
work_queue_->PushWork([this]() { NotifySubscribers(); });
}
void NotifySubscribers() {
subscriber_lock_.lock();
for (auto& callback : subscribers_) {
if (callback != nullptr) {
callback(queued_event_);
}
}
subscriber_lock_.unlock();
// Allow new events to be processed.
event_lock_.release();
}
pw::work_queue::WorkQueue* work_queue_;
// Lock protecting the event queue. Uses a semaphore as it is acquired and
// released by different threads.
pw::sync::BinarySemaphore event_lock_;
Event queued_event_;
pw::sync::InterruptSpinLock subscriber_lock_;
pw::span<SubscribeCallback> subscribers_;
size_t subscriber_count_;
};
} // namespace internal
// TODO
struct Event {};
template <size_t kMaxSubscribers>
class PubSub : public internal::PubSubBase<Event> {
public:
using PubSubBase::SubscribeCallback;
using PubSubBase::SubscribeToken;
constexpr PubSub(pw::work_queue::WorkQueue& work_queue)
: PubSubBase(work_queue, subscribers_) {}
private:
std::array<SubscribeCallback, kMaxSubscribers> subscribers_;
};
} // namespace am