Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support flexible num of L0 (blocking approach) #180

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/table_compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace jungle {
//
Status TableMgr::compactLevelItr(const CompactOptions& options,
TableInfo* victim_table,
size_t level)
{
size_t level,
bool adjusting_num_l0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If passing adjusting_num_l0 is for blocking compaction cancellation, we should remove it.

if (level >= mani->getNumLevels()) return Status::INVALID_LEVEL;

Status s;
Expand Down Expand Up @@ -376,7 +376,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
// 1) number of files after split, and
// 2) min keys for each new file.
do {
if (!isCompactionAllowed()) {
if (!isCompactionAllowed() && !adjusting_num_l0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not acceptable. No matter what the reason is, compaction should be cancelled upon the close of DB instance.

throw Status(Status::COMPACTION_CANCELLED);
}

Expand Down Expand Up @@ -445,6 +445,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
? &twh.leasedWriters[worker_idx]->writerArgs
: &local_args;
w_args->callerAwaiter.reset();
w_args->adjustingNumL0 = adjusting_num_l0;

uint64_t count = (jj + 1 == num_new_tables)
? offsets.size() - new_tables[jj]->index
Expand Down Expand Up @@ -482,7 +483,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
}
}

if (!isCompactionAllowed()) {
if (!isCompactionAllowed() && !adjusting_num_l0) {
// NOTE: keys will be freed below.
for (LsmFlushResult& rr: results) delete rr.tFile;
throw Status(Status::COMPACTION_CANCELLED);
Expand Down
64 changes: 58 additions & 6 deletions src/table_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,6 @@ Status TableMgr::init(const TableMgrOptions& _options) {
opt.compressionEnabled = true;
}

compactStatus.resize(db_config->numL0Partitions);
for (size_t ii=0; ii<compactStatus.size(); ++ii) {
std::atomic<bool>*& entry = compactStatus[ii];
entry = new std::atomic<bool>(false);
}

Status s;
mani = new TableManifest(this, opt.fOps);
mani->setLogger(myLog);
Expand Down Expand Up @@ -252,6 +246,57 @@ Status TableMgr::init(const TableMgrOptions& _options) {
}
}

// Adjust the number of L0 partitions in a blocking manner, only when it is not
// logSectionOnly mode, not read-only mode, and the number of L0 partitions read
// from the existing manifest file differs from the number specified in the DB config.
if (!db_config->logSectionOnly && !db_config->readOnly
&& numL0Partitions != db_config->numL0Partitions) {
_log_info(myLog,
"adjust numL0 partitions: %zu -> %zu",
numL0Partitions,
db_config->numL0Partitions);

if (!db_config->nextLevelExtension) {
_log_err(myLog, "[Adjust numL0] not allowed in L0 only mode");
throw Status(Status::INVALID_CONFIG);
Comment on lines +260 to +261
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not return error, but follow the previous behavior (accept the existing number).

}

// Need to compact all existing L0 tables to L1 and recreate empty L0 tables,
// otherwise hash will be messed up.
for (size_t ii = 0; ii < numL0Partitions; ++ii) {
// Force compact L0 table to L1 in blocking manner to reduce L0
// partitions.
std::list<TableInfo*> tables;
s = mani->getL0Tables(ii, tables);
if (tables.size() != 1 || !s) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be more than one table for the same hash, if a previous L0 table was in the middle of compaction when the DB was closed.

_log_err(myLog, "[Adjust numL0] tables of hash %zu not found", ii);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use [adjust numL0]. Using capital letters makes hassles when we search logs by keywords.

throw s;
}
s = compactLevelItr(CompactOptions(), tables.back(), 0, true);
if (!s) {
_log_err(myLog, "[Adjust numL0] compaction error: %d", s);
throw s;
}
// The compacted table is removed from the manifest in compactLevelItr,
// so just release it here.
for (TableInfo*& table: tables) {
table->done();
}
Comment on lines +275 to +284
Copy link
Contributor

@greensky00 greensky00 Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not that simple problem. What if the process is force-closed (or crashed) in the middle so that a few tables are removed and the others are not, and then we reopen the DB? L0 is screwed up and how are you going to continue adjusting L0? Data loss or letting DB instance unavailable is not acceptable.

And this scenario should be thoroughly tested. Adjusting L0 is very risky and vulnerable.

}
for (size_t ii = 0; ii < db_config->numL0Partitions; ++ii) {
TableFile* t_file = nullptr;
TableFileOptions t_opt;
// Set 1M bits as an initial size.
// It will be converging to some value as compaction happens.
t_opt.bloomFilterSize = 1024 * 1024;
EP(createNewTableFile(0, t_file, t_opt));
EP(mani->addTableFile(0, ii, SizedBuf(), t_file));
}
// Store manifest file.
mani->store(true);
numL0Partitions = db_config->numL0Partitions;
}

} else {
// Not exist, initial setup phase.

Expand Down Expand Up @@ -287,6 +332,13 @@ Status TableMgr::init(const TableMgrOptions& _options) {
// Store manifest file.
mani->store(true);
}

compactStatus.resize(numL0Partitions);
for (size_t ii = 0; ii < compactStatus.size(); ++ii) {
std::atomic<bool>*& entry = compactStatus[ii];
entry = new std::atomic<bool>(false);
}

logTableSettings(db_config);

removeStaleFiles();
Expand Down
6 changes: 4 additions & 2 deletions src/table_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ class TableMgr {

Status compactLevelItr(const CompactOptions& options,
TableInfo* victim_table,
size_t level);
size_t level,
bool adjust_num_l0 = false);

Status migrateLevel(const CompactOptions& options,
size_t level);
Expand Down Expand Up @@ -300,7 +301,8 @@ class TableMgr {
TableFile* dst_file,
std::vector<uint64_t>& offsets,
uint64_t start_index,
uint64_t count);
uint64_t count,
bool adjusting_num_l0 = false);

void setTableFileItrFlush(TableFile* dst_file,
std::list<Record*>& recs_batch,
Expand Down
6 changes: 3 additions & 3 deletions src/table_set_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,
TableFile* dst_file,
std::vector<uint64_t>& offsets,
uint64_t start_index,
uint64_t count )
{
uint64_t count,
bool adjusting_num_l0 ) {
const DBConfig* db_config = getDbConfig();
(void)db_config;

Expand Down Expand Up @@ -86,7 +86,7 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,

try {
for (uint64_t ii = start_index; ii < start_index + count; ++ii) {
if (!isCompactionAllowed()) {
if (!isCompactionAllowed() && !adjusting_num_l0 ) {
// To avoid file corruption, we should flush all cached pages
// even for cancel.
Timer cancel_timer;
Expand Down
4 changes: 3 additions & 1 deletion src/table_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ TableWriterArgs::TableWriterArgs()
: writerId(0)
, stopSignal(false)
, myLog(nullptr)
, adjustingNumL0(false)
{}

void TableWriterArgs::invoke() {
Expand Down Expand Up @@ -170,7 +171,8 @@ void TableWriterMgr::doTableWrite(TableWriterArgs* args) {
args->payload.targetTableFile,
*args->payload.offsets,
args->payload.startIdx,
args->payload.count );
args->payload.count,
args->adjustingNumL0.load() );
}
}

Expand Down
1 change: 1 addition & 0 deletions src/table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ struct TableWriterArgs {
std::atomic<bool> stopSignal;
TableWritePayload payload;
SimpleLogger* myLog;
std::atomic<bool> adjustingNumL0;
};

struct TableWriterPkg {
Expand Down
41 changes: 33 additions & 8 deletions tests/jungle/basic_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1778,42 +1778,67 @@ int different_l0_partitions() {
config.numL0Partitions = 1;
CHK_Z(jungle::DB::open(&db, filename, config));
int n = 10;

// Insert, flush, and check
std::vector<jungle::KV> kv(n);
CHK_Z(_init_kv_pairs(n, kv, "key", "value"));
CHK_Z(_init_kv_pairs(n, kv, "key1", "value1"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv));
CHK_Z(db->sync());
CHK_Z(db->flushLogs(jungle::FlushOptions()));
CHK_Z(_get_bykey_check(0, n, db, kv));
CHK_Z(jungle::DB::close(db));

// Change the number of partitions,
// but it should be ignored internally.
// Increase the number of partitions; it should be handled correctly internally.
config.numL0Partitions = 4;

// Reopen & check.
CHK_Z(jungle::DB::open(&db, filename, config));
CHK_Z(_get_bykey_check(0, n, db, kv));

// Insert more.
// Insert more, flush and check.
std::vector<jungle::KV> kv2(n);
CHK_Z(_init_kv_pairs(n, kv2, "key_new", "value_new"));
CHK_Z(_init_kv_pairs(n, kv2, "key2", "value2"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv2));
std::vector<jungle::KV> kv3(n);
CHK_Z(_init_kv_pairs(n, kv3, "key3", "value3"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv3));
std::vector<jungle::KV> kv4(n);
CHK_Z(_init_kv_pairs(n, kv4, "key4", "value4"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv4));
CHK_Z(db->sync());
CHK_Z(db->flushLogs(jungle::FlushOptions()));

// Check both.
CHK_Z(_get_bykey_check(0, n, db, kv));
CHK_Z(_get_bykey_check(0, n, db, kv2));
CHK_Z(_get_bykey_check(0, n, db, kv3));
CHK_Z(_get_bykey_check(0, n, db, kv4));
CHK_Z(jungle::DB::close(db));

// Decrease the number of partitions; it should be handled correctly internally.
config.numL0Partitions = 2;
// Reopen & check.
CHK_Z(jungle::DB::open(&db, filename, config));
CHK_Z(_get_bykey_check(0, n, db, kv));
CHK_Z(_get_bykey_check(0, n, db, kv2));
CHK_Z(_get_bykey_check(0, n, db, kv3));
CHK_Z(_get_bykey_check(0, n, db, kv4));

// Insert more, flush and check
std::vector<jungle::KV> kv5(n);
CHK_Z(_init_kv_pairs(n, kv5, "key5", "value5"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv5));
CHK_Z(db->sync());
CHK_Z(db->flushLogs(jungle::FlushOptions()));
CHK_Z(_get_bykey_check(0, n, db, kv5));

// Wait 7 seconds.
// TestSuite::sleep_sec(7);
CHK_Z(jungle::DB::close(db));

CHK_Z(jungle::shutdown());
_free_kv_pairs(n, kv);
_free_kv_pairs(n, kv2);
_free_kv_pairs(n, kv3);
_free_kv_pairs(n, kv4);
_free_kv_pairs(n, kv5);

TEST_SUITE_CLEANUP_PATH();
return 0;
Expand Down
Loading