Commit
Drain unnecessary levels when `level_compaction_dynamic_level_bytes=true` (facebook#11340)

Summary:
When a user migrates to level compaction + `level_compaction_dynamic_level_bytes=true`, or when a DB shrinks, there can be unnecessary levels in the DB. Before this PR, there was no way to remove these levels except a manual compaction. These unnecessary levels make it harder to guarantee `max_bytes_for_level_multiplier` and can cause extra space amplification. This PR boosts the compaction score for these levels so that RocksDB can automatically drain them. Together with facebook#11321, this makes migration to `level_compaction_dynamic_level_bytes=true` automatic, without requiring the user to do a one-time full manual compaction. Credit: this PR is modified from facebook#3921.
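For readers migrating, a minimal usage sketch (not part of this change; the DB path is a placeholder and error handling is reduced to an assert):

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Turning this on for an existing leveled DB is now sufficient: together
  // with facebook#11321, any levels that become unnecessary are drained by
  // background compaction, with no one-time full manual compaction needed.
  options.level_compaction_dynamic_level_bytes = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/example_db", &db);
  assert(s.ok());
  // ... normal reads and writes; draining happens in the background ...
  delete db;
  return 0;
}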

Pull Request resolved: facebook#11340

Test Plan:
- New unit tests
- `python3 tools/db_crashtest.py whitebox --simple`, which randomly sets `level_compaction_dynamic_level_bytes` in each run.

Reviewed By: ajkr

Differential Revision: D44563884

Pulled By: cbi42

fbshipit-source-id: e20d3620bd73dff22be18c5a91a07f340740bcc8
Signed-off-by: tabokie <[email protected]>
cbi42 authored and tabokie committed Aug 15, 2023
1 parent 452084b commit ccca9a6
Showing 9 changed files with 242 additions and 23 deletions.
3 changes: 2 additions & 1 deletion HISTORY.md
@@ -27,7 +27,8 @@
* For track_and_verify_wals_in_manifest, revert to the original behavior before #10087: syncing of live WAL file is not tracked, and we track only the synced sizes of **closed** WALs. (PR #10330).
* DB::Write does not hold global `mutex_` if this db instance does not need to switch wal and mem-table (#7516).
* In leveled compaction with dynamic levelling, the level multiplier is no longer adjusted due to an oversized L0. Instead, the compaction score is adjusted by increasing the level size target by the incoming bytes from upper levels. This deprioritizes compactions from upper levels when more data from L0 is coming. It fixes some unnecessary full stalls caused by drastic changes of level targets, while not wasting write bandwidth on compaction when writes are overloaded.
* For level compaction with `level_compaction_dynamic_level_bytes=true`, RocksDB now trivially moves levels down to fill LSM starting from bottommost level during DB open. See more in comments for option `level_compaction_dynamic_level_bytes`.
* For level compaction with `level_compaction_dynamic_level_bytes=true`, RocksDB now drains unnecessary levels through background compaction automatically (#11340). Together with #11321, this makes migrating other compaction settings to level compaction with `level_compaction_dynamic_level_bytes=true` automatic. In addition, a live DB that becomes smaller will now have its unnecessary levels drained, which can help reduce read and space amplification.

## 6.29.5 (03/29/2022)
### Bug Fixes
5 changes: 3 additions & 2 deletions db/compact_files_test.cc
@@ -200,7 +200,9 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
TEST_F(CompactFilesTest, MultipleLevel) {
Options options;
options.create_if_missing = true;
options.level_compaction_dynamic_level_bytes = true;
// Otherwise background compaction can happen to
// drain unnecessary levels
options.level_compaction_dynamic_level_bytes = false;
options.num_levels = 6;
// Add listener
FlushedFileCollector* collector = new FlushedFileCollector();
@@ -258,7 +260,6 @@ TEST_F(CompactFilesTest, MultipleLevel) {
for (int invalid_output_level = 0; invalid_output_level < 5;
invalid_output_level++) {
s = db->CompactFiles(CompactionOptions(), files, invalid_output_level);
std::cout << s.ToString() << std::endl;
ASSERT_TRUE(s.IsInvalidArgument());
}

133 changes: 133 additions & 0 deletions db/db_compaction_test.cc
@@ -7429,6 +7429,139 @@ TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) {

#endif // !defined(ROCKSDB_LITE)

TEST_F(DBCompactionTest, DrainUnnecessaryLevelsAfterMultiplierChanged) {
// When the level size multiplier increases such that fewer levels become
// necessary, unnecessary levels should be drained.
const int kBaseLevelBytes = 256 << 10; // 256KB
const int kFileBytes = 64 << 10; // 64KB
const int kInitMultiplier = 2, kChangedMultiplier = 10;
const int kNumFiles = 32;
const int kNumLevels = 5;
const int kValueBytes = 1 << 10; // 1KB

Options options = CurrentOptions();
options.compression = kNoCompression;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = kBaseLevelBytes;
options.max_bytes_for_level_multiplier = kInitMultiplier;
options.num_levels = kNumLevels;
Reopen(options);

// Initially we setup the LSM to look roughly as follows:
//
// L0: empty
// L1: 256KB
// ...
// L4: 1MB
Random rnd(301);
for (int file = 0; file < kNumFiles; ++file) {
for (int i = 0; i < kFileBytes / kValueBytes; ++i) {
ASSERT_OK(Put(Key(file * kFileBytes / kValueBytes + i),
rnd.RandomString(kValueBytes)));
}
ASSERT_OK(Flush());
}

int init_num_nonempty = 0;
ASSERT_OK(dbfull()->TEST_WaitForCompact());
for (int level = 1; level < kNumLevels; ++level) {
if (NumTableFilesAtLevel(level) > 0) {
++init_num_nonempty;
}
}

// After increasing the multiplier and running compaction, fewer levels are
// needed to hold all the data. Unnecessary levels should be drained.
ASSERT_OK(db_->SetOptions({{"max_bytes_for_level_multiplier",
std::to_string(kChangedMultiplier)}}));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
int final_num_nonempty = 0;
for (int level = 1; level < kNumLevels; ++level) {
if (NumTableFilesAtLevel(level) > 0) {
++final_num_nonempty;
}
}
ASSERT_GT(init_num_nonempty, final_num_nonempty);
}

// Disabled because `CompactRange` doesn't work as expected.
TEST_F(DBCompactionTest, DISABLED_DrainUnnecessaryLevelsAfterDBBecomesSmall) {
// When the DB size is smaller, e.g., large chunk of data deleted by
// DeleteRange(), unnecessary levels should be drained.
const int kBaseLevelBytes = 256 << 10; // 256KB
const int kFileBytes = 64 << 10; // 64KB
const int kMultiplier = 2;
const int kNumFiles = 32;
const int kNumLevels = 5;
const int kValueBytes = 1 << 10; // 1KB
const int kDeleteFileNum = 8;

Options options = CurrentOptions();
options.compression = kNoCompression;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = kBaseLevelBytes;
options.max_bytes_for_level_multiplier = kMultiplier;
options.num_levels = kNumLevels;
Reopen(options);

// Initially we setup the LSM to look roughly as follows:
//
// L0: empty
// L1: 256KB
// ...
// L4: 1MB
Random rnd(301);
for (int file = 0; file < kNumFiles; ++file) {
for (int i = 0; i < kFileBytes / kValueBytes; ++i) {
ASSERT_OK(Put(Key(file * kFileBytes / kValueBytes + i),
rnd.RandomString(kValueBytes)));
}
ASSERT_OK(Flush());
if (file == kDeleteFileNum) {
// Ensure the DeleteRange() call below only deletes data from the last level
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(NumTableFilesAtLevel(kNumLevels - 1), kDeleteFileNum + 1);
}
}

int init_num_nonempty = 0;
ASSERT_OK(dbfull()->TEST_WaitForCompact());
for (int level = 1; level < kNumLevels; ++level) {
if (NumTableFilesAtLevel(level) > 0) {
++init_num_nonempty;
}
}

// Disable auto compaction for the CompactRange() call below
ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "true"}}));
// Delete keys within first (kDeleteFileNum + 1) files' key ranges.
// This should reduce DB size enough such that there is now
// an unneeded level.
std::string begin = Key(0);
std::string end = Key(kDeleteFileNum * kFileBytes / kValueBytes);
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), begin, end));
Slice begin_slice = begin;
Slice end_slice = end;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &begin_slice, &end_slice));
int after_delete_range_nonempty = 0;
for (int level = 1; level < kNumLevels; ++level) {
if (NumTableFilesAtLevel(level) > 0) {
++after_delete_range_nonempty;
}
}
ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
int final_num_nonempty = 0;
for (int level = 1; level < kNumLevels; ++level) {
if (NumTableFilesAtLevel(level) > 0) {
++final_num_nonempty;
}
}
ASSERT_GE(init_num_nonempty, after_delete_range_nonempty);
ASSERT_GT(after_delete_range_nonempty, final_num_nonempty);
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
1 change: 1 addition & 0 deletions db/db_impl/db_impl_open.cc
@@ -542,6 +542,7 @@ Status DBImpl::Recover(
!cfd->GetLatestMutableCFOptions()->disable_auto_compactions) {
int to_level = cfd->ioptions()->num_levels - 1;
// last level is reserved
// allow_ingest_behind does not support Level Compaction.
if (cfd->ioptions()->allow_ingest_behind) {
to_level -= 1;
}
4 changes: 2 additions & 2 deletions db/db_test.cc
@@ -360,7 +360,7 @@ TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
for (int i = 0; i < 2; ++i) {
threads.emplace_back(write_no_slowdown_func);
}
// Sleep for 2s to allow the threads to insert themselves into the
// Sleep for 3s to allow the threads to insert themselves into the
// write queue
env_->SleepForMicroseconds(3000000ULL);
}
@@ -431,7 +431,7 @@ TEST_F(DBTest, MixedSlowdownOptionsStop) {
for (int i = 0; i < 2; ++i) {
threads.emplace_back(write_no_slowdown_func);
}
// Sleep for 2s to allow the threads to insert themselves into the
// Sleep for 3s to allow the threads to insert themselves into the
// write queue
env_->SleepForMicroseconds(3000000ULL);
}
66 changes: 51 additions & 15 deletions db/version_set.cc
@@ -2821,7 +2821,7 @@ void VersionStorageInfo::ComputeCompactionScore(
}
}
}
} else {
} else { // level > 0
// Compute the ratio of current size to size limit.
uint64_t level_bytes_no_compacting = 0;
uint64_t level_total_bytes = 0;
@@ -2831,21 +2831,36 @@
level_bytes_no_compacting += f->compensated_file_size;
}
}
if (!immutable_options.level_compaction_dynamic_level_bytes ||
level_bytes_no_compacting < MaxBytesForLevel(level)) {
if (!immutable_options.level_compaction_dynamic_level_bytes) {
score = static_cast<double>(level_bytes_no_compacting) /
MaxBytesForLevel(level);
} else {
// If there is a large amount of data being compacted down to the
// current level soon, we would de-prioritize compaction from
// a level where the incoming data would be a large ratio. We do
// it by dividing level size not by target level size, but
// the target size and the incoming compaction bytes.
score = static_cast<double>(level_bytes_no_compacting) /
(MaxBytesForLevel(level) + total_downcompact_bytes) *
kScoreScale;
if (level_bytes_no_compacting < MaxBytesForLevel(level)) {
score = static_cast<double>(level_bytes_no_compacting) /
MaxBytesForLevel(level);
} else {
// If there is a large amount of data being compacted down to the
// current level soon, we would de-prioritize compaction from
// a level where the incoming data would be a large ratio. We do
// it by dividing level size not by target level size, but
// the target size and the incoming compaction bytes.
score = static_cast<double>(level_bytes_no_compacting) /
(MaxBytesForLevel(level) + total_downcompact_bytes) *
kScoreScale;
}
// Drain unnecessary levels, but with lower priority compared to
// when L0 is eligible. Only non-empty levels can be unnecessary.
// If there are no unnecessary levels, lowest_unnecessary_level_ = -1.
if (level_bytes_no_compacting > 0 &&
level <= lowest_unnecessary_level_) {
score = std::max(
score, kScoreScale *
(1.001 + 0.001 * (lowest_unnecessary_level_ - level)));
}
}
if (level_total_bytes > MaxBytesForLevel(level)) {
if (level <= lowest_unnecessary_level_) {
total_downcompact_bytes += level_total_bytes;
} else if (level_total_bytes > MaxBytesForLevel(level)) {
total_downcompact_bytes +=
static_cast<double>(level_total_bytes - MaxBytesForLevel(level));
}
@@ -3758,6 +3773,7 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
}
}
} else {
assert(ioptions.compaction_style == kCompactionStyleLevel);
uint64_t max_level_size = 0;

int first_non_empty_level = -1;
@@ -3782,11 +3798,13 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
}

lowest_unnecessary_level_ = -1;
if (max_level_size == 0) {
// No data for L1 and up. L0 compacts to last level directly.
// No compaction from L1+ needs to be scheduled.
base_level_ = num_levels_ - 1;
} else {
assert(first_non_empty_level >= 1);
uint64_t base_bytes_max = options.max_bytes_for_level_base;
uint64_t base_bytes_min = static_cast<uint64_t>(
base_bytes_max / options.max_bytes_for_level_multiplier);
Expand All @@ -3797,20 +3815,38 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
// Round up after dividing
cur_level_size = static_cast<uint64_t>(
cur_level_size / options.max_bytes_for_level_multiplier);
if (lowest_unnecessary_level_ == -1 &&
cur_level_size <= base_bytes_min) {
// When per_key_placement is enabled, the penultimate level is
// necessary.
lowest_unnecessary_level_ = i;
}
}

// Calculate base level and its size.
uint64_t base_level_size;
if (cur_level_size <= base_bytes_min) {
// If per_key_placement is not enabled,
// either there is only one non-empty level after level 0,
// which can be less than base_bytes_min AND necessary,
// or there is some unnecessary level.
assert(first_non_empty_level == num_levels_ - 1 ||
lowest_unnecessary_level_ != -1);
// Case 1. If we make target size of last level to be max_level_size,
// target size of the first non-empty level would be smaller than
// base_bytes_min. We set it be base_bytes_min.
base_level_size = base_bytes_min + 1U;
base_level_ = first_non_empty_level;
ROCKS_LOG_INFO(ioptions.logger,
"More existing levels in DB than needed. "
"max_bytes_for_level_multiplier may not be guaranteed.");
if (base_level_ < num_levels_ - 1) {
ROCKS_LOG_INFO(
ioptions.logger,
"More existing levels in DB than needed: all non-zero "
"levels <= level %d are unnecessary. "
"max_bytes_for_level_multiplier may not be guaranteed.",
lowest_unnecessary_level_);
}
} else {
assert(lowest_unnecessary_level_ == -1);
// Find base level (where L0 data is compacted to).
base_level_ = first_non_empty_level;
while (base_level_ > 1 && cur_level_size > base_bytes_max) {
6 changes: 6 additions & 0 deletions db/version_set.h
@@ -576,6 +576,12 @@ class VersionStorageInfo {
// be empty. -1 if it is not level-compaction so it's not applicable.
int base_level_;

// Applies to level compaction when
// `level_compaction_dynamic_level_bytes=true`. All non-empty levels <=
// lowest_unnecessary_level_ are not needed and will be drained automatically.
// -1 if there is no unnecessary level.
int lowest_unnecessary_level_;

double level_multiplier_;

// A list for the same set of files that are stored in files_,
41 changes: 41 additions & 0 deletions db/version_set_test.cc
@@ -204,6 +204,16 @@ class VersionStorageInfoTestBase : public testing::Test {
}
return result;
}

void UpdateVersionStorageInfo() {
vstorage_.UpdateFilesByCompactionPri(ioptions_, mutable_cf_options_);
vstorage_.UpdateNumNonEmptyLevels();
vstorage_.GenerateFileIndexer();
vstorage_.GenerateLevelFilesBrief();
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
vstorage_.GenerateLevel0NonOverlapping();
vstorage_.SetFinalized();
}
};

class VersionStorageInfoTest : public VersionStorageInfoTestBase {
Expand Down Expand Up @@ -404,6 +414,37 @@ TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
ASSERT_EQ(4, vstorage_.CompactionScoreLevel(2));
}

TEST_F(VersionStorageInfoTest, DrainUnnecessaryLevel) {
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.max_bytes_for_level_base = 1000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;

// Create a few unnecessary levels.
// See if score is calculated correctly.
Add(5, 1U, "1", "2", 2000U); // target size 1010000
Add(4, 2U, "1", "2", 200U); // target size 101000
// Unnecessary levels
Add(3, 3U, "1", "2", 100U); // target size 10100
// Level 2: target size 1010
Add(1, 4U, "1", "2",
10U); // target size 1000 = max(base_bytes_min + 1, base_bytes_max)

UpdateVersionStorageInfo();

ASSERT_EQ(1, vstorage_.base_level());
ASSERT_EQ(1000, vstorage_.MaxBytesForLevel(1));
ASSERT_EQ(10100, vstorage_.MaxBytesForLevel(3));
vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_);

// Tests that levels 1 and 3 are eligible for compaction.
// Levels 1 and 3 are much smaller than target size,
// so size does not contribute to a high compaction score.
ASSERT_EQ(1, vstorage_.CompactionScoreLevel(0));
ASSERT_GT(vstorage_.CompactionScore(0), 10);
ASSERT_EQ(3, vstorage_.CompactionScoreLevel(1));
ASSERT_GT(vstorage_.CompactionScore(1), 10);
}

TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
// Test whether the overlaps are detected as expected
Add(1, 1U, "4", "7", 1U); // Perfect overlap with last level
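To make the target sizes quoted in the DrainUnnecessaryLevel test concrete, here is a small standalone sketch of the CalculateBaseBytes arithmetic for that setup (1000-byte base, multiplier 10, six levels, 2000-byte bottom level). It mirrors the arithmetic only, not the exact RocksDB implementation, and omits the per_key_placement handling:

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t base_bytes_max = 1000;       // max_bytes_for_level_base
  const uint64_t base_bytes_min = 1000 / 10;  // base / multiplier = 100
  const int num_levels = 6, first_non_empty_level = 1;
  int lowest_unnecessary_level = -1;
  uint64_t cur = 2000;  // size of the largest (bottommost) level, L5
  for (int i = num_levels - 2; i >= first_non_empty_level; --i) {
    cur /= 10;  // walk target sizes down: 200, 20, 2, 0
    if (lowest_unnecessary_level == -1 && cur <= base_bytes_min) {
      lowest_unnecessary_level = i;  // becomes 3: L1-L3 are unnecessary
    }
  }
  std::printf("lowest_unnecessary_level = %d\n", lowest_unnecessary_level);
  // cur ended up <= base_bytes_min, so the base level target is
  // base_bytes_min + 1 = 101; each level's target is at least base_bytes_max.
  uint64_t level_size = base_bytes_min + 1;
  for (int i = first_non_empty_level; i < num_levels; ++i) {
    // Prints L1=1000, L2=1010, L3=10100, L4=101000, L5=1010000,
    // matching the comments in the test above.
    std::printf("L%d target %llu\n", i,
                (unsigned long long)std::max(level_size, base_bytes_max));
    level_size *= 10;
  }
  return 0;
}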
