Skip to content

Commit

Permalink
db_stress to add an option to periodically change background thread p…
Browse files Browse the repository at this point in the history
…ool size.

Summary: Add an option to indicates the variation of background threads. If the flag is not 0, for every 100 milliseconds, adjust thread pool size to a value within the range.

Test Plan: run db_stress

Reviewers: haobo, dhruba, igor, ljin

Reviewed By: ljin

Subscribers: leveldb, nkg-

Differential Revision: https://reviews.facebook.net/D18759
  • Loading branch information
siying committed Jun 2, 2014
1 parent 388d205 commit 593bb2c
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions tools/db_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ DEFINE_int32(max_background_compactions,
"The maximum number of concurrent background compactions "
"that can occur in parallel.");

DEFINE_int32(compaction_thread_pool_adjust_interval, 0,
"The interval (in milliseconds) to adjust compaction thread pool "
"size. Don't change it periodically if the value is 0.");

DEFINE_int32(compaction_thread_pool_varations, 2,
"Range of bakground thread pool size variations when adjusted "
"periodically.");

DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes,
"The maximum number of concurrent background flushes "
"that can occur in parallel.");
Expand Down Expand Up @@ -567,6 +575,8 @@ class SharedState {
num_done_(0),
start_(false),
start_verify_(false),
should_stop_bg_thread_(false),
bg_thread_finished_(false),
stress_test_(stress_test),
verification_failure_(false) {
if (FLAGS_test_batches_snapshots) {
Expand Down Expand Up @@ -694,6 +704,14 @@ class SharedState {

uint32_t GetSeed() const { return seed_; }

void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }

bool ShoudStopBgThread() { return should_stop_bg_thread_; }

void SetBgThreadFinish() { bg_thread_finished_ = true; }

bool BgThreadFinished() const { return bg_thread_finished_; }

private:
port::Mutex mu_;
port::CondVar cv_;
Expand All @@ -707,6 +725,8 @@ class SharedState {
long num_done_;
bool start_;
bool start_verify_;
bool should_stop_bg_thread_;
bool bg_thread_finished_;
StressTest* stress_test_;
std::atomic<bool> verification_failure_;

Expand Down Expand Up @@ -777,6 +797,11 @@ class StressTest {
threads[i] = new ThreadState(i, &shared);
FLAGS_env->StartThread(ThreadBody, threads[i]);
}
ThreadState bg_thread(0, &shared);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
}

// Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
Expand Down Expand Up @@ -829,6 +854,14 @@ class StressTest {
}
PrintStatistics();

if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
MutexLock l(shared.GetMutex());
shared.SetShouldStopBgThread();
while (!shared.BgThreadFinished()) {
shared.GetCondVar()->Wait();
}
}

if (shared.HasVerificationFailedYet()) {
printf("Verification failed :(\n");
return false;
Expand Down Expand Up @@ -879,6 +912,38 @@ class StressTest {

}

static void PoolSizeChangeThread(void* v) {
assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;

while (true) {
{
MutexLock l(shared->GetMutex());
if (shared->ShoudStopBgThread()) {
shared->SetBgThreadFinish();
shared->GetCondVar()->SignalAll();
return;
}
}

auto thread_pool_size_base = FLAGS_max_background_compactions;
auto thread_pool_size_var = FLAGS_compaction_thread_pool_varations;
int new_thread_pool_size =
thread_pool_size_base - thread_pool_size_var +
thread->rand.Next() % (thread_pool_size_var * 2 + 1);
if (new_thread_pool_size < 1) {
new_thread_pool_size = 1;
}
FLAGS_env->SetBackgroundThreads(new_thread_pool_size);
// Sleep up to 3 seconds
FLAGS_env->SleepForMicroseconds(
thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
1000 +
1);
}
}

// Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
// ("9"+K, "9"+V) in DB atomically i.e in a single batch.
// Also refer MultiGet.
Expand Down

0 comments on commit 593bb2c

Please sign in to comment.