Range deletion performance improvements + cleanup (#4014)
Summary:
This fixes the same performance issue that #3992 fixes but with much more invasive cleanup.

I'm more excited about this PR because it paves the way for fixing another problem we uncovered at Cockroach where range deletion tombstones can cause massive compactions. For example, suppose L4 contains deletions from [a, c) and [x, z) and no other keys, and L5 is entirely empty. L6, however, is full of data. When compacting L4 -> L5, we'll end up with one file that spans, massively, from [a, z). When we go to compact L5 -> L6, we'll have to rewrite all of L6! If, instead of range deletions in L4, we had keys a, b, x, y, and z, RocksDB would have been smart enough to create two files in L5: one for a and b and another for x, y, and z.
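For concreteness, this scenario can be set up through the public DeleteRange API. Below is a minimal C++ sketch, not part of this change; the database path is illustrative and error handling is reduced to asserts:

#include <cassert>

#include "rocksdb/db.h"

// Sets up the scenario above: two narrow, widely separated range deletions
// that nonetheless produce one SST spanning the whole interval [a, z).
int main() {
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/range_del_demo" /* illustrative */, &db);
  assert(s.ok());

  // Analogous to the [a, c) and [x, z) deletions described above.
  s = db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "a", "c");
  assert(s.ok());
  s = db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "x", "z");
  assert(s.ok());

  // After a flush, both tombstones land in a single file whose boundaries
  // stretch from "a" to "z" even though it covers almost no keys; compacting
  // it against a full lower level then rewrites nearly that entire level.
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  delete db;
  return 0;
}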

With the changes in this PR, it will be possible to adjust the compaction logic to split tombstones/start new output files when they would span too many files in the grandparent level.

ajkr please take a look when you have a minute!
Pull Request resolved: #4014

Differential Revision: D8773253

Pulled By: ajkr

fbshipit-source-id: ec62fa85f648fdebe1380b83ed997f9baec35677
benesch authored and facebook-github-bot committed Jul 12, 2018
1 parent 121e321 commit 5f3088d
Showing 10 changed files with 788 additions and 586 deletions.
13 changes: 9 additions & 4 deletions db/builder.cc
@@ -102,7 +102,7 @@ Status BuildTable(
#endif // !ROCKSDB_LITE
TableProperties tp;

- if (iter->Valid() || range_del_agg->ShouldAddTombstones()) {
+ if (iter->Valid() || !range_del_agg->IsEmpty()) {
TableBuilder* builder;
unique_ptr<WritableFileWriter> file_writer;
{
@@ -156,9 +156,14 @@ Status BuildTable(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
- // nullptr for table_{min,max} so all range tombstones will be flushed
- range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */,
- nullptr /* upper_bound */, meta);

+ for (auto it = range_del_agg->NewIterator(); it->Valid(); it->Next()) {
+ auto tombstone = it->Tombstone();
+ auto kv = tombstone.Serialize();
+ builder->Add(kv.first.Encode(), kv.second);
+ meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
+ tombstone.seq_, internal_comparator);
+ }

// Finish and check for builder errors
tp = builder->GetTableProperties();
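Aside, for readers unfamiliar with the representation: Tombstone().Serialize() above turns each range deletion into an ordinary table entry, with the start key and sequence number packed into the internal key and the end key stored as the value. A hypothetical sketch of that shape, using stand-in types rather than RocksDB's real ones:

#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for RocksDB's RangeTombstone; illustrates the
// serialized shape only, not the actual encoding.
struct RangeTombstoneSketch {
  std::string start_key;  // first key covered
  std::string end_key;    // first key NOT covered (half-open interval)
  uint64_t seq;           // sequence number of the DeleteRange operation

  std::pair<std::string, std::string> Serialize() const {
    // Stand-in for the internal-key packing of
    // (user_key, seq, kTypeRangeDeletion); the value is just the end key.
    return {start_key + "@" + std::to_string(seq) + "#range_del", end_key};
  }
};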
2 changes: 1 addition & 1 deletion db/compaction_iterator.cc
@@ -554,7 +554,7 @@ void CompactionIterator::NextFromInput() {
// 1. new user key -OR-
// 2. different snapshot stripe
bool should_delete = range_del_agg_->ShouldDelete(
- key_, RangeDelAggregator::RangePositioningMode::kForwardTraversal);
+ key_, RangeDelPositioningMode::kForwardTraversal);
if (should_delete) {
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_range_del;
75 changes: 69 additions & 6 deletions db/compaction_job.cc
@@ -1075,8 +1075,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}

if (status.ok() && sub_compact->builder == nullptr &&
- sub_compact->outputs.size() == 0 &&
- range_del_agg->ShouldAddTombstones(bottommost_level_)) {
+ sub_compact->outputs.size() == 0) {
// handle subcompaction containing only range deletions
status = OpenCompactionOutputFile(sub_compact);
}
@@ -1164,6 +1163,9 @@ Status CompactionJob::FinishCompactionOutputFile(
uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
assert(output_number != 0);

+ ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
+ const Comparator* ucmp = cfd->user_comparator();

// Check for iterator errors
Status s = input_status;
auto meta = &sub_compact->current_output()->meta;
@@ -1196,9 +1198,71 @@ Status CompactionJob::FinishCompactionOutputFile(
// subcompaction ends.
upper_bound = sub_compact->end;
}
- range_del_agg->AddToBuilder(sub_compact->builder.get(), lower_bound,
- upper_bound, meta, range_del_out_stats,
- bottommost_level_);
+ auto earliest_snapshot = kMaxSequenceNumber;
+ if (existing_snapshots_.size() > 0) {
+ earliest_snapshot = existing_snapshots_[0];
+ }
+ auto it = range_del_agg->NewIterator();
+ if (lower_bound != nullptr) {
+ it->Seek(*lower_bound);
+ }
+ for (; it->Valid(); it->Next()) {
+ auto tombstone = it->Tombstone();
+ if (upper_bound != nullptr &&
+ ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) {
+ // Tombstones starting at upper_bound or later only need to be included
+ // in the next table. Break because subsequent tombstones will start
+ // even later.
+ break;
+ }
+
+ if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
+ // TODO(andrewkr): tombstones that span multiple output files are
+ // counted for each compaction output file, so lots of double counting.
+ range_del_out_stats->num_range_del_drop_obsolete++;
+ range_del_out_stats->num_record_drop_obsolete++;
+ continue;
+ }
+
+ auto kv = tombstone.Serialize();
+ sub_compact->builder->Add(kv.first.Encode(), kv.second);
+ InternalKey smallest_candidate = std::move(kv.first);
+ if (lower_bound != nullptr &&
+ ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
+ // Pretend the smallest key has the same user key as lower_bound
+ // (the max key in the previous table or subcompaction) in order for
+ // files to appear key-space partitioned.
+ //
+ // Choose lowest seqnum so this file's smallest internal key comes
+ // after the previous file's/subcompaction's largest. The fake seqnum
+ // is OK because the read path's file-picking code only considers user
+ // key.
+ smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
+ }
+ InternalKey largest_candidate = tombstone.SerializeEndKey();
+ if (upper_bound != nullptr &&
+ ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
+ // Pretend the largest key has the same user key as upper_bound (the
+ // min key in the following table or subcompaction) in order for files
+ // to appear key-space partitioned.
+ //
+ // Choose highest seqnum so this file's largest internal key comes
+ // before the next file's/subcompaction's smallest. The fake seqnum is
+ // OK because the read path's file-picking code only considers the user
+ // key portion.
+ //
+ // Note Seek() also creates InternalKey with (user_key,
+ // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
+ // kTypeRangeDeletion (0xF), so the range tombstone comes before the
+ // Seek() key in InternalKey's ordering. So Seek() will look in the
+ // next file for the user key.
+ largest_candidate =
+ InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
+ }
+ meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
+ tombstone.seq_,
+ cfd->internal_comparator());
+ }
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
@@ -1247,7 +1311,6 @@ Status CompactionJob::FinishCompactionOutputFile(
return s;
}

- ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
// Output to event logger and fire events.
sub_compact->current_output()->table_properties =
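To make the boundary clamping in FinishCompactionOutputFile concrete, here is a worked example with made-up keys and bounds, following the comments in the diff above:

// Hypothetical example: a tombstone covers [c, m) at seqnum 42, and this
// output file's bounds are lower_bound = "f", upper_bound = "j".
//
// smallest_candidate: user key "c" sorts at or before lower_bound "f",
// so it is clamped to
//     InternalKey("f", 0 /* lowest seqnum */, kTypeRangeDeletion),
// which sorts after the previous file's/subcompaction's largest key.
//
// largest_candidate: upper_bound "j" sorts at or before user key "m",
// so it is clamped to
//     InternalKey("j", kMaxSequenceNumber /* highest seqnum */,
//                 kTypeRangeDeletion),
// which sorts before the next file's/subcompaction's smallest key.
//
// The output files therefore tile the key space exactly, even though the
// tombstone itself extends past both bounds.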
28 changes: 11 additions & 17 deletions db/db_iter.cc
@@ -518,8 +518,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
- ikey_, RangeDelAggregator::RangePositioningMode::
- kForwardTraversal)) {
+ ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
@@ -549,8 +548,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
- ikey_, RangeDelAggregator::RangePositioningMode::
- kForwardTraversal)) {
+ ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
@@ -657,8 +655,7 @@ bool DBIter::MergeValuesNewToOld() {
break;
} else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
range_del_agg_.ShouldDelete(
- ikey, RangeDelAggregator::RangePositioningMode::
- kForwardTraversal)) {
+ ikey, RangeDelPositioningMode::kForwardTraversal)) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_->Next();
@@ -916,8 +913,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeValue:
case kTypeBlobIndex:
if (range_del_agg_.ShouldDelete(
- ikey,
- RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
last_key_entry_type = kTypeRangeDeletion;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
@@ -935,8 +931,7 @@ bool DBIter::FindValueForCurrentKey() {
break;
case kTypeMerge:
if (range_del_agg_.ShouldDelete(
- ikey,
- RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
merge_context_.Clear();
last_key_entry_type = kTypeRangeDeletion;
last_not_merge_type = last_key_entry_type;
@@ -1069,7 +1064,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {

if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
- ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
valid_ = false;
return true;
}
@@ -1114,8 +1109,7 @@

if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
- ikey,
- RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
break;
} else if (ikey.type == kTypeValue) {
const Slice val = iter_->value();
@@ -1283,7 +1277,7 @@ void DBIter::Seek(const Slice& target) {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->Seek(saved_key_.GetInternalKey());
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}
RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
@@ -1334,7 +1328,7 @@ void DBIter::SeekForPrev(const Slice& target) {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekForPrev(saved_key_.GetInternalKey());
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}

RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1383,7 +1377,7 @@ void DBIter::SeekToFirst() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekToFirst();
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}

RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1434,7 +1428,7 @@ void DBIter::SeekToLast() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekToLast();
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}
PrevInternal();
if (statistics_ != nullptr) {
4 changes: 2 additions & 2 deletions db/db_range_del_test.cc
@@ -508,12 +508,12 @@ TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) {
Reopen(opts);

db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
"dr1"); // obsolete after compaction
"dr10"); // obsolete after compaction
db_->Put(WriteOptions(), "key", "val");
db_->Flush(FlushOptions());
const Snapshot* snapshot = db_->GetSnapshot();
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr2",
"dr2"); // protected by snapshot
"dr20"); // protected by snapshot
db_->Put(WriteOptions(), "key", "val");
db_->Flush(FlushOptions());

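The endpoints here change from "dr1" to "dr10" (and "dr2" to "dr20") because DeleteRange covers the half-open interval [begin, end): a range whose begin equals its end covers no keys and, presumably with this change, no longer yields a tombstone for the test to exercise. A minimal sketch of the semantics (the helper name is made up):

#include "rocksdb/db.h"

// Made-up helper illustrating DeleteRange's half-open interval semantics:
// keys in [begin, end) are deleted, so begin == end deletes nothing.
void DemoHalfOpenDeleteRange(rocksdb::DB* db) {
  // Empty interval: covers no keys at all.
  db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                  "dr1", "dr1");
  // Non-empty interval: covers "dr1" and any other key ordered below "dr10".
  db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                  "dr1", "dr10");
}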
3 changes: 1 addition & 2 deletions db/merge_helper.cc
@@ -237,8 +237,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
range_del_agg != nullptr &&
range_del_agg->ShouldDelete(
- iter->key(),
- RangeDelAggregator::RangePositioningMode::kForwardTraversal)) {
+ iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
filter = CompactionFilter::Decision::kRemove;
}
if (filter == CompactionFilter::Decision::kKeep ||