Skip to content

Commit

Permalink
Correct a few places
Browse files Browse the repository at this point in the history
* Handle the case when compaction was cancelled in the middle.

* Compact all L0 tables first, add new tables, and then remove tables,
  to avoid data loss against crash at any steps.
  • Loading branch information
greensky00 committed Jan 7, 2025
1 parent 1960674 commit 6970e39
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 66 deletions.
6 changes: 6 additions & 0 deletions include/libjungle/params.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class CompactOptions {
CompactOptions()
: preserveTombstone(false)
, ignoreThreshold(false)
, doNotRemoveOldFile(false)
{}

/**
Expand All @@ -92,6 +93,11 @@ class CompactOptions {
* manner, even though it does not meet the compaction threshold.
*/
bool ignoreThreshold;

/**
* If `true`, the old file will not be removed after compaction.
*/
bool doNotRemoveOldFile;
};

/**
Expand Down
14 changes: 7 additions & 7 deletions src/table_compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ namespace jungle {
//
Status TableMgr::compactLevelItr(const CompactOptions& options,
TableInfo* victim_table,
size_t level,
bool adjusting_num_l0) {
size_t level) {
if (level >= mani->getNumLevels()) return Status::INVALID_LEVEL;

Status s;
Expand Down Expand Up @@ -376,7 +375,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
// 1) number of files after split, and
// 2) min keys for each new file.
do {
if (!isCompactionAllowed() && !adjusting_num_l0) {
if (!isCompactionAllowed()) {
throw Status(Status::COMPACTION_CANCELLED);
}

Expand Down Expand Up @@ -445,7 +444,6 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
? &twh.leasedWriters[worker_idx]->writerArgs
: &local_args;
w_args->callerAwaiter.reset();
w_args->adjustingNumL0 = adjusting_num_l0;

uint64_t count = (jj + 1 == num_new_tables)
? offsets.size() - new_tables[jj]->index
Expand Down Expand Up @@ -483,7 +481,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
}
}

if (!isCompactionAllowed() && !adjusting_num_l0) {
if (!isCompactionAllowed()) {
// NOTE: keys will be freed below.
for (LsmFlushResult& rr: results) delete rr.tFile;
throw Status(Status::COMPACTION_CANCELLED);
Expand Down Expand Up @@ -513,7 +511,9 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
//
// Hence, all threads who are going to write data to the same level
// should hold `mani->getLock()`.
mani->removeTableFile(level, local_victim);
if (!options.doNotRemoveOldFile) {
mani->removeTableFile(level, local_victim);
}

// NOTE:
// As an optimization, if this level is neither zero nor last one,
Expand Down Expand Up @@ -752,7 +752,7 @@ Status TableMgr::compactL0(const CompactOptions& options,
uint32_t hash_num)
{
if (!allowCompaction) {
_log_warn(myLog, "compaction is now allowed");
_log_warn(myLog, "compaction is not allowed");
return Status::COMPACTION_IS_NOT_ALLOWED;
}

Expand Down
167 changes: 122 additions & 45 deletions src/table_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const size_t TableMgr::APPROX_META_SIZE = 96;
TableMgr::TableMgr(DB* parent_db)
: parentDb(parent_db)
, allowCompaction(false)
, tableAdjInProgress(false)
, mani(nullptr)
, numL0Partitions(1)
, numL1Compactions(0)
Expand Down Expand Up @@ -249,52 +250,15 @@ Status TableMgr::init(const TableMgrOptions& _options) {
// Adjust the number of L0 partitions in a blocking manner, only when it is not
// logSectionOnly mode, not read-only mode, and the number of L0 partitions read
// from the existing manifest file differs from the number specified in db config.
if (!db_config->logSectionOnly && !db_config->readOnly
&& numL0Partitions != db_config->numL0Partitions) {
_log_info(myLog,
"adjust numL0 partitions: %zu -> %zu",
numL0Partitions,
db_config->numL0Partitions);

if (!db_config->nextLevelExtension) {
_log_err(myLog, "[Adjust numL0] not allowed in L0 only mode");
throw Status(Status::INVALID_CONFIG);
if (!db_config->logSectionOnly &&
!db_config->readOnly &&
numL0Partitions != db_config->numL0Partitions) {
s = adjustNumL0Partitions();
if (!s.ok()) {
// If failed to adjust num L0 partitions, tolerate it.
// We can try it next time.
_log_warn(myLog, "failed to adjust num L0 partitions: %d", s);
}

// Need to compact all existing L0 tables to L1 and recreate empty L0 tables,
// otherwise hash will be messed up.
for (size_t ii = 0; ii < numL0Partitions; ++ii) {
// Force compact L0 table to L1 in blocking manner to reduce L0
// partitions.
std::list<TableInfo*> tables;
s = mani->getL0Tables(ii, tables);
if (tables.size() != 1 || !s) {
_log_err(myLog, "[Adjust numL0] tables of hash %zu not found", ii);
throw s;
}
s = compactLevelItr(CompactOptions(), tables.back(), 0, true);
if (!s) {
_log_err(myLog, "[Adjust numL0] compaction error: %d", s);
throw s;
}
// The compacted table is remove from manifest in compactLevelItr,
// just release
for (TableInfo*& table: tables) {
table->done();
}
}
for (size_t ii = 0; ii < db_config->numL0Partitions; ++ii) {
TableFile* t_file = nullptr;
TableFileOptions t_opt;
// Set 1M bits as an initial size.
// It will be converging to some value as compaction happens.
t_opt.bloomFilterSize = 1024 * 1024;
EP(createNewTableFile(0, t_file, t_opt));
EP(mani->addTableFile(0, ii, SizedBuf(), t_file));
}
// Store manifest file.
mani->store(true);
numL0Partitions = db_config->numL0Partitions;
}

} else {
Expand Down Expand Up @@ -356,6 +320,119 @@ Status TableMgr::init(const TableMgrOptions& _options) {
}
}

Status TableMgr::adjustNumL0Partitions() {
    Status s;
    const DBConfig* db_config = getDbConfig();

    // Remember the original count: `numL0Partitions` is overwritten near the
    // end of this function, and the final log message should still show the
    // old -> new transition.
    const size_t orig_num_l0 = numL0Partitions;

    _log_info(myLog, "adjust numL0 partitions: %zu -> %zu",
              orig_num_l0, db_config->numL0Partitions);

    if (!db_config->nextLevelExtension) {
        _log_err(myLog, "[adjust numL0] not allowed in L0 only mode");
        throw Status(Status::INVALID_CONFIG);
    }

    // While this flag is set, compaction stays allowed even though
    // `allowCompaction == false` (see `isCompactionAllowed()`).
    tableAdjInProgress = true;
    GcFunc gcf_adj_flag([this]() {
        // Clear the flag when we exit this function.
        tableAdjInProgress = false;
    });

    // Need to compact all existing L0 tables to L1 and recreate empty L0 tables,
    // otherwise hash will be messed up.
    std::list<TableInfo*> l0_tables;
    CompactOptions c_opt;
    c_opt.doNotRemoveOldFile = true;

    // Tables moved into `l0_tables` should be released when we exit this
    // function. NOTE: this releaser is declared BEFORE the loop below, so
    // that tables already accumulated are released even if an error makes
    // us return from the middle of the loop.
    GcFunc gcf_l0_tables([&l0_tables]() {
        for (auto& t: l0_tables) {
            t->done();
        }
    });

    for (size_t ii = 0; ii < orig_num_l0; ++ii) {
        // Force compact L0 table to L1 in blocking manner to reduce L0
        // partitions.
        std::list<TableInfo*> tables;

        // On any error, we should release the tables of this hash.
        GcFunc gcf([&tables]() {
            for (auto& t: tables) {
                t->done();
            }
        });

        s = mani->getL0Tables(ii, tables);
        if (tables.size() < 1 || !s) {
            _log_err(myLog, "[adjust numL0] tables of hash %zu not found", ii);
            EP(s);
        }

        // WARNING:
        //   There can be two tables for the same hash, if the DB was closed
        //   in the middle of compaction.
        //
        //   e.g.)
        //     * let's say hash 0's table: table_001, and L1's table table_002
        //     * compaction happens from L0 to L1.
        //       - new table table_003 is created for hash 0 in L0 to serve traffic.
        //       - meanwhile, table_001's data is flushed to table_002
        //     * before compaction is done, DB is closed.
        //       - as a result, hash 0 will have two tables: table_001 and table_003.
        //
        //   In this case, we have to flush both table_001 and table_003 to L1.

        for (auto& tt: tables) {
            // Compact the current table of this hash (NOT always the last
            // one): every table of the hash must be flushed to L1.
            s = compactLevelItr(c_opt, tt, 0);
            if (!s) {
                _log_err(myLog, "[adjust numL0] compaction error: %d, "
                         "table: %lu, hash %zu",
                         s, tt->number, tt->hashNum);
                EP(s);
            }
        }

        // For safety reason (any shutdown or crash in the middle),
        // original file should be removed and released after
        // the new L0 (with new num L0 setting) tables are created.
        //
        // Until then, keep the original file in the list.
        for (TableInfo*& table: tables) {
            l0_tables.push_back(table);
        }
        // Ownership has moved to `l0_tables`; prevent `gcf` from
        // double-releasing them.
        tables.clear();
    }

    // After this point, there won't be data loss, as all data in L0 are
    // already copied to L1. Any errors that happen after this point are
    // tolerable.

    for (size_t ii = 0; ii < db_config->numL0Partitions; ++ii) {
        TableFile* t_file = nullptr;
        TableFileOptions t_opt;

        // Set 1M bits as an initial size.
        // It will be converging to some value as compaction happens.
        t_opt.bloomFilterSize = 1024 * 1024;
        EP(createNewTableFile(0, t_file, t_opt));
        EP(mani->addTableFile(0, ii, SizedBuf(), t_file));
    }

    // Now remove the old table files at once (they are released by
    // `gcf_l0_tables` on function exit).
    for (TableInfo*& table: l0_tables) {
        mani->removeTableFile(0, table);
    }

    // Store manifest file.
    EP(mani->store(true));
    numL0Partitions = db_config->numL0Partitions;

    // Log the ORIGINAL count, not the already-updated member.
    _log_info(myLog, "adjust numL0 partitions: %zu -> %zu, done",
              orig_num_l0, db_config->numL0Partitions);

    return Status();
}

Status TableMgr::removeStaleFiles() {
// Do nothing in read only mode.
if (getDbConfig()->readOnly) return Status();
Expand Down
21 changes: 16 additions & 5 deletions src/table_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ class TableMgr {

Status init(const TableMgrOptions& _options);

Status adjustNumL0Partitions();

Status removeStaleFiles();

Status shutdown();
Expand Down Expand Up @@ -236,8 +238,7 @@ class TableMgr {

Status compactLevelItr(const CompactOptions& options,
TableInfo* victim_table,
size_t level,
bool adjust_num_l0 = false);
size_t level);

Status migrateLevel(const CompactOptions& options,
size_t level);
Expand Down Expand Up @@ -286,7 +287,9 @@ class TableMgr {

uint64_t getBoosterLimit(size_t level) const;

bool isCompactionAllowed() const { return allowCompaction; }
/**
 * Return `true` if compaction is currently permitted.
 * Normally gated by `allowCompaction`, but while an internal L0-partition
 * adjustment is in progress (`tableAdjInProgress == true`) compaction is
 * temporarily allowed even though `allowCompaction == false`, so that the
 * blocking adjustment can complete.
 */
