blob: 035eeeba417ada01877fee57aa45bce1c0f0cfa2 [file] [log] [blame]
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_async2/dispatcher_base.h"
#include <iterator>
#include <mutex>
#include "pw_assert/check.h"
#include "pw_async2/internal/config.h"
#include "pw_async2/waker.h"
#include "pw_log/tokenized_args.h"
#define PW_LOG_MODULE_NAME PW_ASYNC2_CONFIG_LOG_MODULE_NAME
#define PW_LOG_LEVEL PW_ASYNC2_CONFIG_LOG_LEVEL
#include "pw_log/log.h"
namespace pw::async2 {
// Detaches every task that is still queued on this dispatcher.
//
// With the global dispatcher lock held, the woken queue and then the sleeping
// queue are emptied through UnpostTaskList(), which resets each task to
// kUnposted, clears its dispatcher pointer and removes its wakers.
void NativeDispatcherBase::Deregister() {
  std::lock_guard guard(impl::dispatcher_lock());
  UnpostTaskList(woken_);
  UnpostTaskList(sleeping_);
}
// Registers `task` with this dispatcher and queues it to run.
//
// The task must currently be unposted and owned by no dispatcher (checked
// with debug assertions). It is marked kWoken, bound to this dispatcher and
// appended to the back of the woken queue. If the dispatcher had requested a
// wake-up, that request is consumed and Wake() is invoked after the lock has
// been released.
void NativeDispatcherBase::Post(Task& task) {
  bool should_wake = false;
  {
    std::lock_guard guard(impl::dispatcher_lock());
    PW_DASSERT(task.state_ == Task::State::kUnposted);
    PW_DASSERT(task.dispatcher_ == nullptr);
    task.state_ = Task::State::kWoken;
    task.dispatcher_ = this;
    woken_.push_back(task);

    // Consume any pending wake request: remember whether one was outstanding
    // and leave the flag cleared either way.
    should_wake = wants_wake_;
    wants_wake_ = false;
  }

  if (!should_wake) {
    return;
  }

  // Note: unlike in ``WakeTask``, here we know that the ``Dispatcher`` will
  // not be destroyed out from under our feet because we're in a method being
  // called on the ``Dispatcher`` by a user. That is why it is fine to call
  // ``Wake`` outside of the lock.
  Wake();
}
// Decides whether the dispatcher may go to sleep and, if so, records that it
// wants to be woken.
//
// Parameters:
//   allow_empty - when false, sleeping is refused if no task is in the
//                 sleeping queue.
//
// Returns SleepInfo::DontSleep() if there are woken tasks waiting to run, or
// if the sleeping queue is empty and `allow_empty` is false. Otherwise sets
// `wants_wake_`, bumps `sleep_count_` and returns SleepInfo::Indefinitely().
NativeDispatcherBase::SleepInfo NativeDispatcherBase::AttemptRequestWake(
    bool allow_empty) {
  std::lock_guard guard(impl::dispatcher_lock());

  // Tasks that are already runnable take priority over sleeping.
  const bool has_runnable_tasks = !woken_.empty();
  if (has_runnable_tasks) {
    PW_LOG_DEBUG("Dispatcher will not sleep due to nonempty task queue");
    return SleepInfo::DontSleep();
  }

  const bool nothing_is_sleeping = sleeping_.empty();
  if (nothing_is_sleeping && !allow_empty) {
    PW_LOG_DEBUG("Dispatcher will not sleep due to empty sleep queue");
    return SleepInfo::DontSleep();
  }

  // Record that the ``Dispatcher`` is going to sleep and will need a
  // ``DoWake`` call once more work can be done.
  sleep_count_.Increment();
  wants_wake_ = true;

  // Once timers are added, this should check them.
  return SleepInfo::Indefinitely();
}
// Pops the next woken task (if any) and polls it once.
//
// Parameters:
//   dispatcher       - handed to the Context that the task's Pend() receives.
//   task_to_look_for - compared by address against the task that was run; the
//                      result reports whether that specific task completed.
//                      It is never dereferenced here.
//
// Returns a RunOneTaskResult built from three flags:
//   completed_all_tasks - both the woken and sleeping queues were empty when
//                         checked (only computed when no task ran or the task
//                         that ran completed; false otherwise).
//   completed_main_task - the task that ran completed and is
//                         `task_to_look_for`.
//   ran_a_task          - a task was actually polled.
//
// `task_execution_lock_` is held for the whole call, including the task's
// Pend() and DoDestroy(); the global dispatcher lock is only held while queue
// and task state is inspected or modified.
NativeDispatcherBase::RunOneTaskResult NativeDispatcherBase::RunOneTask(
    Dispatcher& dispatcher, Task* task_to_look_for) {
  std::lock_guard task_lock(task_execution_lock_);
  Task* task;
  {
    std::lock_guard lock(impl::dispatcher_lock());
    task = PopWokenTask();
    if (task == nullptr) {
      PW_LOG_DEBUG("Dispatcher has no woken tasks to run");
      bool all_complete = woken_.empty() && sleeping_.empty();
      return RunOneTaskResult(
          /*completed_all_tasks=*/all_complete,
          /*completed_main_task=*/false,
          /*ran_a_task=*/false);
    }
    task->state_ = Task::State::kRunning;
  }

  // Poll the task without holding the dispatcher lock. The waker and context
  // only live for the duration of this single Pend() call.
  bool complete;
  bool requires_waker;
  {
    Waker waker(*task);
    Context context(dispatcher, waker);
    tasks_polled_.Increment();
    complete = task->Pend(context).IsReady();
    requires_waker = context.requires_waker_;
  }

  if (complete) {
    tasks_completed_.Increment();
    bool all_complete;
    {
      std::lock_guard lock(impl::dispatcher_lock());
      switch (task->state_) {
        case Task::State::kUnposted:
        case Task::State::kSleeping:
          // A task that was just polled can only be running or re-woken.
          PW_DASSERT(false);
          PW_UNREACHABLE;
        case Task::State::kRunning:
          break;
        case Task::State::kWoken:
          // The task was woken while it ran (see ``WakeTask``), which put it
          // back on the woken queue; take it off again since it is done.
          RemoveWokenTaskLocked(*task);
          break;
      }
      task->state_ = Task::State::kUnposted;
      task->dispatcher_ = nullptr;
      task->RemoveAllWakersLocked();
      all_complete = woken_.empty() && sleeping_.empty();
    }
    // Called after the dispatcher lock has been released.
    task->DoDestroy();
    return RunOneTaskResult(
        /*completed_all_tasks=*/all_complete,
        /*completed_main_task=*/task == task_to_look_for,
        /*ran_a_task=*/true);
  }

  std::lock_guard lock(impl::dispatcher_lock());
  // If the state is no longer kRunning the task was woken during Pend() and
  // already sits on the woken queue, so there is nothing more to do.
  if (task->state_ == Task::State::kRunning) {
    if (requires_waker) {
      // Only log the move to the sleep queue when it really happens; a task
      // that does not require a waker is unposted instead.
      if (task->name_ != log::kDefaultToken) {
        PW_LOG_DEBUG(
            "Dispatcher adding task " PW_LOG_TOKEN_FMT() ":%p to sleep queue",
            task->name_,
            static_cast<const void*>(task));
      } else {
        PW_LOG_DEBUG("Dispatcher adding task (anonymous):%p to sleep queue",
                     static_cast<const void*>(task));
      }
      PW_CHECK(!task->wakers_.empty(),
               "Task %p returned Pending() without registering a waker",
               static_cast<const void*>(task));
      task->state_ = Task::State::kSleeping;
      sleeping_.push_front(*task);
    } else {
      // Require the task to be manually re-posted.
      task->state_ = Task::State::kUnposted;
      task->dispatcher_ = nullptr;
    }
  }
  return RunOneTaskResult(
      /*completed_all_tasks=*/false,
      /*completed_main_task=*/false,
      /*ran_a_task=*/true);
}
// Empties `list`, detaching each task from this dispatcher as it goes.
//
// Every task is reset to kUnposted, has its dispatcher pointer cleared and
// its wakers removed, and is then popped from the front of the list.
// RemoveAllWakersLocked() suggests the dispatcher lock must already be held;
// the only caller in this file, Deregister(), does hold it.
void NativeDispatcherBase::UnpostTaskList(IntrusiveList<Task>& list) {
  for (; !list.empty(); list.pop_front()) {
    Task& head = list.front();
    head.state_ = Task::State::kUnposted;
    head.dispatcher_ = nullptr;
    head.RemoveAllWakersLocked();
  }
}
// Takes `task` out of the woken queue. The task's state field is left
// untouched; callers update it themselves.
void NativeDispatcherBase::RemoveWokenTaskLocked(Task& task) {
  auto& queue = woken_;
  queue.remove(task);
}
// Takes `task` out of the sleeping queue. The task's state field is left
// untouched; callers update it themselves.
void NativeDispatcherBase::RemoveSleepingTaskLocked(Task& task) {
  auto& queue = sleeping_;
  queue.remove(task);
}
// Marks `task` as ready to run and, if the dispatcher asked for it, wakes the
// dispatcher.
//
// Behavior by current task state:
//   kWoken    - already queued; nothing to do.
//   kUnposted - must never happen; crashes via PW_CHECK.
//   kRunning  - queued again so that it is polled once more after the
//               current poll finishes.
//   kSleeping - moved from the sleeping queue to the woken queue.
//
// This function takes no lock itself; as the note on ``Wake()`` below
// indicates, it runs with the dispatcher lock already held.
void NativeDispatcherBase::WakeTask(Task& task) {
  if (task.name_ != log::kDefaultToken) {
    PW_LOG_DEBUG("Dispatcher waking task " PW_LOG_TOKEN_FMT() ":%p",
                 task.name_,
                 static_cast<const void*>(&task));
  } else {
    PW_LOG_DEBUG("Dispatcher waking task (anonymous):%p",
                 static_cast<const void*>(&task));
  }

  switch (task.state_) {
    case Task::State::kWoken:
      // Do nothing-- this has already been woken.
      return;
    case Task::State::kUnposted:
      // This should be unreachable. Terminate the case explicitly (as
      // ``RunOneTask`` does for its impossible states) so that it can never
      // fall through into the wake path below.
      PW_CHECK(false);
      PW_UNREACHABLE;
    case Task::State::kRunning:
      // Wake again to indicate that this task should be run once more,
      // as the state of the world may have changed since the task
      // started running.
      break;
    case Task::State::kSleeping:
      RemoveSleepingTaskLocked(task);
      // Wake away!
      break;
  }

  task.state_ = Task::State::kWoken;
  woken_.push_back(task);

  // NOTE(review): unlike ``Post``, ``wants_wake_`` is not cleared here, so
  // unless ``Wake()`` clears it (not visible from this file) every later
  // ``WakeTask`` call will invoke ``Wake()`` again -- confirm this is
  // intended.
  if (wants_wake_) {
    // Note: it's quite annoying to make this call under the lock, as it can
    // result in extra thread wakeup/sleep cycles.
    //
    // However, releasing the lock first would allow for the possibility that
    // the ``Dispatcher`` has been destroyed, making the call invalid.
    Wake();
  }
}
// Removes and returns the task at the front of the woken queue, or returns
// nullptr when that queue is empty. The task's state is not modified here.
Task* NativeDispatcherBase::PopWokenTask() {
  Task* next = woken_.empty() ? nullptr : &woken_.front();
  if (next != nullptr) {
    woken_.pop_front();
  }
  return next;
}
// Dumps the dispatcher's task queues at INFO level.
//
// After a header line, the dispatcher lock is taken and every woken task is
// listed, followed by every sleeping task together with the number of wakers
// registered on it. Named tasks are printed with their name token, unnamed
// ones as "(anonymous)". When PW_ASYNC2_DEBUG_WAIT_REASON is enabled, each
// sleeping task's wakers are additionally logged via LogTaskWakers().
void NativeDispatcherBase::LogRegisteredTasks() {
  PW_LOG_INFO("pw::async2::Dispatcher");
  std::lock_guard guard(impl::dispatcher_lock());

  PW_LOG_INFO("Woken tasks:");
  for (const Task& woken : woken_) {
    const void* const address = static_cast<const void*>(&woken);
    if (woken.name_ == log::kDefaultToken) {
      PW_LOG_INFO(" - (anonymous):%p", address);
    } else {
      PW_LOG_INFO(" - " PW_LOG_TOKEN_FMT() ":%p", woken.name_, address);
    }
  }

  PW_LOG_INFO("Sleeping tasks:");
  for (const Task& sleeper : sleeping_) {
    const void* const address = static_cast<const void*>(&sleeper);
    // The waker list is walked to count its elements.
    const int num_wakers = static_cast<int>(
        std::distance(sleeper.wakers_.begin(), sleeper.wakers_.end()));
    if (sleeper.name_ == log::kDefaultToken) {
      PW_LOG_INFO(" - (anonymous):%p (%d wakers)", address, num_wakers);
    } else {
      PW_LOG_INFO(" - " PW_LOG_TOKEN_FMT() ":%p (%d wakers)",
                  sleeper.name_,
                  address,
                  num_wakers);
    }
#if PW_ASYNC2_DEBUG_WAIT_REASON
    LogTaskWakers(sleeper);
#endif  // PW_ASYNC2_DEBUG_WAIT_REASON
  }
}
#if PW_ASYNC2_DEBUG_WAIT_REASON
// Logs, at INFO level, the wait reason of each of `task`'s wakers that has
// one. Wakers are numbered from 1 in list order; wakers without a wait reason
// are skipped but still consume a number.
void NativeDispatcherBase::LogTaskWakers(const Task& task) {
  int waker_number = 0;
  for (const Waker& waker : task.wakers_) {
    ++waker_number;
    if (waker.wait_reason_ == log::kDefaultToken) {
      continue;
    }
    PW_LOG_INFO(" * Waker %d: " PW_LOG_TOKEN_FMT("pw_async2"),
                waker_number,
                waker.wait_reason_);
  }
}
#endif // PW_ASYNC2_DEBUG_WAIT_REASON
} // namespace pw::async2