Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick compaction enhancements and add range check function #346

Merged
merged 3 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
## Behavior Changes
* For track_and_verify_wals_in_manifest, revert to the original behavior before #10087: syncing of live WAL file is not tracked, and we track only the synced sizes of **closed** WALs. (PR #10330).
* DB::Write does not hold global `mutex_` if this db instance does not need to switch wal and mem-table (#7516).
* If `CompactRange()` is called with `CompactRangeOptions::bottommost_level_compaction=kForce*` to compact from L0 to L1, RocksDB now will try to do trivial move from L0 to L1 and then do an intra L1 compaction, instead of a L0 to L1 compaction with trivial move disabled (#11375).
* For Leveled Compaction users, `CompactRange()` will now always try to compact to the last non-empty level. (#11468)
* For Leveled Compaction users, `CompactRange()` with `bottommost_level_compaction = BottommostLevelCompaction::kIfHaveCompactionFilter` will behave similarly to `kForceOptimized` in that it will skip files created during this manual compaction when compacting files in the bottommost level. (#11468)

## 6.29.5 (03/29/2022)
### Bug Fixes
Expand Down
6 changes: 4 additions & 2 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,10 @@ Compaction* CompactionPicker::CompactRange(

// for BOTTOM LEVEL compaction only, use max_file_num_to_ignore to filter out
// files that are created during the current compaction.
if (compact_range_options.bottommost_level_compaction ==
BottommostLevelCompaction::kForceOptimized &&
if ((compact_range_options.bottommost_level_compaction ==
BottommostLevelCompaction::kForceOptimized ||
compact_range_options.bottommost_level_compaction ==
BottommostLevelCompaction::kIfHaveCompactionFilter) &&
max_file_num_to_ignore != port::kMaxUint64) {
assert(input_level == output_level);
// inputs_shrunk holds a continuous subset of input files which were all
Expand Down
5 changes: 2 additions & 3 deletions db/db_bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2225,10 +2225,9 @@ TEST_F(DBBloomFilterTest, OptimizeFiltersForHits) {
BottommostLevelCompaction::kSkip;
compact_options.change_level = true;
compact_options.target_level = 7;
ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
.IsNotSupported());
ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));

ASSERT_EQ(trivial_move, 1);
ASSERT_GE(trivial_move, 1);
ASSERT_EQ(non_trivial_move, 0);

prev_cache_filter_hits = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
Expand Down
2 changes: 1 addition & 1 deletion db/db_compaction_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
}

#ifndef ROCKSDB_LITE
// Compaction filters aplies to all records, regardless snapshots.
// Compaction filters apply to all records, regardless of snapshots.
TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
std::string five = ToString(5);
Options options = CurrentOptions();
Expand Down
67 changes: 38 additions & 29 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ class DBCompactionTestWithParam

class DBCompactionTestWithBottommostParam
: public DBTestBase,
public testing::WithParamInterface<BottommostLevelCompaction> {
public testing::WithParamInterface<
std::tuple<BottommostLevelCompaction, bool>> {
public:
DBCompactionTestWithBottommostParam()
: DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {
bottommost_level_compaction_ = GetParam();
bottommost_level_compaction_ = std::get<0>(GetParam());
}

BottommostLevelCompaction bottommost_level_compaction_;
Expand Down Expand Up @@ -5625,6 +5626,9 @@ TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) {
constexpr int kSstNum = 10;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.num_levels = 7;
const bool dynamic_level = std::get<1>(GetParam());
options.level_compaction_dynamic_level_bytes = dynamic_level;
DestroyAndReopen(options);

// Generate some sst files on level 0 with sequence keys (no overlap)
Expand All @@ -5642,25 +5646,42 @@ TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) {

auto cro = CompactRangeOptions();
cro.bottommost_level_compaction = bottommost_level_compaction_;
bool trivial_moved = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* /*arg*/) { trivial_moved = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// All bottommost_level_compaction options should allow l0 -> l1 trivial move.
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_TRUE(trivial_moved);
if (bottommost_level_compaction_ == BottommostLevelCompaction::kForce ||
bottommost_level_compaction_ ==
BottommostLevelCompaction::kForceOptimized) {
// Real compaction to compact all sst files from level 0 to 1 file on level
// 1
ASSERT_EQ("0,1", FilesPerLevel(0));
// bottommost level should go through intra-level compaction
// and has only 1 file
if (dynamic_level) {
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel(0));
} else {
ASSERT_EQ("0,1", FilesPerLevel(0));
}
} else {
// Just trivial move from level 0 -> 1
ASSERT_EQ("0," + ToString(kSstNum), FilesPerLevel(0));
// Just trivial move from level 0 -> 1/base
if (dynamic_level) {
ASSERT_EQ("0,0,0,0,0,0," + std::to_string(kSstNum), FilesPerLevel(0));
} else {
ASSERT_EQ("0," + std::to_string(kSstNum), FilesPerLevel(0));
}
}
}

INSTANTIATE_TEST_CASE_P(
DBCompactionTestWithBottommostParam, DBCompactionTestWithBottommostParam,
::testing::Values(BottommostLevelCompaction::kSkip,
BottommostLevelCompaction::kIfHaveCompactionFilter,
BottommostLevelCompaction::kForce,
BottommostLevelCompaction::kForceOptimized));
::testing::Combine(
::testing::Values(BottommostLevelCompaction::kSkip,
BottommostLevelCompaction::kIfHaveCompactionFilter,
BottommostLevelCompaction::kForce,
BottommostLevelCompaction::kForceOptimized),
::testing::Bool()));

TEST_F(DBCompactionTest, UpdateLevelSubCompactionTest) {
Options options = CurrentOptions();
Expand Down Expand Up @@ -5953,26 +5974,14 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) {
auto start_idx = key_idx;
GenerateNewFile(&rnd, &key_idx);
GenerateNewFile(&rnd, &key_idx);
auto end_idx = key_idx - 1;
ASSERT_EQ("1,1,2", FilesPerLevel(0));

// Next two CompactRange() calls are used to test exercise error paths within
// RefitLevel() before triggering a valid RefitLevel() call

// Trigger a refit to L1 first
{
std::string begin_string = Key(start_idx);
std::string end_string = Key(end_idx);
Slice begin(begin_string);
Slice end(end_string);

CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 1;
ASSERT_OK(dbfull()->CompactRange(cro, &begin, &end));
}
ASSERT_EQ("0,3,2", FilesPerLevel(0));
MoveFilesToLevel(1);
ASSERT_EQ("0,2,2", FilesPerLevel(0));

// The next CompactRange() call is used to exercise error paths within
// RefitLevel() before triggering a valid RefitLevel() call
//
// Try a refit from L2->L1 - this should fail and exercise error paths in
// RefitLevel()
{
Expand All @@ -5987,7 +5996,7 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) {
cro.target_level = 1;
ASSERT_NOK(dbfull()->CompactRange(cro, &begin, &end));
}
ASSERT_EQ("0,3,2", FilesPerLevel(0));
ASSERT_EQ("0,2,2", FilesPerLevel(0));

// Try a valid Refit request to ensure the path is still working
{
Expand Down
7 changes: 6 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -670,12 +670,16 @@ class DBImpl : public DB {
// max_file_num_to_ignore allows bottom level compaction to filter out newly
// compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will
// disable the filtering
// If `final_output_level` is not nullptr, it is set to manual compaction's
// output level if returned status is OK, and it may or may not be set to
// manual compaction's output level if returned status is not OK.
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level,
const CompactRangeOptions& compact_range_options,
const Slice* begin, const Slice* end,
bool exclusive, bool disallow_trivial_move,
uint64_t max_file_num_to_ignore);
uint64_t max_file_num_to_ignore,
int* final_output_level = nullptr);

// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
Expand Down Expand Up @@ -965,6 +969,7 @@ class DBImpl : public DB {

Status MergeDisjointInstances(const MergeInstanceOptions& merge_options,
const std::vector<DB*>& instances) override;
Status CheckInRange(const Slice* begin, const Slice* end) override;

static IOStatus CreateAndNewDirectory(
FileSystem* fs, const std::string& dirname,
Expand Down
160 changes: 97 additions & 63 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1082,10 +1082,9 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
}
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
final_output_level, options, begin, end, exclusive,
false, port::kMaxUint64);
false /* disable_trivial_move */, port::kMaxUint64);
} else {
int first_overlapped_level = kInvalidLevel;
int max_overlapped_level = kInvalidLevel;
{
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
Version* current_version = super_version->current;
Expand All @@ -1108,79 +1107,102 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
begin, end);
}
if (overlap) {
if (first_overlapped_level == kInvalidLevel) {
first_overlapped_level = level;
}
max_overlapped_level = level;
first_overlapped_level = level;
break;
}
}
CleanupSuperVersion(super_version);
}
if (s.ok() && first_overlapped_level != kInvalidLevel) {
// max_file_num_to_ignore can be used to filter out newly created SST
// files, useful for bottom level compaction in a manual compaction
uint64_t max_file_num_to_ignore = port::kMaxUint64;
uint64_t next_file_number = versions_->current_next_file_number();
final_output_level = max_overlapped_level;
int output_level;
for (int level = first_overlapped_level; level <= max_overlapped_level;
level++) {
bool disallow_trivial_move = false;
// in case the compaction is universal or if we're compacting the
// bottom-most level, the output level will be the same as input one.
// level 0 can never be the bottommost level (i.e. if all files are in
// level 0, we will compact to level 1)
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
output_level = level;
} else if (level == max_overlapped_level && level > 0) {
if (options.bottommost_level_compaction ==
BottommostLevelCompaction::kSkip) {
// Skip bottommost level compaction
continue;
} else if (options.bottommost_level_compaction ==
BottommostLevelCompaction::kIfHaveCompactionFilter &&
cfd->ioptions()->compaction_filter == nullptr &&
cfd->ioptions()->compaction_filter_factory == nullptr) {
// Skip bottommost level compaction since we don't have a compaction
// filter
continue;
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
assert(first_overlapped_level == 0);
s = RunManualCompaction(
cfd, first_overlapped_level, first_overlapped_level, options, begin,
end, exclusive, true /* disallow_trivial_move */,
std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */);
final_output_level = first_overlapped_level;
} else {
assert(cfd->ioptions()->compaction_style == kCompactionStyleLevel);
uint64_t next_file_number = versions_->current_next_file_number();
// Start compaction from `first_overlapped_level`, one level down at a
// time, until output level >= max_overlapped_level.
// When max_overlapped_level == 0, we will still compact from L0 -> L1
// (or LBase), and followed by a bottommost level intra-level compaction
// at L1 (or LBase), if applicable.
int level = first_overlapped_level;
final_output_level = level;
int output_level = 0, base_level = 0;
for (;;) {
// Always allow L0 -> L1 compaction
if (level > 0) {
if (cfd->ioptions()->level_compaction_dynamic_level_bytes) {
assert(final_output_level < cfd->ioptions()->num_levels);
if (final_output_level + 1 == cfd->ioptions()->num_levels) {
break;
}
} else {
// TODO(cbi): there is still a race condition here where
Copy link

@tonyxuqqi tonyxuqqi Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this still cause the source region's trim to not clean all the data beyond its range?
What if we always set final_output_level to last level (6)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for dynamic_level_bytes=false, using 6 will move all files to L6 which makes write amp much larger for smaller dataset.

// if a background compaction compacts some file beyond
// current()->storage_info()->num_non_empty_levels() right after
// the check here. This should happen very infrequently and should
// not happen once a user populates the last level of the LSM.
InstrumentedMutexLock l(&mutex_);
// num_non_empty_levels may be lower after a compaction, so
// we check for >= here.
if (final_output_level + 1 >=
cfd->current()->storage_info()->num_non_empty_levels()) {
break;
}
}
}
output_level = level;
// update max_file_num_to_ignore only for bottom level compaction
// because data in newly compacted files in middle levels may still
// need to be pushed down
max_file_num_to_ignore = next_file_number;
} else {
output_level = level + 1;
if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
cfd->ioptions()->level_compaction_dynamic_level_bytes &&
if (cfd->ioptions()->level_compaction_dynamic_level_bytes &&
level == 0) {
output_level = ColumnFamilyData::kCompactToBaseLevel;
}
// if it's a BottommostLevel compaction and `kForce*` compaction is
// set, disallow trivial move
if (level == max_overlapped_level &&
(options.bottommost_level_compaction ==
BottommostLevelCompaction::kForce ||
options.bottommost_level_compaction ==
BottommostLevelCompaction::kForceOptimized)) {
disallow_trivial_move = true;
// Use max value for `max_file_num_to_ignore` to always compact
// files down.
s = RunManualCompaction(
cfd, level, output_level, options, begin, end, exclusive,
false /* disallow_trivial_move */,
std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */,
output_level == ColumnFamilyData::kCompactToBaseLevel
? &base_level
: nullptr);
if (!s.ok()) {
break;
}
if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
assert(base_level > 0);
level = base_level;
} else {
++level;
}
final_output_level = level;
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
}
s = RunManualCompaction(cfd, level, output_level, options, begin, end,
exclusive, disallow_trivial_move,
max_file_num_to_ignore);
if (!s.ok()) {
break;
}
if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
final_output_level = cfd->NumberLevels() - 1;
} else if (output_level > final_output_level) {
final_output_level = output_level;
if (s.ok()) {
assert(final_output_level > 0);
// bottommost level intra-level compaction
if ((options.bottommost_level_compaction ==
BottommostLevelCompaction::kIfHaveCompactionFilter &&
(cfd->ioptions()->compaction_filter != nullptr ||
cfd->ioptions()->compaction_filter_factory != nullptr)) ||
options.bottommost_level_compaction ==
BottommostLevelCompaction::kForceOptimized ||
options.bottommost_level_compaction ==
BottommostLevelCompaction::kForce) {
// Use `next_file_number` as `max_file_num_to_ignore` to avoid
// rewriting newly compacted files when it is kForceOptimized
// or kIfHaveCompactionFilter with compaction filter set.
s = RunManualCompaction(
cfd, final_output_level, final_output_level, options, begin,
end, exclusive, true /* disallow_trivial_move */,
next_file_number /* max_file_num_to_ignore */);
}
}
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
}
}
}
Expand Down Expand Up @@ -1798,7 +1820,7 @@ Status DBImpl::RunManualCompaction(
ColumnFamilyData* cfd, int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const Slice* begin,
const Slice* end, bool exclusive, bool disallow_trivial_move,
uint64_t max_file_num_to_ignore) {
uint64_t max_file_num_to_ignore, int* final_output_level) {
assert(input_level == ColumnFamilyData::kCompactAllLevels ||
input_level >= 0);

Expand Down Expand Up @@ -1942,6 +1964,15 @@ Status DBImpl::RunManualCompaction(
} else if (!scheduled) {
if (compaction == nullptr) {
manual.done = true;
if (final_output_level) {
// No compaction needed or there is a conflicting compaction.
// Still set `final_output_level` to the level where we would
// have compacted to.
*final_output_level = output_level;
if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
*final_output_level = cfd->current()->storage_info()->base_level();
}
}
bg_cv_.SignalAll();
continue;
}
Expand Down Expand Up @@ -1975,6 +2006,9 @@ Status DBImpl::RunManualCompaction(
}
scheduled = true;
TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
if (final_output_level) {
*final_output_level = compaction->output_level();
}
}
}

Expand Down
Loading