bool isCompactionAllowed() const {
    return allowCompaction || tableAdjInProgress;
}

void setTableFile(std::list<Record*>& batch,
std::list<uint64_t>& checkpoints,
Expand All @@ -301,8 +304,7 @@ class TableMgr {
TableFile* dst_file,
std::vector<uint64_t>& offsets,
uint64_t start_index,
uint64_t count,
bool adjusting_num_l0 = false);
uint64_t count);

void setTableFileItrFlush(TableFile* dst_file,
std::list<Record*>& recs_batch,
Expand Down Expand Up @@ -527,8 +529,17 @@ class TableMgr {
*/
DB* parentDb;

/**
* If `false`, compaction is not allowed.
*/
std::atomic<bool> allowCompaction;

/**
* If `true`, internal adjustment is in progress. Temporarily allow
* compaction or other mutations, even though `allowCompaction == false`.
*/
std::atomic<bool> tableAdjInProgress;

TableMgrOptions opt;
TableManifest* mani;

Expand Down
5 changes: 2 additions & 3 deletions src/table_set_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,
TableFile* dst_file,
std::vector<uint64_t>& offsets,
uint64_t start_index,
uint64_t count,
bool adjusting_num_l0 ) {
uint64_t count) {
const DBConfig* db_config = getDbConfig();
(void)db_config;

Expand Down Expand Up @@ -86,7 +85,7 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,

try {
for (uint64_t ii = start_index; ii < start_index + count; ++ii) {
if (!isCompactionAllowed() && !adjusting_num_l0 ) {
if (!isCompactionAllowed()) {
// To avoid file corruption, we should flush all cached pages
// even for cancel.
Timer cancel_timer;
Expand Down
4 changes: 1 addition & 3 deletions src/table_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ TableWriterArgs::TableWriterArgs()
: writerId(0)
, stopSignal(false)
, myLog(nullptr)
, adjustingNumL0(false)
{}

void TableWriterArgs::invoke() {
Expand Down Expand Up @@ -171,8 +170,7 @@ void TableWriterMgr::doTableWrite(TableWriterArgs* args) {
args->payload.targetTableFile,
*args->payload.offsets,
args->payload.startIdx,
args->payload.count,
args->adjustingNumL0.load() );
args->payload.count );
}
}

Expand Down
1 change: 0 additions & 1 deletion src/table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ struct TableWriterArgs {
std::atomic<bool> stopSignal;
TableWritePayload payload;
SimpleLogger* myLog;
std::atomic<bool> adjustingNumL0;
};

struct TableWriterPkg {
Expand Down
2 changes: 0 additions & 2 deletions tests/jungle/basic_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1829,8 +1829,6 @@ int different_l0_partitions() {
CHK_Z(db->flushLogs(jungle::FlushOptions()));
CHK_Z(_get_bykey_check(0, n, db, kv5));

// Wait 7 seconds.
// TestSuite::sleep_sec(7);
CHK_Z(jungle::DB::close(db));

CHK_Z(jungle::shutdown());
Expand Down

0 comments on commit 6970e39

Please sign in to comment.