Skip to content

Commit

Permalink
Add a new option serializeMultiThreadedLogFlush (#182)
Browse files Browse the repository at this point in the history
* With this option enabled, if multiple threads call `sync` or
`flushLogs` concurrently, the calls will be executed one by one, instead of
letting one thread proceed and returning `OPERATION_IN_PROGRESS` to the others.
  • Loading branch information
greensky00 authored Oct 30, 2024
1 parent 5473c1b commit a854936
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 35 deletions.
9 changes: 9 additions & 0 deletions include/libjungle/db_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class DBConfig {
, fastIndexScan(false)
, seqLoadingDelayFactor(0)
, safeMode(false)
, serializeMultiThreadedLogFlush(false)
{
tableSizeRatio.push_back(2.5);
levelSizeRatio.push_back(10.0);
Expand Down Expand Up @@ -575,6 +576,14 @@ class DBConfig {
* real production environment.
*/
bool safeMode;

/**
* If `true`, `sync` and `flushLogs` calls by multiple threads will be serialized,
* and executed one by one.
* If `false`, only one thread will execute `sync` and `flushLogs` calls, while
* the other concurrent threads will get `OPERATION_IN_PROGRESS` status.
*/
bool serializeMultiThreadedLogFlush;
};

class GlobalConfig {
Expand Down
7 changes: 7 additions & 0 deletions include/libjungle/params.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ struct DebugParams {
, newLogBatchCb(nullptr)
, getLogFileInfoBySeqCb(nullptr)
, logFlushCb(nullptr)
, syncCb(nullptr)
, forceMerge(false)
{}

Expand Down Expand Up @@ -299,6 +300,12 @@ struct DebugParams {
*/
std::function< void(const GenericCbParams&) > logFlushCb;

/**
* Callback function that will be invoked at the beginning of log sync
* (reading memtable data and writing them into log files).
*/
std::function< void(const GenericCbParams&) > syncCb;

/**
* If true, merge will proceed the task even with the small number
* of tables in the level.
Expand Down
7 changes: 4 additions & 3 deletions src/log_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,9 @@ Status LogManifest::clone(const std::string& dst_path) {
Status LogManifest::store(bool call_fsync) {
if (mFileName.empty() || !fOps) return Status::NOT_INITIALIZED;

if (call_fsync) {
// `fsync` is required: calls by multiple threads should be serialized.
if (call_fsync || logMgr->getDbConfig()->serializeMultiThreadedLogFlush) {
// `fsync` is required, or serialize option is on:
// calls by multiple threads should be serialized.
std::unique_lock<std::mutex> l(mFileWriteLock);
return storeInternal(call_fsync);
} else {
Expand Down Expand Up @@ -600,7 +601,7 @@ Status LogManifest::storeInternal(bool call_fsync) {
if (need_truncate) {
fOps->ftruncate(mFile, ss.pos());
}

bool backup_done = false;
if (call_fsync) {
s = fOps->fsync(mFile);
Expand Down
64 changes: 41 additions & 23 deletions src/log_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,12 @@ void LogMgr::logMgrSettings() {

_log_info( myLog,
"initialized log manager, memtable flush buffer %zu, "
"direct-IO %s, custom hash length function %s",
"direct-IO %s, custom hash length function %s, "
"sync multi-threaded log flush %s",
g_conf->memTableFlushBufferSize,
get_on_off_str(opt.dbConfig->directIoOpt.enabled),
get_on_off_str((bool)opt.dbConfig->customLenForHash) );
get_on_off_str((bool)opt.dbConfig->customLenForHash),
opt.dbConfig->serializeMultiThreadedLogFlush ? "ON" : "OFF" );
}

Status LogMgr::rollback(uint64_t seq_upto) {
Expand All @@ -215,7 +217,8 @@ Status LogMgr::rollback(uint64_t seq_upto) {
DebugParams d_params = mgr->getDebugParams();

// Return error in read-only mode.
if (getDbConfig()->readOnly) return Status::WRITE_VIOLATION;
const DBConfig* db_config = getDbConfig();
if (db_config->readOnly) return Status::WRITE_VIOLATION;

// WARNING:
// Both syncing (memtable -> log) and flushing (log -> table)
Expand All @@ -225,35 +228,35 @@ Status LogMgr::rollback(uint64_t seq_upto) {
const size_t MAX_RETRY_MS = 1000; // 1 second.
tt.setDurationMs(MAX_RETRY_MS);

OpSemaWrapper ow_sync(&syncSema);
OpSemaWrapper ow_sync(&syncSema, db_config->serializeMultiThreadedLogFlush);
while (!ow_sync.acquire()) {
if (tt.timeout()) {
_log_err(myLog, "rollback timeout due to sync");
return Status::TIMEOUT;
}
Timer::sleepMs(10);
}
assert(ow_sync.op_sema->enabled);
assert(ow_sync.opSema->enabled);

OpSemaWrapper ow_flush(&flushSema);
OpSemaWrapper ow_flush(&flushSema, db_config->serializeMultiThreadedLogFlush);
while (!ow_flush.acquire()) {
if (tt.timeout()) {
_log_err(myLog, "rollback timeout due to flush");
return Status::TIMEOUT;
}
Timer::sleepMs(10);
}
assert(ow_flush.op_sema->enabled);
assert(ow_flush.opSema->enabled);

OpSemaWrapper ow_reclaim(&reclaimSema);
OpSemaWrapper ow_reclaim(&reclaimSema, db_config->serializeMultiThreadedLogFlush);
while (!ow_reclaim.acquire()) {
if (tt.timeout()) {
_log_err(myLog, "rollback timeout due to reclaim");
return Status::TIMEOUT;
}
Timer::sleepMs(10);
}
assert(ow_reclaim.op_sema->enabled);
assert(ow_reclaim.opSema->enabled);

_log_info(myLog, "[ROLLBACK] upto %zu", seq_upto);

Expand Down Expand Up @@ -1124,21 +1127,32 @@ Status LogMgr::sync(bool call_fsync) {

Status LogMgr::syncNoWait(bool call_fsync) {
// Return error in read-only mode.
if (getDbConfig()->readOnly) return Status::WRITE_VIOLATION;
const DBConfig* db_config = getDbConfig();
if (db_config->readOnly) return Status::WRITE_VIOLATION;

// Only one sync operation at a time.
OpSemaWrapper ow(&syncSema);
OpSemaWrapper ow(&syncSema, db_config->serializeMultiThreadedLogFlush);
if (!ow.acquire()) {
_log_debug(myLog, "Sync failed. Other thread is working on it.");
return Status::OPERATION_IN_PROGRESS;
}
assert(ow.op_sema->enabled);
assert(ow.opSema->enabled);
return syncInternal(call_fsync);
}

Status LogMgr::syncInternal(bool call_fsync) {
Status s;
uint64_t ln_from, ln_to;

DBMgr* dbm = DBMgr::getWithoutInit();
if (dbm && dbm->isDebugCallbackEffective()) {
DebugParams dp = dbm->getDebugParams();
if (dp.syncCb) {
DebugParams::GenericCbParams p;
dp.syncCb(p);
}
}

s = mani->getMaxLogFileNum(ln_to);
if (!s) {
// No log, do nothing.
Expand Down Expand Up @@ -1208,15 +1222,16 @@ Status LogMgr::syncInternal(bool call_fsync) {

Status LogMgr::discardDirty(uint64_t seq_begin) {
// Return error in read-only mode.
if (getDbConfig()->readOnly) return Status::WRITE_VIOLATION;
const DBConfig* db_config = getDbConfig();
if (db_config->readOnly) return Status::WRITE_VIOLATION;

// Should not race with sync.
OpSemaWrapper ow(&syncSema);
OpSemaWrapper ow(&syncSema, db_config->serializeMultiThreadedLogFlush);
if (!ow.acquire()) {
_log_debug(myLog, "discard failed. Other thread is working on it.");
return Status::OPERATION_IN_PROGRESS;
}
assert(ow.op_sema->enabled);
assert(ow.opSema->enabled);

Status s;
uint64_t ln_from, ln_to;
Expand Down Expand Up @@ -1263,12 +1278,13 @@ Status LogMgr::flush(const FlushOptions& options,
return Status::INVALID_SEQNUM;
}

OpSemaWrapper ow(&flushSema);
const DBConfig* db_config = getDbConfig();
OpSemaWrapper ow(&flushSema, db_config->serializeMultiThreadedLogFlush);
if (!ow.acquire()) {
_log_debug(myLog, "Flush skipped. Other thread is working on it.");
return Status::OPERATION_IN_PROGRESS;
}
assert(ow.op_sema->enabled);
assert(ow.opSema->enabled);

Status s;
Timer tt;
Expand Down Expand Up @@ -1599,12 +1615,13 @@ void LogMgr::adjustThrottlingExtreme() {
}

Status LogMgr::doLogReclaim() {
OpSemaWrapper ow(&reclaimSema);
const DBConfig* db_config = getDbConfig();
OpSemaWrapper ow(&reclaimSema, db_config->serializeMultiThreadedLogFlush);
if (!ow.acquire()) {
_log_debug(myLog, "Reclaim skipped. Other thread is working on it.");
return Status::OPERATION_IN_PROGRESS;
}
assert(ow.op_sema->enabled);
assert(ow.opSema->enabled);

mani->reclaimExpiredLogFiles();
return Status();
Expand Down Expand Up @@ -1926,7 +1943,8 @@ Status LogMgr::close() {

// If sync() or flush() is running,
// wait until they finish their jobs.
OpSemaWrapper op_sync(&syncSema);
const DBConfig* db_config = getDbConfig();
OpSemaWrapper op_sync(&syncSema, db_config->serializeMultiThreadedLogFlush);
_log_info(myLog, "Wait for on-going sync operation.");

uint64_t ticks = 0;
Expand All @@ -1937,15 +1955,15 @@ Status LogMgr::close() {
syncSema.enabled = false;
_log_info(myLog, "Disabled syncing for %p, %zu ticks", this, ticks);

if (!getDbConfig()->readOnly) {
if (!db_config->readOnly) {
// Last sync before close (not in read-only mode).
syncInternal(false);
_log_info(myLog, "Last sync done");
} else {
_log_info(myLog, "read-only mode: skip the last sync");
}

OpSemaWrapper op_flush(&flushSema);
OpSemaWrapper op_flush(&flushSema, db_config->serializeMultiThreadedLogFlush);
_log_info(myLog, "Wait for on-going flush operation.");
ticks = 0;
while (!op_flush.acquire()) {
Expand All @@ -1956,7 +1974,7 @@ Status LogMgr::close() {
flushSema.enabled = false;
_log_info(myLog, "Disabled flushing for %p, %zu ticks", this, ticks);

OpSemaWrapper op_reclaim(&reclaimSema);
OpSemaWrapper op_reclaim(&reclaimSema, db_config->serializeMultiThreadedLogFlush);
_log_info(myLog, "Wait for on-going log reclaim operation.");
ticks = 0;
while (!op_reclaim.acquire()) {
Expand Down
30 changes: 22 additions & 8 deletions src/log_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,44 @@ struct OpSema {
OpSema() : enabled(true), grabbed(false) {}
std::atomic<bool> enabled;
std::atomic<bool> grabbed;

// Used only when `serializeMultiThreadedLogFlush` is on.
std::mutex lock;
};

struct OpSemaWrapper {
OpSemaWrapper(OpSema* _op_sema) : op_sema(_op_sema), acquired(false) {}
OpSemaWrapper(OpSema* op_sema, bool lock_mode = false)
: opSema(op_sema), acquired(false), lockMode(lock_mode) {}
~OpSemaWrapper() {
if (acquired) {
op_sema->grabbed = false;
opSema->grabbed = false;
if (lockMode) {
opSema->lock.unlock();
}
}
op_sema = nullptr;
opSema = nullptr;
acquired = false;
}

bool acquire() {
bool expected = false;
bool val = true;
if ( op_sema->enabled &&
op_sema->grabbed.compare_exchange_weak(expected, val) ) {
if (lockMode) {
opSema->lock.lock();
opSema->grabbed = true;
acquired = true;
} else {
bool expected = false;
bool val = true;
if ( opSema->enabled &&
opSema->grabbed.compare_exchange_weak(expected, val) ) {
acquired = true;
}
}
return acquired;
}

OpSema* op_sema;
OpSema* opSema;
bool acquired;
bool lockMode;
};

namespace checker { class Checker; }
Expand Down
Loading

0 comments on commit a854936

Please sign in to comment.