// blob: d72afc953e3fe4af610d4652e8e6bc8c8f19b57a [file]
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef HIGHWAYHASH_DATA_PARALLEL_H_
#define HIGHWAYHASH_DATA_PARALLEL_H_
// Portable C++11 alternative to OpenMP for data-parallel computations:
// provides low-overhead ThreadPool, plus PerThread with support for reduction.
#include <stdio.h>
#include <algorithm> // find_if
#include <atomic>
#include <condition_variable> //NOLINT
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <mutex> //NOLINT
#include <thread> //NOLINT
#include <utility>
#include <vector>
// Always-enabled check (unlike assert(), also active in release builds):
// prints the failing source line and aborts the process when "condition"
// evaluates to false. The while-form keeps the expansion a single statement
// so the macro composes safely with if/else.
#define DATA_PARALLEL_CHECK(condition) \
while (!(condition)) { \
printf("data_parallel check failed at line %d\n", __LINE__); \
abort(); \
}
namespace highwayhash {
// Highly scalable thread pool, especially suitable for data-parallel
// computations in the fork-join model, where clients need to know when all
// tasks have completed.
//
// Thread pools usually store small numbers of heterogeneous tasks in a queue.
// When tasks are identical or differ only by an integer input parameter, it is
// much faster to store just one function of an integer parameter and call it
// for each value.
//
// This thread pool can efficiently load-balance millions of tasks using an
// atomic counter, thus avoiding per-task syscalls. With 48 hyperthreads and
// 1M tasks that add to an atomic counter, overall runtime is 10-20x higher
// when using std::async, and up to 200x for a queue-based ThreadPool.
//
// Usage:
// ThreadPool pool;
// pool.Run(0, 1000000, [](const int i) { Func1(i); });
// // When Run returns, all of its tasks have finished.
//
// pool.RunTasks({Func2, Func3, Func4});
// // The destructor waits until all worker threads have exited cleanly.
class ThreadPool {
 public:
  // Starts the given number of worker threads and blocks until they are ready.
  // "num_threads" defaults to one per hyperthread.
  explicit ThreadPool(
      const int num_threads = std::thread::hardware_concurrency())
      : num_threads_(num_threads) {
    DATA_PARALLEL_CHECK(num_threads_ > 0);
    threads_.reserve(num_threads_);
    for (int i = 0; i < num_threads_; ++i) {
      threads_.emplace_back(ThreadFunc, this);
    }
    padding_[0] = 0;  // avoid unused member warning.
    WorkersReadyBarrier();
  }

  ThreadPool(const ThreadPool&) = delete;
  // Fixed: this was previously declared as "operator&" (bitwise AND), which
  // left the implicitly-generated copy *assignment* operator usable. A pool
  // that owns threads must not be copyable in any form.
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Waits for all threads to exit.
  ~ThreadPool() {
    StartWorkers(kWorkerExit);
    for (std::thread& thread : threads_) {
      thread.join();
    }
  }

  // Runs func(i) on worker thread(s) for every i in [begin, end).
  // Not thread-safe - no two calls to Run and RunTasks may overlap.
  // Subsequent calls will reuse the same threads.
  //
  // Precondition: 0 <= begin <= end.
  template <class Func>
  void Run(const int begin, const int end, const Func& func) {
    DATA_PARALLEL_CHECK(0 <= begin && begin <= end);
    if (begin == end) {
      return;
    }
    // Pack both range bounds into a single 64-bit command word.
    const WorkerCommand worker_command = (WorkerCommand(end) << 32) + begin;
    // Ensure the inputs do not result in a reserved command.
    DATA_PARALLEL_CHECK(worker_command != kWorkerWait);
    DATA_PARALLEL_CHECK(worker_command != kWorkerExit);

    // If Func is large (many captures), this will allocate memory, but it is
    // still slower to use a std::ref wrapper.
    task_ = func;
    num_reserved_.store(0);

    StartWorkers(worker_command);
    WorkersReadyBarrier();
  }

