Merge pull request #8 from petermattis/pmattis/backport
Backport the various range tombstone PRs
petermattis authored Jul 13, 2018
2 parents edd4b23 + d6b0247 commit f91cf6f
Showing 35 changed files with 1,833 additions and 813 deletions.
7 changes: 7 additions & 0 deletions HISTORY.md
@@ -1,4 +1,11 @@
# Rocksdb Change Log
+ ## 5.13.crl (7/13/2018)
+ ### Public API Change
+ * The "rocksdb.num.entries" table property no longer counts range deletion tombstones as entries.
+
+ ### New Features
+ * Add a new table property, "rocksdb.num.range-deletions", which counts the number of range deletion tombstones in the table.
+
## 5.13.4 (6/12/2018)
### Bug Fixes
* Fix regression bug of Prev() with ReadOptions.iterate_upper_bound.
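The two new changelog entries above can be observed directly through the public API. A minimal sketch, not part of the diff: the database path is hypothetical, error handling is abbreviated, and only DB::Put(), DB::DeleteRange(), DB::Flush(), and DB::GetPropertiesOfAllTables() are assumed.

#include <cassert>
#include <iostream>
#include "rocksdb/db.h"
#include "rocksdb/table_properties.h"

int main() {
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  assert(rocksdb::DB::Open(options, "/tmp/rangedel_demo", &db).ok());

  // One point key plus one range tombstone, flushed into a single SST file.
  db->Put(rocksdb::WriteOptions(), "k", "v");
  db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "a", "z");
  db->Flush(rocksdb::FlushOptions());

  // With this change, "rocksdb.num.entries" counts only the point key (1),
  // and the tombstone appears in "rocksdb.num.range-deletions" (1).
  rocksdb::TablePropertiesCollection props;
  db->GetPropertiesOfAllTables(&props);
  for (const auto& p : props) {
    std::cout << p.first << " entries=" << p.second->num_entries
              << " range-deletions=" << p.second->num_range_deletions << "\n";
  }
  delete db;
  return 0;
}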
20 changes: 13 additions & 7 deletions db/builder.cc
@@ -100,7 +100,7 @@ Status BuildTable(
#endif // !ROCKSDB_LITE
TableProperties tp;

- if (iter->Valid() || range_del_agg->ShouldAddTombstones()) {
+ if (iter->Valid() || !range_del_agg->IsEmpty()) {
TableBuilder* builder;
unique_ptr<WritableFileWriter> file_writer;
{
@@ -152,12 +152,18 @@ Status BuildTable(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
- // nullptr for table_{min,max} so all range tombstones will be flushed
- range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */,
-                             nullptr /* upper_bound */, meta);
+ for (auto it = range_del_agg->NewIterator(); it->Valid(); it->Next()) {
+   auto tombstone = it->Tombstone();
+   auto kv = tombstone.Serialize();
+   builder->Add(kv.first.Encode(), kv.second);
+   meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
+                                  tombstone.seq_, internal_comparator);
+ }

// Finish and check for builder errors
- bool empty = builder->NumEntries() == 0;
+ tp = builder->GetTableProperties();
+ bool empty = builder->NumEntries() == 0 && tp.num_range_deletions == 0;
s = c_iter.status();
if (!s.ok() || empty) {
builder->Abandon();
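Context for the new loop above: a RangeTombstone serializes to an ordinary internal key/value pair, so it can be handed straight to TableBuilder::Add() with no special tombstone hook. Roughly, paraphrasing the helpers in db/dbformat.h (member names match the real struct; the bodies are an illustrative sketch and only compile inside the RocksDB source tree):

struct RangeTombstone {
  Slice start_key_;       // inclusive start of the deleted range
  Slice end_key_;         // exclusive end of the deleted range
  SequenceNumber seq_;

  // Key = (start_key_, seq_, kTypeRangeDeletion), value = end_key_. This is
  // why builder->Add(kv.first.Encode(), kv.second) above just works.
  std::pair<InternalKey, Slice> Serialize() const {
    return {InternalKey(start_key_, seq_, kTypeRangeDeletion), end_key_};
  }

  // The end key is tagged kMaxSequenceNumber so that, used as a file
  // boundary, it sorts ahead of any real entry on the same user key.
  InternalKey SerializeEndKey() const {
    return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion);
  }
};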
@@ -170,7 +176,7 @@
meta->fd.file_size = file_size;
meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetFileSize() > 0);
- tp = builder->GetTableProperties();
+ tp = builder->GetTableProperties(); // refresh now that builder is finished
if (table_properties) {
*table_properties = tp;
}
@@ -194,7 +200,7 @@
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
- ReadOptions(), env_options, internal_comparator, meta->fd,
+ ReadOptions(), env_options, internal_comparator, *meta,
nullptr /* range_del_agg */, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
2 changes: 1 addition & 1 deletion db/compaction_iterator.cc
@@ -551,7 +551,7 @@ void CompactionIterator::NextFromInput() {
// 1. new user key -OR-
// 2. different snapshot stripe
bool should_delete = range_del_agg_->ShouldDelete(
- key_, RangeDelAggregator::RangePositioningMode::kForwardTraversal);
+ key_, RangeDelPositioningMode::kForwardTraversal);
if (should_delete) {
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_range_del;
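The only change in this file is the enum's new home: RangePositioningMode was hoisted out of RangeDelAggregator to namespace scope as RangeDelPositioningMode. Its shape, paraphrased from db/range_del_aggregator.h in this backport (the per-value comments are editorial):

enum class RangeDelPositioningMode {
  kFullScan,           // no cached positions; scan the tombstones each call
  kForwardTraversal,   // queried keys arrive in ascending order (Next())
  kBackwardTraversal,  // queried keys arrive in descending order (Prev())
  kBinarySearch,       // no ordering assumption; re-position per query
};

Compaction consumes keys in ascending order, so kForwardTraversal lets ShouldDelete() advance its cached tombstone position monotonically instead of re-searching on every key.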
88 changes: 77 additions & 11 deletions db/compaction_job.cc
@@ -959,8 +959,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}

if (status.ok() && sub_compact->builder == nullptr &&
- sub_compact->outputs.size() == 0 &&
- range_del_agg->ShouldAddTombstones(bottommost_level_)) {
+ sub_compact->outputs.size() == 0) {
// handle subcompaction containing only range deletions
status = OpenCompactionOutputFile(sub_compact);
}
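Note the dropped ShouldAddTombstones() guard: an output file is now opened for any subcompaction that reaches this point, and whether the file is kept is decided only after the builder has run. The predicate applied later in FinishCompactionOutputFile, sketched here as a free function for emphasis (the helper name is hypothetical; the diff inlines this logic):

#include <cstdint>
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"

// Keep the output file if the build succeeded and it holds either point
// entries or range tombstones. Hypothetical helper; see the checks around
// current_entries and tp.num_range_deletions below.
bool keep_output_file(const rocksdb::Status& s, uint64_t current_entries,
                      const rocksdb::TableProperties& tp) {
  return s.ok() && (current_entries > 0 || tp.num_range_deletions > 0);
}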
@@ -1048,6 +1047,9 @@ Status CompactionJob::FinishCompactionOutputFile(
uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
assert(output_number != 0);

+ ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
+ const Comparator* ucmp = cfd->user_comparator();

// Check for iterator errors
Status s = input_status;
auto meta = &sub_compact->current_output()->meta;
@@ -1079,9 +1081,71 @@
// subcompaction ends.
upper_bound = sub_compact->end;
}
- range_del_agg->AddToBuilder(sub_compact->builder.get(), lower_bound,
-                             upper_bound, meta, range_del_out_stats,
-                             bottommost_level_);
+ auto earliest_snapshot = kMaxSequenceNumber;
+ if (existing_snapshots_.size() > 0) {
+   earliest_snapshot = existing_snapshots_[0];
+ }
+ auto it = range_del_agg->NewIterator();
+ if (lower_bound != nullptr) {
+   it->Seek(*lower_bound);
+ }
+ for (; it->Valid(); it->Next()) {
+   auto tombstone = it->Tombstone();
+   if (upper_bound != nullptr &&
+       ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) {
+     // Tombstones starting at upper_bound or later only need to be included
+     // in the next table. Break because subsequent tombstones will start
+     // even later.
+     break;
+   }
+
+   if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
+     // TODO(andrewkr): tombstones that span multiple output files are
+     // counted for each compaction output file, so lots of double counting.
+     range_del_out_stats->num_range_del_drop_obsolete++;
+     range_del_out_stats->num_record_drop_obsolete++;
+     continue;
+   }
+
+   auto kv = tombstone.Serialize();
+   sub_compact->builder->Add(kv.first.Encode(), kv.second);
+   InternalKey smallest_candidate = std::move(kv.first);
+   if (lower_bound != nullptr &&
+       ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
+     // Pretend the smallest key has the same user key as lower_bound
+     // (the max key in the previous table or subcompaction) in order for
+     // files to appear key-space partitioned.
+     //
+     // Choose lowest seqnum so this file's smallest internal key comes
+     // after the previous file's/subcompaction's largest. The fake seqnum
+     // is OK because the read path's file-picking code only considers user
+     // key.
+     smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
+   }
+   InternalKey largest_candidate = tombstone.SerializeEndKey();
+   if (upper_bound != nullptr &&
+       ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
+     // Pretend the largest key has the same user key as upper_bound (the
+     // min key in the following table or subcompaction) in order for files
+     // to appear key-space partitioned.
+     //
+     // Choose highest seqnum so this file's largest internal key comes
+     // before the next file's/subcompaction's smallest. The fake seqnum is
+     // OK because the read path's file-picking code only considers the user
+     // key portion.
+     //
+     // Note Seek() also creates InternalKey with (user_key,
+     // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
+     // kTypeRangeDeletion (0xF), so the range tombstone comes before the
+     // Seek() key in InternalKey's ordering. So Seek() will look in the
+     // next file for the user key.
+     largest_candidate =
+         InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
+   }
+   meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
+                                  tombstone.seq_,
+                                  cfd->internal_comparator());
+ }
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
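A worked example of the boundary clamping above: suppose a subcompaction covers [d, g), so lower_bound is "d" and upper_bound is "g", and the aggregator yields a tombstone [b, m) at seqnum 42. The whole tombstone is still written to the output file; only the file's recorded key range is clamped so neighboring files tile the key space. A standalone toy of just that decision, with hypothetical keys and user keys modeled as plain strings:

#include <iostream>
#include <string>

int main() {
  const std::string start = "b", end = "m";    // tombstone [b, m)@42
  const std::string lower = "d", upper = "g";  // subcompaction bounds

  // start <= lower, so pretend the file begins at the lower bound, with
  // seqnum 0 so it sorts after the previous file's largest key.
  const std::string smallest =
      (start <= lower) ? lower + "@0" : start + "@42";

  // upper <= end, so pretend the file ends at the upper bound, with
  // kMaxSequenceNumber so it sorts before the next file's smallest key.
  const std::string largest = (upper <= end) ? upper + "@max" : end + "@max";

  // Prints "[d@0, g@max]": the file boundaries tile exactly against the
  // neighbors even though the tombstone itself still covers [b, m).
  std::cout << "[" << smallest << ", " << largest << "]\n";
  return 0;
}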
@@ -1107,7 +1171,12 @@ Status CompactionJob::FinishCompactionOutputFile(
}
sub_compact->outfile.reset();

- if (s.ok() && current_entries == 0) {
+ TableProperties tp;
+ if (s.ok()) {
+   tp = sub_compact->builder->GetTableProperties();
+ }
+
+ if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
// If there is nothing to output, it is not necessary to generate an sst
// file. This happens when the output level is the bottom level and the
// subcompaction outputs nothing.
@@ -1124,17 +1193,15 @@ Status CompactionJob::FinishCompactionOutputFile(
return s;
}

- ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- TableProperties tp;
- if (s.ok() && current_entries > 0) {
+ if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
// here because this is a special case after we finish the table building
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
InternalIterator* iter = cfd->table_cache()->NewIterator(
- ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
+ ReadOptions(), env_options_, cfd->internal_comparator(), *meta,
nullptr /* range_del_agg */, nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
@@ -1151,7 +1218,6 @@

// Output to event logger and fire events.
if (s.ok()) {
- tp = sub_compact->builder->GetTableProperties();
sub_compact->current_output()->table_properties =
std::make_shared<TableProperties>(tp);
ROCKS_LOG_INFO(db_options_.info_log,
35 changes: 15 additions & 20 deletions db/db_iter.cc
@@ -493,8 +493,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
- ikey_, RangeDelAggregator::RangePositioningMode::
- kForwardTraversal)) {
+ ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
@@ -523,8 +522,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
- ikey_, RangeDelAggregator::RangePositioningMode::
- kForwardTraversal)) {
+ ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
@@ -630,8 +628,7 @@ void DBIter::MergeValuesNewToOld() {
break;
} else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
range_del_agg_.ShouldDelete(
- ikey, RangeDelAggregator::RangePositioningMode::
- kForwardTraversal)) {
+ ikey, RangeDelPositioningMode::kForwardTraversal)) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_->Next();
@@ -716,7 +713,7 @@ void DBIter::ReverseToForward() {
direction_ = kForward;
if (!iter_->Valid()) {
iter_->SeekToFirst();
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}
}

@@ -732,7 +729,7 @@ void DBIter::ReverseToBackward() {
// previous key.
if (!iter_->Valid()) {
iter_->SeekToLast();
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
@@ -855,8 +852,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeValue:
case kTypeBlobIndex:
if (range_del_agg_.ShouldDelete(
- ikey,
- RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
last_key_entry_type = kTypeRangeDeletion;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
@@ -874,8 +870,7 @@
break;
case kTypeMerge:
if (range_del_agg_.ShouldDelete(
- ikey,
- RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
merge_context_.Clear();
last_key_entry_type = kTypeRangeDeletion;
last_not_merge_type = last_key_entry_type;
@@ -991,7 +986,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {

if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
- ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
valid_ = false;
return false;
}
Expand Down Expand Up @@ -1019,7 +1014,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) &&
ikey.type == kTypeMerge &&
!range_del_agg_.ShouldDelete(
- ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
@@ -1032,7 +1027,7 @@
!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) ||
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
- ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kBackwardTraversal)) {
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
@@ -1168,7 +1163,7 @@ void DBIter::Seek(const Slice& target) {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->Seek(saved_key_.GetInternalKey());
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}
RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
@@ -1218,7 +1213,7 @@ void DBIter::SeekForPrev(const Slice& target) {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekForPrev(saved_key_.GetInternalKey());
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}

RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1266,7 +1261,7 @@ void DBIter::SeekToFirst() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekToFirst();
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}

RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1306,14 +1301,14 @@ void DBIter::SeekToLast() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekToLast();
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
}
// When the iterate_upper_bound is set to a value,
// it will seek to the last key before the
// ReadOptions.iterate_upper_bound
if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
SeekForPrev(*iterate_upper_bound_);
- range_del_agg_.InvalidateTombstoneMapPositions();
+ range_del_agg_.InvalidateRangeDelMapPositions();
if (!Valid()) {
return;
} else if (user_comparator_->Equal(*iterate_upper_bound_, key())) {
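All of the renames in this file track one refactor: the aggregator's TombstoneMap became RangeDelMap. The invalidation call matters because the aggregator caches per-snapshot-stripe iterator positions so that kForwardTraversal and kBackwardTraversal checks stay amortized constant time, and any non-sequential jump (Seek, SeekForPrev, SeekToFirst, SeekToLast) makes those cached positions stale. A toy model of the idea, not the real class; it assumes collapsed, non-overlapping tombstones:

#include <cstdint>
#include <map>
#include <string>
#include <utility>

class ToyRangeDelMap {
 public:
  // start user key -> (exclusive end user key, tombstone seqnum).
  std::map<std::string, std::pair<std::string, uint64_t>> tombstones;

  // Valid only while queried keys ascend: the cached position moves forward
  // monotonically, giving amortized O(1) work per lookup.
  bool ShouldDeleteForward(const std::string& key, uint64_t seq) {
    while (pos_ != tombstones.end() && pos_->second.first <= key) ++pos_;
    return pos_ != tombstones.end() && pos_->first <= key &&
           seq < pos_->second.second;
  }

  // After any Seek* (or after populating tombstones), the cached position
  // may be arbitrarily far from the next queried key, so it must be reset;
  // this is the toy analogue of InvalidateRangeDelMapPositions().
  void InvalidateRangeDelMapPositions() { pos_ = tombstones.begin(); }

 private:
  std::map<std::string, std::pair<std::string, uint64_t>>::iterator pos_ =
      tombstones.begin();
};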