Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce global throttling based on the number of log files #194

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions include/libjungle/db_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,53 @@ class GlobalConfig {
*/
CompactionThrottlingOptions ctOpt;

/**
* Settings for Log Throttling. When there is heavy incoming traffic
* and the flush speed cannot keep up with the incoming write speed,
* the number of log files increases. If the total number of log files
* across all open database instances exceeds `startNumLogs`, user threads
* calling the write API will be temporarily blocked for a certain period
* of time.
*
* Once the total number of log files reaches `limitNumLogs`, each write API
* call will be blocked for `maxSleepTimeMs`, and the background flushers
* will immediately flush the log files. The sleep time will not exceed
* `maxSleepTimeMs`.
*
* If `maxSleepTimeMs` is set to zero, this feature will be disabled.
*/
struct LogThrottlingOptions {
LogThrottlingOptions()
: startNumLogs(256)
, limitNumLogs(512)
, maxSleepTimeMs(0)
{}

/**
* The minimum number of log files initiates the throttling.
*/
uint32_t startNumLogs;

/**
* The number of log files that will cause user threads to
* sleep for `maxSleepTimeMs`.
*/
uint32_t limitNumLogs;

/**
* The maximum duration of sleep time.
*/
uint32_t maxSleepTimeMs;
};

/**
* Log throttling option.
*
* It can be used to limit the overall memory consumption of the
* process occupied by memory tables corresponding to each log file.
*/
LogThrottlingOptions ltOpt;

/**
* Shutdown system logger on shutdown of Jungle.
*/
Expand Down
2 changes: 2 additions & 0 deletions src/db_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ DBMgr::DBMgr()
, twMgr(new TableWriterMgr())
, gbExecutor(new GlobalBatchExecutor())
, debugCbEnabled(false)
, globalTime(0)
, idleTraffic(false)
, globalThrottlingMs(0)
, myLog(nullptr)
{
updateGlobalTime();
Expand Down
15 changes: 15 additions & 0 deletions src/db_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ class DBMgr {

bool isDebugCallbackEffective() const { return debugCbEnabled.load(MOR); }

uint32_t setGlobalThrottling(uint32_t to) {
uint32_t prev = globalThrottlingMs.load(MOR);
globalThrottlingMs = to;
return prev;
}

uint32_t getGlobalThrottling() const {
return globalThrottlingMs.load(MOR);
}

private:
DBMgr();

Expand Down Expand Up @@ -194,6 +204,11 @@ class DBMgr {
// `true` if the current traffic to this process is idle.
std::atomic<bool> idleTraffic;

/**
* Non-zero if global log throttling is active.
*/
std::atomic<uint32_t> globalThrottlingMs;

// Logger.
SimpleLogger* myLog;
};
Expand Down
43 changes: 43 additions & 0 deletions src/flusher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,41 @@ Flusher::Flusher(const std::string& w_name,
Flusher::~Flusher() {
}

void Flusher::calcGlobalThrottling(size_t total_num_log_files) {
if (gConfig.ltOpt.maxSleepTimeMs == 0) {
return;
}

DBMgr* dbm = DBMgr::getWithoutInit();

uint32_t next_throttling_ms = 0;
uint32_t old_ms =
dbm->getGlobalThrottling();

if (total_num_log_files > gConfig.ltOpt.startNumLogs) {
next_throttling_ms =
(total_num_log_files - gConfig.ltOpt.startNumLogs) *
gConfig.ltOpt.maxSleepTimeMs /
(gConfig.ltOpt.limitNumLogs - gConfig.ltOpt.startNumLogs);
if (next_throttling_ms > gConfig.ltOpt.maxSleepTimeMs) {
next_throttling_ms = gConfig.ltOpt.maxSleepTimeMs;
}
}

if (next_throttling_ms != old_ms) {
dbm->setGlobalThrottling(next_throttling_ms);

auto logger = dbm->getLogger();
_timed_log_g(logger,
5000,
SimpleLogger::TRACE, SimpleLogger::INFO,
"total log files %zu, "
"global throttling is set to %u ms (was %u ms).",
total_num_log_files,
next_throttling_ms, old_ms);
}
}

void Flusher::work(WorkerOptions* opt_base) {
Status s;

Expand Down Expand Up @@ -136,15 +171,23 @@ void Flusher::work(WorkerOptions* opt_base) {
// as long as we are holding `dbMapLock`.
std::vector<DBWrap*> dbs_to_check;

size_t total_num_log_files = 0;

skiplist_node* cursor = skiplist_begin(&dbm->dbMap);
while (cursor) {
DBWrap* dbwrap = _get_entry(cursor, DBWrap, snode);
dbs_to_check.push_back(dbwrap);
if (!dbwrap->db->p->dbConfig.logSectionOnly) {
total_num_log_files += dbwrap->db->p->logMgr->getNumLogFiles();
}

cursor = skiplist_next(&dbm->dbMap, cursor);
skiplist_release_node(&dbwrap->snode);
}
if (cursor) skiplist_release_node(cursor);

calcGlobalThrottling(total_num_log_files);

size_t num_dbs = dbs_to_check.size();
if (++lastCheckedFileIndex >= num_dbs) lastCheckedFileIndex = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class Flusher : public WorkerBase {
~Flusher();
void work(WorkerOptions* opt_base);

void calcGlobalThrottling(size_t total_num_log_files);

GlobalConfig gConfig;
size_t lastCheckedFileIndex;
FlusherType type;
Expand Down
80 changes: 51 additions & 29 deletions src/log_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,36 +550,47 @@ Status LogMgr::addNewLogFile(LogFileInfoGuard& cur_log_file_info,
}

void LogMgr::execBackPressure(uint64_t elapsed_us) {
if (throttlingRate > 0) {
DBMgr* mgr = DBMgr::getWithoutInit();
GlobalConfig* g_config = mgr->getGlobalConfig();

// Throttling.
double exp_us = 1000000.0 / throttlingRate.load();

size_t effective_time_ms =
std::min( lastFlushIntervalMs.load(),
(int64_t)THROTTLING_EFFECTIVE_TIME_MS );
size_t num_log_files = mani->getNumLogFiles();
size_t log_files_limit = g_config->flusherMinLogFilesToTrigger * 2;
if (num_log_files > log_files_limit) {
effective_time_ms *= (num_log_files - log_files_limit);
DBMgr* mgr = DBMgr::getWithoutInit();
GlobalConfig* g_config = mgr->getGlobalConfig();

if (g_config->ltOpt.maxSleepTimeMs) {
uint32_t global_throttling_ms = mgr->getGlobalThrottling();
if (global_throttling_ms) {
// Global throttling is enabled.
// Do not apply local throttling.
_log_trace(myLog, "global throttling %u ms", global_throttling_ms);
Timer::sleepMs(global_throttling_ms);
return;
}
}

uint64_t throttle_age_ms = throttlingRateTimer.getUs() / 1000;
if ( effective_time_ms &&
throttle_age_ms < effective_time_ms ) {
// Should consider age.
exp_us *= (effective_time_ms - throttle_age_ms);
exp_us /= effective_time_ms;

double cur_us = elapsed_us;
if ( exp_us > cur_us ) {
// Throttle incoming writes.
double remaining_us = exp_us - cur_us;
if (remaining_us > 1.0) {
Timer::sleepUs((uint64_t)remaining_us);
}
if (throttlingRate == 0) return;

// Throttling.
double exp_us = 1000000.0 / throttlingRate.load();

size_t effective_time_ms =
std::min( lastFlushIntervalMs.load(),
(int64_t)THROTTLING_EFFECTIVE_TIME_MS );
size_t num_log_files = mani->getNumLogFiles();
size_t log_files_limit = g_config->flusherMinLogFilesToTrigger * 2;
if (num_log_files > log_files_limit) {
effective_time_ms *= (num_log_files - log_files_limit);
}

uint64_t throttle_age_ms = throttlingRateTimer.getUs() / 1000;
if ( effective_time_ms &&
throttle_age_ms < effective_time_ms ) {
// Should consider age.
exp_us *= (effective_time_ms - throttle_age_ms);
exp_us /= effective_time_ms;

double cur_us = elapsed_us;
if ( exp_us > cur_us ) {
// Throttle incoming writes.
double remaining_us = exp_us - cur_us;
if (remaining_us > 1.0) {
Timer::sleepUs((uint64_t)remaining_us);
}
}
}
Expand Down Expand Up @@ -1988,8 +1999,19 @@ bool LogMgr::checkTimeToFlush(const GlobalConfig& config) {
if (seq_max > seq_last_flush + config.flusherMinRecordsToTrigger) {
return true;
}

size_t min_log_files = config.flusherMinLogFilesToTrigger;
DBMgr* dbm = DBMgr::getWithoutInit();
if (dbm && dbm->getGlobalThrottling()) {
// If global throttling is active, we should flush more often.
_log_trace(myLog, "global throttling is active, flush more often. "
"l_max %lu, l_last_flush %lu, seq_max %lu, seq_last_flush %lu",
l_max, l_last_flush, seq_max, seq_last_flush);
min_log_files = 1;
}

// If the number of log files exceeds the limit.
if (l_max > l_last_flush + config.flusherMinLogFilesToTrigger) {
if (l_max > l_last_flush + min_log_files) {
return true;
}

Expand Down
94 changes: 93 additions & 1 deletion tests/stress/flush_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,14 @@ int auto_flusher_stress_test(size_t dur_sec) {
jungle::GlobalConfig g_config;
g_config.numFlusherThreads = 1;
g_config.fdbCacheSize = (uint64_t)1024*1024*1024; // 1GB
g_config.ltOpt.startNumLogs = 64;
g_config.ltOpt.limitNumLogs = 128;
g_config.ltOpt.maxSleepTimeMs = 1000;
jungle::init(g_config);

jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config)
//config.maxEntriesInLogFile = 1000;
config.maxEntriesInLogFile = 1000;

jungle::DB* db;
CHK_Z(jungle::DB::open(&db, filename, config));
Expand Down Expand Up @@ -214,6 +217,93 @@ int auto_flusher_stress_test(size_t dur_sec) {
return 0;
}

int auto_flusher_many_db_test(size_t dur_sec) {
jungle::Status s;

const std::string prefix = TEST_SUITE_AUTO_PREFIX;
TestSuite::clearTestFile(prefix);
std::string filename = TestSuite::getTestFileName(prefix);
TestSuite::mkdir(filename);

jungle::GlobalConfig g_config;
g_config.numFlusherThreads = 1;
g_config.fdbCacheSize = (uint64_t)1024*1024*1024; // 1GB
g_config.ltOpt.startNumLogs = 64;
g_config.ltOpt.limitNumLogs = 128;
g_config.ltOpt.maxSleepTimeMs = 1000;
jungle::init(g_config);

jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config)
config.maxEntriesInLogFile = 1000;

const size_t NUM_DBS = 32;

std::vector<jungle::DB*> dbs;
for (size_t ii=0; ii<NUM_DBS; ++ii) {
jungle::DB* db;
std::string path = filename + "/" + TestSuite::lzStr(2, ii);
CHK_Z(jungle::DB::open(&db, path, config));
dbs.push_back(db);
}

uint64_t idx = 0;
uint64_t last_inserted_idx = 0;

TestSuite::Progress pp(dur_sec, "populating", "sec");
TestSuite::Timer tt(dur_sec * 1000);
while (!tt.timeover()) {
std::string key = "k" + TestSuite::lzStr(7, idx);
std::string val = "v" + TestSuite::lzStr(7, idx);

for (auto& db: dbs) {
CHK_Z(db->set(jungle::KV(key, val)));
}
idx++;
last_inserted_idx = idx;

uint64_t cur_sec = tt.getTimeUs() / 1000000;
pp.update(cur_sec);
}
TestSuite::_msg("%lu writes (* %zu = %lu)\n", idx, NUM_DBS, idx * NUM_DBS);

// Close, reopen, verify (twice).
for (size_t ii=0; ii<2; ++ii) {
for (size_t ii=0; ii<NUM_DBS; ++ii) {
pp = TestSuite::Progress(last_inserted_idx,
"verifying " + TestSuite::lzStr(2, ii));

std::string path = filename + "/" + TestSuite::lzStr(2, ii);
CHK_Z(jungle::DB::close(dbs[ii]));
CHK_Z(jungle::DB::open(&dbs[ii], path, config));

for (uint64_t jj = 0; jj < last_inserted_idx; ++jj) {
std::string key = "k" + TestSuite::lzStr(7, jj);
std::string val = "v" + TestSuite::lzStr(7, jj);

jungle::SizedBuf key_req(key);
jungle::SizedBuf value_out;
s = dbs[ii]->get(key_req, value_out);
CHK_Z(s);

CHK_EQ(jungle::SizedBuf(val), value_out);
value_out.free();
pp.update(jj);
}

pp.done();
}
}

for (auto& db: dbs) {
CHK_Z(jungle::DB::close(db));
}
CHK_Z(jungle::shutdown());

TestSuite::clearTestFile(prefix, TestSuite::END_OF_TEST);
return 0;
}

int main(int argc, char** argv) {
TestSuite ts(argc, argv);

Expand All @@ -223,6 +313,8 @@ int main(int argc, char** argv) {
flusher_stress_basic_test, TestRange<size_t>({10}) );
ts.doTest( "auto flusher stress test",
auto_flusher_stress_test, TestRange<size_t>({10}) );
ts.doTest( "auto flusher many db test",
auto_flusher_many_db_test, TestRange<size_t>({10}) );

return 0;
}
Expand Down