Re-enable multithreaded benchmarks (Threads, ThreadRange, ThreadPerCpu) and their tests
diff --git a/include/benchmark/benchmark.h b/include/benchmark/benchmark.h
index 4a23b4f..b2d34ad 100644
--- a/include/benchmark/benchmark.h
+++ b/include/benchmark/benchmark.h
@@ -129,6 +129,7 @@
// Teardown code here.
}
}
+BENCHMARK(BM_MultiThreaded)->Threads(4);
*/
#ifndef BENCHMARK_BENCHMARK_H_
@@ -137,6 +138,7 @@
#include <stdint.h>
#include <functional>
+#include <memory>
#include <string>
#include <vector>
@@ -226,6 +228,7 @@
private:
class FastClock;
struct SharedState;
+ struct ThreadStats;
State(FastClock* clock, SharedState* s, int t);
bool StartRunning();
@@ -234,7 +237,10 @@
void NewInterval();
bool AllStarting();
+ static void* RunWrapper(void* arg);
void Run();
+ void RunAsThread();
+ void Wait();
enum EState {
STATE_INITIAL, // KeepRunning hasn't been called
@@ -242,7 +248,9 @@
STATE_RUNNING, // Running and being timed
STATE_STOPPING, // Not being timed but waiting for other threads
STATE_STOPPED, // Stopped
- } state_;
+ };
+
+ EState state_;
FastClock* clock_;
@@ -250,6 +258,8 @@
// BenchmarkInstance
SharedState* shared_;
+ pthread_t thread_;
+
// Custom label set by the user.
std::string label_;
@@ -274,6 +284,8 @@
// True if the current interval is the continuation of a previous one.
bool is_continuation_;
+ std::unique_ptr<ThreadStats> stats_;
+
friend class internal::Benchmark;
DISALLOW_COPY_AND_ASSIGN(State);
};
@@ -345,8 +357,7 @@
// of some piece of code.
// Run one instance of this benchmark concurrently in t threads.
- // TODO(dominic): Allow multithreaded benchmarks
- //Benchmark* Threads(int t);
+ Benchmark* Threads(int t);
// Pick a set of values T from [min_threads,max_threads].
// min_threads and max_threads are always included in T. Run this
@@ -360,10 +371,10 @@
// Foo in 4 threads
// Foo in 8 threads
// Foo in 16 threads
- // Benchmark* ThreadRange(int min_threads, int max_threads);
+ Benchmark* ThreadRange(int min_threads, int max_threads);
// Equivalent to ThreadRange(NumCPUs(), NumCPUs())
- //Benchmark* ThreadPerCpu();
+ Benchmark* ThreadPerCpu();
// TODO(dominic): Control whether or not real-time is used for this benchmark
@@ -372,7 +383,6 @@
// Used inside the benchmark implementation
struct Instance;
- struct ThreadStats;
// Extract the list of benchmark instances that match the specified
// regular expression.
diff --git a/include/benchmark/macros.h b/include/benchmark/macros.h
index ac3d963..7d4aefb 100644
--- a/include/benchmark/macros.h
+++ b/include/benchmark/macros.h
@@ -36,6 +36,7 @@
#define CHECK(b) do { if (!(b)) assert(false); } while(0)
#define CHECK_EQ(a, b) CHECK((a) == (b))
+#define CHECK_NE(a, b) CHECK((a) != (b))
#define CHECK_GE(a, b) CHECK((a) >= (b))
#define CHECK_LE(a, b) CHECK((a) <= (b))
#define CHECK_GT(a, b) CHECK((a) > (b))
diff --git a/src/benchmark.cc b/src/benchmark.cc
index 068dc29..4ad386c 100644
--- a/src/benchmark.cc
+++ b/src/benchmark.cc
@@ -174,34 +174,6 @@
return ToBinaryStringFullySpecified(n, 1.1, 1);
}
-} // end namespace
-
-namespace internal {
-struct Benchmark::ThreadStats {
- int64_t bytes_processed;
- int64_t items_processed;
-
- ThreadStats() { Reset(); }
-
- void Reset() {
- bytes_processed = 0;
- items_processed = 0;
- }
-
- void Add(const ThreadStats& other) {
- bytes_processed += other.bytes_processed;
- items_processed += other.items_processed;
- }
-};
-
-} // end namespace internal
-
-namespace {
-
-// Per-thread stats
-pthread_key_t thread_stats_key;
-internal::Benchmark::ThreadStats* thread_stats = nullptr;
-
// For non-dense Range, intermediate values are powers of kRangeMultiplier.
static const int kRangeMultiplier = 8;
@@ -210,6 +182,9 @@
static pthread_mutex_t benchmark_mutex;
static std::vector<internal::Benchmark*>* families = NULL;
+pthread_mutex_t starting_mutex;
+pthread_cond_t starting_cv;
+
bool running_benchmark = false;
// Should this benchmark report memory usage?
@@ -222,10 +197,6 @@
// Overhead of an empty benchmark.
double overhead = 0.0;
-void DeleteThreadStats(void* p) {
- delete (internal::Benchmark::ThreadStats*) p;
-}
-
// Return prefix to print in front of each reported line
const char* Prefix() {
#ifdef NDEBUG
@@ -534,9 +505,24 @@
DISALLOW_COPY_AND_ASSIGN(FastClock);
};
-namespace internal {
+struct State::ThreadStats {
+ int64_t bytes_processed;
+ int64_t items_processed;
-const int Benchmark::kNumCpuMarker;
+ ThreadStats() { Reset(); }
+
+ void Reset() {
+ bytes_processed = 0;
+ items_processed = 0;
+ }
+
+ void Add(const ThreadStats& other) {
+ bytes_processed += other.bytes_processed;
+ items_processed += other.items_processed;
+ }
+};
+
+namespace internal {
// Information kept per benchmark we may want to run
struct Benchmark::Instance {
@@ -563,12 +549,13 @@
int starting; // Number of threads that have entered STARTING state
int stopping; // Number of threads that have entered STOPPING state
int threads; // Number of total threads that are running concurrently
- internal::Benchmark::ThreadStats stats;
+ ThreadStats stats;
std::vector<internal::BenchmarkRunData> runs; // accumulated runs
std::string label;
- SharedState(const internal::Benchmark::Instance* b, int t)
- : instance(b), starting(0), stopping(0), threads(t) {
+ explicit SharedState(const internal::Benchmark::Instance* b)
+ : instance(b), starting(0), stopping(0),
+ threads(b == nullptr ? 1 : b->threads) {
pthread_mutex_init(&mu, nullptr);
}
@@ -647,7 +634,7 @@
custom_arguments(this);
return this;
}
-/*
+
Benchmark* Benchmark::Threads(int t) {
CHECK_GT(t, 0);
mutex_lock l(&benchmark_mutex);
@@ -666,10 +653,10 @@
Benchmark* Benchmark::ThreadPerCpu() {
mutex_lock l(&benchmark_mutex);
- thread_counts_.push_back(kNumCpuMarker);
+ thread_counts_.push_back(NumCPUs());
return this;
}
-*/
+
void Benchmark::AddRange(std::vector<int>* dst, int lo, int hi, int mult) {
CHECK_GE(lo, 0);
CHECK_GE(hi, lo);
@@ -697,12 +684,10 @@
std::vector<Benchmark::Instance> instances;
const bool is_multithreaded = (!thread_counts_.empty());
- const std::vector<int>* thread_counts =
- (is_multithreaded ? &thread_counts_ : &one_thread);
- for (size_t t = 0; t < thread_counts->size(); ++t) {
- int num_threads = (*thread_counts)[t];
- if (num_threads == kNumCpuMarker)
- num_threads = NumCPUs();
+ const std::vector<int>& thread_counts =
+ (is_multithreaded ? thread_counts_ : one_thread);
+ for (size_t t = 0; t < thread_counts.size(); ++t) {
+ int num_threads = thread_counts[t];
Instance instance;
instance.name = name_;
@@ -785,7 +770,7 @@
void Benchmark::MeasureOverhead() {
State::FastClock clock(State::FastClock::CPU_TIME);
- State::SharedState state(NULL, 1);
+ State::SharedState state(nullptr);
State runner(&clock, &state, 0);
while (runner.KeepRunning()) {}
overhead = state.runs[0].real_accumulated_time /
@@ -802,28 +787,22 @@
State::FastClock clock(State::FastClock::CPU_TIME);
// Initialize the test runners.
- State::SharedState state(&b, b.threads);
+ State::SharedState state(&b);
{
std::unique_ptr<State> runners[b.threads];
- // TODO: create thread objects
for (int i = 0; i < b.threads; ++i)
runners[i].reset(new State(&clock, &state, i));
// Run them all.
for (int i = 0; i < b.threads; ++i) {
- State* r = runners[i].release();
- if (b.multithreaded()) {
- // TODO: start pthreads (member of state?) and set up thread local
- // pointers to stats
- //pool->Add(base::NewCallback(r, &State::Run));
- } else {
- pthread_setspecific(thread_stats_key, thread_stats);
- r->Run();
- }
+ if (b.multithreaded())
+ runners[i]->RunAsThread();
+ else
+ runners[i]->Run();
}
if (b.multithreaded()) {
- // TODO: join all the threads
- //pool->JoinAll();
+ for (int i = 0; i < b.threads; ++i)
+ runners[i]->Wait();
}
}
/*
@@ -849,7 +828,6 @@
for (internal::BenchmarkRunData& report : state.runs) {
double seconds = (use_real_time ? report.real_accumulated_time :
report.cpu_accumulated_time);
- // TODO: add the thread index here?
report.benchmark_name = b.name;
report.report_label = state.label;
report.bytes_per_second = state.stats.bytes_processed / seconds;
@@ -890,7 +868,7 @@
}
} // end namespace internal
-
+
State::State(FastClock* clock, SharedState* s, int t)
: thread_index(t),
state_(STATE_INITIAL),
@@ -905,9 +883,12 @@
total_iterations_(0),
interval_micros_(
static_cast<int64_t>(kNumMicrosPerSecond * FLAGS_benchmark_min_time /
- FLAGS_benchmark_repetitions)) {
+ FLAGS_benchmark_repetitions)),
+ is_continuation_(false),
+ stats_(new ThreadStats()) {
CHECK(clock != nullptr);
CHECK(s != nullptr);
+
}
bool State::KeepRunning() {
@@ -941,17 +922,13 @@
void State::SetBytesProcessed(int64_t bytes) {
CHECK_EQ(STATE_STOPPED, state_);
mutex_lock l(&shared_->mu);
- internal::Benchmark::ThreadStats* thread_stats =
- (internal::Benchmark::ThreadStats*) pthread_getspecific(thread_stats_key);
- thread_stats->bytes_processed = bytes;
+ stats_->bytes_processed = bytes;
}
void State::SetItemsProcessed(int64_t items) {
CHECK_EQ(STATE_STOPPED, state_);
mutex_lock l(&shared_->mu);
- internal::Benchmark::ThreadStats* thread_stats =
- (internal::Benchmark::ThreadStats*) pthread_getspecific(thread_stats_key);
- thread_stats->items_processed = items;
+ stats_->items_processed = items;
}
void State::SetLabel(const std::string& label) {
@@ -980,6 +957,7 @@
}
bool State::StartRunning() {
+ bool last_thread = false;
{
mutex_lock l(&shared_->mu);
CHECK_EQ(state_, STATE_INITIAL);
@@ -987,29 +965,40 @@
is_continuation_ = false;
CHECK_LT(shared_->starting, shared_->threads);
++shared_->starting;
- if (shared_->starting == shared_->threads) {
- // Last thread to start.
- clock_->InitType(
- use_real_time ? FastClock::REAL_TIME : FastClock::CPU_TIME);
- } else {
- // Wait for others.
- // TODO(dominic): semaphore!
- // while (pthread_getsemaphore(shared_->starting_sem_) !=
- // shared_->threads) { }
- //shared_->mu.Await(base::Condition(this, &State::AllStarting));
- }
- CHECK_EQ(state_, STATE_STARTING);
- state_ = STATE_RUNNING;
+#ifdef DEBUG
+ std::cout << "[" << thread_index << "] "
+ << shared_->starting << "/" << shared_->threads << " starting\n";
+#endif
+ last_thread = shared_->starting == shared_->threads;
}
+
+ if (last_thread) {
+ clock_->InitType(
+ use_real_time ? FastClock::REAL_TIME : FastClock::CPU_TIME);
+#ifdef DEBUG
+ std::cout << "[" << thread_index << "] unlocking\n";
+#endif
+ {
+ mutex_lock l(&starting_mutex);
+ pthread_cond_broadcast(&starting_cv);
+ }
+ } else {
+#ifdef DEBUG
+ std::cout << "[" << thread_index << "] waiting\n";
+#endif
+ mutex_lock l(&starting_mutex);
+ pthread_cond_wait(&starting_cv, &starting_mutex);
+#ifdef DEBUG
+ std::cout << "[" << thread_index << "] unlocked\n";
+#endif
+ }
+ CHECK_EQ(state_, STATE_STARTING);
+ state_ = STATE_RUNNING;
+
NewInterval();
return true;
}
-bool State::AllStarting() {
- CHECK_LE(shared_->starting, shared_->threads);
- return shared_->starting == shared_->threads;
-}
-
void State::NewInterval() {
stop_time_micros_ = clock_->NowMicros() + interval_micros_;
if (!is_continuation_) {
@@ -1107,16 +1096,30 @@
}
void State::Run() {
- internal::Benchmark::ThreadStats* thread_stats =
- (internal::Benchmark::ThreadStats*) pthread_getspecific(thread_stats_key);
- thread_stats->Reset();
+ stats_->Reset();
shared_->instance->bm->function_(*this);
{
mutex_lock l(&shared_->mu);
- shared_->stats.Add(*thread_stats);
+ shared_->stats.Add(*stats_);
}
}
+void State::RunAsThread() {
+ CHECK_EQ(0, pthread_create(&thread_, nullptr, &State::RunWrapper, this));
+}
+
+void State::Wait() {
+ CHECK_EQ(0, pthread_join(thread_, nullptr));
+}
+
+// static
+void* State::RunWrapper(void* arg) {
+ State* that = (State*)arg;
+ CHECK(that != nullptr);
+ that->Run();
+ return nullptr;
+}
+
namespace internal {
void RunMatchingBenchmarks(const std::string& spec,
@@ -1185,13 +1188,16 @@
spec = "."; // Regexp that matches all benchmarks
internal::ConsoleReporter default_reporter;
internal::RunMatchingBenchmarks(spec, &default_reporter);
+ pthread_cond_destroy(&starting_cv);
+ pthread_mutex_destroy(&starting_mutex);
+ pthread_mutex_destroy(&benchmark_mutex);
}
void Initialize(int* argc, const char** argv) {
//AtomicOps_Internalx86CPUFeaturesInit();
pthread_mutex_init(&benchmark_mutex, nullptr);
- pthread_key_create(&thread_stats_key, DeleteThreadStats);
- thread_stats = new internal::Benchmark::ThreadStats();
+ pthread_mutex_init(&starting_mutex, nullptr);
+ pthread_cond_init(&starting_cv, nullptr);
walltime::Initialize();
internal::ParseCommandLineFlags(argc, argv);
internal::Benchmark::MeasureOverhead();
diff --git a/test/benchmark_test.cc b/test/benchmark_test.cc
index 16864e4..74994d6 100644
--- a/test/benchmark_test.cc
+++ b/test/benchmark_test.cc
@@ -3,6 +3,7 @@
#include <math.h>
#include <stdint.h>
+#include <iostream>
#include <limits>
#include <list>
#include <map>
@@ -33,7 +34,8 @@
return s;
}
-static std::vector<int>* test_vector = NULL;
+pthread_mutex_t test_vector_mu;
+std::vector<int>* test_vector = nullptr;
} // end namespace
@@ -57,7 +59,7 @@
state.SetLabel(ss.str());
}
BENCHMARK_RANGE(BM_CalculatePiRange, 1, 1024 * 1024);
-/*
+
static void BM_CalculatePi(benchmark::State& state) {
static const int depth = 1024;
double pi ATTRIBUTE_UNUSED = 0.0;
@@ -68,7 +70,7 @@
BENCHMARK(BM_CalculatePi)->Threads(8);
BENCHMARK(BM_CalculatePi)->ThreadRange(1, 32);
BENCHMARK(BM_CalculatePi)->ThreadPerCpu();
-*/
+
static void BM_SetInsert(benchmark::State& state) {
while (state.KeepRunning()) {
state.PauseTiming();
@@ -107,16 +109,27 @@
BENCHMARK(BM_StringCompare)->Range(1, 1<<20);
static void BM_SetupTeardown(benchmark::State& state) {
- if (state.thread_index == 0)
+ if (state.thread_index == 0) {
+ pthread_mutex_init(&test_vector_mu, nullptr);
+ // Unlocked setup: not single-threaded under ThreadPerCpu(); assumes other threads block in KeepRunning() until every thread has started - TODO confirm.
test_vector = new std::vector<int>();
- while (state.KeepRunning())
- test_vector->push_back(0);
+ }
+ int i = 0;
+ while (state.KeepRunning()) {
+ pthread_mutex_lock(&test_vector_mu);
+ if (i%2 == 0)
+ test_vector->push_back(i);
+ else
+ test_vector->pop_back();
+ pthread_mutex_unlock(&test_vector_mu);
+ ++i;
+ }
if (state.thread_index == 0) {
delete test_vector;
- test_vector = NULL;
+ pthread_mutex_destroy(&test_vector_mu);
}
}
-BENCHMARK(BM_SetupTeardown);
+BENCHMARK(BM_SetupTeardown)->ThreadPerCpu();
static void BM_LongTest(benchmark::State& state) {
double tracker = 0.0;