| // Copyright 2022 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| #include "pw_sync/condition_variable.h" |
| |
| #include <chrono> |
| #include <functional> |
| |
| #include "gtest/gtest.h" |
| #include "pw_containers/vector.h" |
| #include "pw_sync/mutex.h" |
| #include "pw_sync/timed_thread_notification.h" |
| #include "pw_thread/sleep.h" |
| #include "pw_thread/test_threads.h" |
| #include "pw_thread/thread.h" |
| |
| namespace pw::sync { |
| namespace { |
| |
| using namespace std::chrono_literals; |
| |
| // A timeout for tests where successful behaviour involves waiting. |
| constexpr auto kRequiredTimeout = 100ms; |
| |
| // Maximum extra wait time allowed for test that ensure something waits for |
| // `kRequiredTimeout`. |
| const auto kAllowedSlack = kRequiredTimeout * 1.5; |
| |
| // A timeout that should only be hit if something goes wrong. |
| constexpr auto kFailureTimeout = 5s; |
| |
| using StateLock = std::unique_lock<Mutex>; |
| |
| struct ThreadInfo { |
| explicit ThreadInfo(int id) : thread_id(id) {} |
| |
| // waiting_notifier is signalled in predicates to indicate that the predicate |
| // has been evaluated. This guarantees (via insider information) that the |
| // thread will acquire the internal ThreadNotification. |
| TimedThreadNotification waiting_notifier; |
| |
| // Signals when the worker thread is done. |
| TimedThreadNotification done_notifier; |
| |
| // The result of the predicate the worker thread uses with wait*(). Set from |
| // the main test thread and read by the worker thread. |
| bool predicate_result = false; |
| |
| // Stores the result of ConditionVariable::wait_for() or ::wait_until() for |
| // use in test asserts. |
| bool wait_result = false; |
| |
| // For use in recording the order in which threads block on a condition. |
| const int thread_id; |
| |
| // Returns a function which will return the current value of |
| //`predicate_result` and release `waiting_notifier`. |
| std::function<bool()> Predicate() { |
| return [this]() { |
| bool result = this->predicate_result; |
| this->waiting_notifier.release(); |
| return result; |
| }; |
| } |
| }; |
| |
| // A `ThreadCore` implementation that delegates to an `std::function`. |
| class LambdaThreadCore : public pw::thread::ThreadCore { |
| public: |
| explicit LambdaThreadCore(std::function<void()> work) |
| : work_(std::move(work)) {} |
| |
| private: |
| void Run() override { work_(); } |
| |
| std::function<void()> work_; |
| }; |
| |
| class LambdaThread { |
| public: |
| // Starts a new thread which runs `work`, joining the thread on destruction. |
| explicit LambdaThread( |
| std::function<void()> work, |
| pw::thread::Options options = pw::thread::test::TestOptionsThread0()) |
| : thread_core_(std::move(work)), thread_(options, thread_core_) {} |
| ~LambdaThread() { thread_.join(); } |
| LambdaThread(const LambdaThread&) = delete; |
| LambdaThread(LambdaThread&&) = delete; |
| LambdaThread& operator=(const LambdaThread&) = delete; |
| LambdaThread&& operator=(LambdaThread&&) = delete; |
| |
| private: |
| LambdaThreadCore thread_core_; |
| pw::thread::Thread thread_; |
| }; |
| |
| TEST(Wait, PredicateTrueNoWait) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info] { |
| StateLock l{mutex}; |
| condvar.wait(l, [] { return true; }); |
| |
| info.done_notifier.release(); |
| }); |
| EXPECT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| } |
| |
| TEST(NotifyOne, BlocksUntilSignaled) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info] { |
| StateLock l{mutex}; |
| condvar.wait(l, info.Predicate()); |
| info.done_notifier.release(); |
| }); |
| ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); |
| { |
| StateLock l{mutex}; |
| thread_info.predicate_result = true; |
| } |
| condvar.notify_one(); |
| ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| } |
| |
| TEST(NotifyOne, UnblocksOne) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)}; |
| pw::Vector<int, 2> wait_order; |
| |
| LambdaThread thread_1( |
| [&mutex, &condvar, &info = thread_info[0], &wait_order] { |
| StateLock l{mutex}; |
| auto predicate = [&info, &wait_order] { |
| wait_order.push_back(info.thread_id); |
| auto result = info.predicate_result; |
| info.waiting_notifier.release(); |
| return result; |
| }; |
| condvar.wait(l, predicate); |
| info.done_notifier.release(); |
| }, |
| pw::thread::test::TestOptionsThread0()); |
| LambdaThread thread_2( |
| [&mutex, &condvar, &info = thread_info[1], &wait_order] { |
| StateLock l{mutex}; |
| auto predicate = [&info, &wait_order] { |
| wait_order.push_back(info.thread_id); |
| auto result = info.predicate_result; |
| info.waiting_notifier.release(); |
| return result; |
| }; |
| condvar.wait(l, predicate); |
| info.done_notifier.release(); |
| }, |
| pw::thread::test::TestOptionsThread1()); |
| |
| ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout)); |
| |
| { |
| StateLock l{mutex}; |
| thread_info[1].predicate_result = true; |
| thread_info[0].predicate_result = true; |
| } |
| condvar.notify_one(); |
| ASSERT_TRUE(thread_info[wait_order[0]].done_notifier.try_acquire_for( |
| kFailureTimeout)); |
| ASSERT_FALSE(thread_info[wait_order[0]].done_notifier.try_acquire()); |
| condvar.notify_one(); |
| ASSERT_TRUE(thread_info[wait_order[1]].done_notifier.try_acquire_for( |
| kFailureTimeout)); |
| } |
| |
| TEST(NotifyAll, UnblocksMultiple) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)}; |
| |
| LambdaThread thread_1( |
| [&mutex, &condvar, &info = thread_info[0]] { |
| StateLock l{mutex}; |
| condvar.wait(l, info.Predicate()); |
| info.done_notifier.release(); |
| }, |
| pw::thread::test::TestOptionsThread0()); |
| LambdaThread thread_2( |
| [&mutex, &condvar, &info = thread_info[1]] { |
| StateLock l{mutex}; |
| condvar.wait(l, info.Predicate()); |
| info.done_notifier.release(); |
| }, |
| pw::thread::test::TestOptionsThread1()); |
| |
| ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout)); |
| { |
| StateLock l{mutex}; |
| thread_info[0].predicate_result = true; |
| thread_info[1].predicate_result = true; |
| } |
| condvar.notify_all(); |
| ASSERT_TRUE(thread_info[0].done_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_TRUE(thread_info[1].done_notifier.try_acquire_for(kFailureTimeout)); |
| } |
| |
| TEST(WaitFor, ReturnsTrueIfSignalled) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info] { |
| StateLock l{mutex}; |
| info.wait_result = condvar.wait_for(l, kFailureTimeout, info.Predicate()); |
| info.done_notifier.release(); |
| }); |
| |
| ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); |
| { |
| StateLock l{mutex}; |
| thread_info.predicate_result = true; |
| } |
| condvar.notify_one(); |
| ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_TRUE(thread_info.wait_result); |
| } |
| |
| TEST(WaitFor, ReturnsFalseIfTimesOut) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info] { |
| StateLock l{mutex}; |
| info.wait_result = condvar.wait_for(l, 0ms, info.Predicate()); |
| info.done_notifier.release(); |
| }); |
| |
| ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_FALSE(thread_info.wait_result); |
| } |
| |
| // NOTE: This test waits even in successful circumstances. |
| TEST(WaitFor, TimeoutApproximatelyCorrect) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| pw::chrono::SystemClock::duration wait_duration{}; |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] { |
| StateLock l{mutex}; |
| auto start = pw::chrono::SystemClock::now(); |
| info.wait_result = condvar.wait_for(l, kRequiredTimeout, info.Predicate()); |
| wait_duration = pw::chrono::SystemClock::now() - start; |
| info.done_notifier.release(); |
| }); |
| |
| ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); |
| // Wake up thread multiple times. Make sure the timeout is observed. |
| for (int i = 0; i < 5; ++i) { |
| condvar.notify_one(); |
| pw::this_thread::sleep_for(kRequiredTimeout / 6); |
| } |
| ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| EXPECT_FALSE(thread_info.wait_result); |
| EXPECT_GE(wait_duration, kRequiredTimeout); |
| EXPECT_LT(wait_duration, (kRequiredTimeout + kAllowedSlack)); |
| } |
| |
| TEST(WaitUntil, ReturnsTrueIfSignalled) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info] { |
| StateLock l{mutex}; |
| info.wait_result = condvar.wait_until( |
| l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate()); |
| info.done_notifier.release(); |
| }); |
| |
| ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); |
| { |
| StateLock l{mutex}; |
| thread_info.predicate_result = true; |
| } |
| condvar.notify_one(); |
| ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_TRUE(thread_info.wait_result); |
| } |
| |
| // NOTE: This test waits even in successful circumstances. |
| TEST(WaitUntil, ReturnsFalseIfTimesOut) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info] { |
| StateLock l{mutex}; |
| info.wait_result = condvar.wait_until( |
| l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate()); |
| info.done_notifier.release(); |
| }); |
| |
| ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_FALSE(thread_info.wait_result); |
| } |
| |
| // NOTE: This test waits even in successful circumstances. |
| TEST(WaitUntil, TimeoutApproximatelyCorrect) { |
| Mutex mutex; |
| ConditionVariable condvar; |
| ThreadInfo thread_info(0); |
| pw::chrono::SystemClock::duration wait_duration{}; |
| |
| LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] { |
| StateLock l{mutex}; |
| auto start = pw::chrono::SystemClock::now(); |
| info.wait_result = condvar.wait_until( |
| l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate()); |
| wait_duration = pw::chrono::SystemClock::now() - start; |
| info.done_notifier.release(); |
| }); |
| |
| ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); |
| // Wake up thread multiple times. Make sure the timeout is observed. |
| for (int i = 0; i < 5; ++i) { |
| condvar.notify_one(); |
| pw::this_thread::sleep_for(kRequiredTimeout / 6); |
| } |
| ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); |
| ASSERT_FALSE(thread_info.wait_result); |
| ASSERT_GE(wait_duration, kRequiredTimeout); |
| ASSERT_LE(wait_duration, kRequiredTimeout + kAllowedSlack); |
| } |
| |
| } // namespace |
| } // namespace pw::sync |