  // Runs each task (closure, typically a lambda function) on worker thread(s).
  // Not thread-safe - no two calls to Run and RunTasks may overlap.
  // Subsequent calls will reuse the same threads.
  //
  // This is a more conventional interface for heterogeneous tasks that may be
  // independent/unrelated.
  void RunTasks(const std::vector<std::function<void(void)>>& tasks) {
    Run(0, static_cast<int>(tasks.size()),
        [&tasks](const int i) { tasks[i](); });
  }

  // Statically (and deterministically) splits [begin, end) into ranges and
  // calls "func" for each of them. Useful when "func" involves some overhead
  // (e.g. for PerThread::Get or random seeding) that should be amortized over
  // a range of values. "func" is void(int chunk, uint32_t begin, uint32_t end).
  template <class Func>
  void RunRanges(const uint32_t begin, const uint32_t end, const Func& func) {
    const uint32_t length = end - begin;

    // Use constant rather than num_threads_ for machine-independent splitting.
    const uint32_t chunk = std::max(1U, (length + 127) / 128);
    std::vector<std::pair<uint32_t, uint32_t>> ranges;  // begin/end
    ranges.reserve(length / chunk + 1);
    for (uint32_t i = 0; i < length; i += chunk) {
      ranges.emplace_back(begin + i, begin + std::min(i + chunk, length));
    }

    Run(0, static_cast<int>(ranges.size()), [&ranges, func](const int i) {
      func(i, ranges[i].first, ranges[i].second);
    });
  }

 private:
  // After construction and between calls to Run, workers are "ready", i.e.
  // waiting on worker_start_cv_. They are "started" by sending a "command"
  // and notifying all worker_start_cv_ waiters. (That is why all workers
  // must be ready/waiting - otherwise, the notification will not reach all of
  // them and the main thread waits in vain for them to report readiness.)
  using WorkerCommand = uint64_t;

  // Special values; all others encode the begin/end parameters.
  static constexpr WorkerCommand kWorkerWait = 0;
  static constexpr WorkerCommand kWorkerExit = ~0ULL;

  // Blocks until every worker has incremented workers_ready_, then resets the
  // counter for the next barrier.
  void WorkersReadyBarrier() {
    std::unique_lock<std::mutex> lock(mutex_);
    workers_ready_cv_.wait(lock,
                           [this]() { return workers_ready_ == num_threads_; });
    workers_ready_ = 0;
  }

  // Precondition: all workers are ready.
  void StartWorkers(const WorkerCommand worker_command) {
    std::unique_lock<std::mutex> lock(mutex_);
    worker_start_command_ = worker_command;
    // Workers will need this lock, so release it before they wake up.
    lock.unlock();
    worker_start_cv_.notify_all();
  }

  // Attempts to reserve and perform some work from the global range of tasks,
  // which is encoded within "command". Returns after all tasks are reserved.
  static void RunRange(ThreadPool* self, const WorkerCommand command) {
    const int begin = command & 0xFFFFFFFF;
    const int end = command >> 32;
    const int num_tasks = end - begin;

    // OpenMP introduced several "schedule" strategies:
    // "single" (static assignment of exactly one chunk per thread): slower.
    // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
    // "guided" (allocates k tasks, decreases k): computing k = remaining/n
    //   is faster than halving k each iteration. We prefer this strategy
    //   because it avoids user-specified parameters.
    for (;;) {
      const int num_reserved = self->num_reserved_.load();
      const int num_remaining = num_tasks - num_reserved;
      const int my_size = std::max(num_remaining / (self->num_threads_ * 2), 1);
      const int my_begin = begin + self->num_reserved_.fetch_add(my_size);
      const int my_end = std::min(my_begin + my_size, begin + num_tasks);
      // Another thread already reserved the last task.
      if (my_begin >= my_end) {
        break;
      }
      for (int i = my_begin; i < my_end; ++i) {
        self->task_(i);
      }
    }
  }

  static void ThreadFunc(ThreadPool* self) {
    // Until kWorkerExit command received:
    for (;;) {
      std::unique_lock<std::mutex> lock(self->mutex_);
      // Notify main thread that this thread is ready.
      if (++self->workers_ready_ == self->num_threads_) {
        self->workers_ready_cv_.notify_one();
      }
    RESUME_WAIT:
      // Wait for a command.
      self->worker_start_cv_.wait(lock);
      const WorkerCommand command = self->worker_start_command_;
      switch (command) {
        case kWorkerWait:    // spurious wakeup:
          goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
        case kWorkerExit:
          return;  // exits thread
      }

      lock.unlock();
      RunRange(self, command);
    }
  }

