blob: 489eb3d59490dadba8dc8e63d9942fa12212946b [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <array>
#include <cstdint>
#include "pw_containers/inline_queue.h"
#include "pw_function/function.h"
#include "pw_metric/metric.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_sync/interrupt_spin_lock.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/thread_core.h"
namespace pw::work_queue {
using WorkItem = Function<void()>;
/// Enables threads and interrupts to enqueue work as a
/// `pw::work_queue::WorkItem` for execution by the work queue.
///
/// **Queue sizing**: The number of outstanding work requests is limited
/// based on the internal queue size. The queue size is set through either
/// the size of the `queue_storage` buffer passed into the constructor or by
/// using the templated `pw::work_queue::WorkQueueWithBuffer` helper. When the
/// queue is full, the queue will not accept further work.
///
/// **Cooperative thread cancellation**: The class is a
/// `pw::thread::ThreadCore`, meaning it should be executed as a single thread.
/// To facilitate clean shutdown, it provides a `RequestStop()` method for
/// cooperative cancellation which should be invoked before joining the thread.
/// Once a stop has been requested the queue will no longer accept further work.
///
/// The entire API is thread-safe and interrupt-safe.
class WorkQueue : public thread::ThreadCore {
public:
/// @param[in] queue The work entries to enqueue.
///
/// @param[in] queue_capacity The internal queue size which limits the number
/// of outstanding work requests.
///
/// @note The `ThreadNotification` prevents this from being `constexpr`.
WorkQueue(InlineQueue<WorkItem>& queue, size_t queue_capacity)
: stop_requested_(false), queue_(queue) {
min_queue_remaining_.Set(static_cast<uint32_t>(queue_capacity));
}
/// Enqueues a `work_item` for execution by the work queue thread.
///
/// @param[in] work_item The entry to enqueue.
///
/// @returns @rst
///
/// .. pw-status-codes::
///
/// OK: Success. Entry was enqueued for execution.
///
/// FAILED_PRECONDITION: The work queue is shutting down. Entries are no
/// longer permitted.
///
/// RESOURCE_EXHAUSTED: Internal work queue is full. Entry was not
/// enqueued.
///
/// @endrst
Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
return InternalPushWork(std::move(work_item));
}
/// Queues work for execution. Crashes if the work cannot be queued due to a
/// full queue or a stopped worker thread.
///
/// This call is recommended where possible since it saves error handling code
/// at the callsite; and in many practical cases, it is a bug if the work
/// queue is full (and so a crash is useful to detect the problem).
///
/// @param[in] work_item The entry to enqueue.
///
/// @pre
/// * The queue must not overflow, i.e. be full.
/// * The queue must not have been requested to stop, i.e. it must
/// not be in the process of shutting down.
void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_);
/// Locks the queue to prevent further work enqueing, finishes outstanding
/// work, then shuts down the worker thread.
///
/// The `WorkQueue` cannot be resumed after stopping because the `ThreadCore`
/// thread returns and may be joined. The `WorkQueue` must be reconstructed
/// for re-use after the thread has been joined.
void RequestStop() PW_LOCKS_EXCLUDED(lock_);
private:
void Run() override PW_LOCKS_EXCLUDED(lock_);
Status InternalPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_);
sync::InterruptSpinLock lock_;
bool stop_requested_ PW_GUARDED_BY(lock_);
InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_);
sync::ThreadNotification work_notification_;
// TODO(ewout): The group and/or its name token should be passed as a ctor
// arg instead. Depending on the approach here the group should be exposed
// While doing this evaluate whether perhaps we should instead construct
// TypedMetric<uint32_t>s directly, avoiding the macro usage given the
// min_queue_remaining_ initial value requires dependency injection.
// And lastly when the restructure is finalized add unit tests to ensure these
// metrics work as intended.
PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue");
PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u);
PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u);
};
template <size_t kWorkQueueEntries>
class WorkQueueWithBuffer : public WorkQueue {
public:
constexpr WorkQueueWithBuffer() : WorkQueue(queue_, kWorkQueueEntries) {}
private:
InlineQueue<WorkItem, kWorkQueueEntries> queue_;
};
} // namespace pw::work_queue