  const int num_threads_;

  // Unmodified after ctor, but cannot be const because we call thread::join().
  std::vector<std::thread> threads_;

  std::mutex mutex_;  // guards both cv and their variables.
  std::condition_variable workers_ready_cv_;
  int workers_ready_ = 0;
  std::condition_variable worker_start_cv_;
  // Fixed: initialized to kWorkerWait so that a spurious wakeup of a worker
  // before the first StartWorkers call reads a defined value (and correctly
  // resumes waiting) instead of an indeterminate one.
  WorkerCommand worker_start_command_ = kWorkerWait;

  // Written by main thread, read by workers (after mutex lock/unlock).
  std::function<void(int)> task_;

  // Updated by workers; alignment/padding avoids false sharing.
  alignas(64) std::atomic<int> num_reserved_{0};
  int padding_[15];
};
// Thread-local storage with support for reduction (combining into one result).
// The "T" type must be unique to the call site because the list of threads'
// copies is a static member. (With knowledge of the underlying threads, we
// could eliminate this list and T allocations, but that is difficult to
// arrange and we prefer this to be usable independently of ThreadPool.)
//
// Usage:
// for (int i = 0; i < N; ++i) {
// // in each thread:
// T& my_copy = PerThread<T>::Get();
// my_copy.Modify();
//
// // single-threaded:
// T& combined = PerThread<T>::Reduce();
// Use(combined);
// PerThread<T>::Destroy();
// }
//
// T is duck-typed and implements the following interface:
//
// // Returns true if T is default-initialized or Destroy was called without
// // any subsequent re-initialization.
// bool IsNull() const;
//
// // Releases any resources. Postcondition: IsNull() == true.
// void Destroy();
//
// // Merges in data from "victim". Precondition: !IsNull() && !victim.IsNull().
// void Assimilate(const T& victim);
template <class T>
class PerThread {
 public:
  // Returns a reference to the calling thread's T instance. Each thread gets
  // its own heap-allocated copy (hence a unique address). Callers are
  // responsible for any initialization beyond the default ctor.
  static T& Get() {
    static thread_local T* instance;
    if (instance == nullptr) {
      instance = new T;
      // Registration is the only concurrent mutation of the list, so a
      // single mutex around push_back suffices.
      static std::mutex registry_mutex;
      std::lock_guard<std::mutex> guard(registry_mutex);
      Threads().push_back(instance);
    }
    return *instance;
  }

  // Returns the vector of every thread's T. Used inside Reduce() or by
  // clients that need direct access to all instances instead of Assimilating
  // them. Wrapping the static in a function avoids a separate out-of-class
  // member definition.
  static std::vector<T*>& Threads() {
    static std::vector<T*> registry;
    return registry;
  }

  // Assimilates every other thread's non-null T into the first non-null one
  // and returns it. Precondition: at least one non-null T exists (the caller
  // must have called Get() and initialized the result).
  static T& Reduce() {
    std::vector<T*>& registry = Threads();

    // Locate the first instance holding data; abort if none exists.
    T* target = nullptr;
    for (T* candidate : registry) {
      if (!candidate->IsNull()) {
        target = candidate;
        break;
      }
    }
    if (target == nullptr) {
      abort();
    }

    // Merge every other populated instance into the target.
    for (const T* other : registry) {
      if (other != target && !other->IsNull()) {
        target->Assimilate(*other);
      }
    }
    return *target;
  }

  // Invokes T::Destroy on every registered instance to release resources
  // and/or prepare for reuse by the same threads/ThreadPool. The T objects
  // themselves stay allocated: deleting them would leave dangling pointers in
  // the threads' thread_local slots, and a thread may call Get() again later.
  static void Destroy() {
    for (T* copy : Threads()) {
      copy->Destroy();
    }
  }
};
} // namespace highwayhash
#endif // HIGHWAYHASH_DATA_PARALLEL_H_