Merge pull request facebook#10984 from cbi42/7.9-iterate-upperbound
[7.9] Prevent iterating over range tombstones beyond `iterate_upper_bound`
cbi42 authored Nov 24, 2022
2 parents ed91ed0 + 9399bb8 commit ab9741a
Showing 6 changed files with 89 additions and 13 deletions.
HISTORY.md: 1 addition, 0 deletions
@@ -11,6 +11,7 @@
* Fixed a memory safety bug when using a SecondaryCache with `block_cache_compressed`. `block_cache_compressed` no longer attempts to use SecondaryCache features.
* Fixed a regression in scan for async_io. During seek, valid buffers were getting cleared causing a regression.
* Tiered Storage: fixed excessive keys written to penultimate level in non-debug builds.
+* Fixed a regression where the iterator processes range tombstones beyond `iterate_upper_bound`.

### New Features
* Add basic support for user-defined timestamp to Merge (#10819).
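For context, the regression was reachable straight from the public API: every seek under an `iterate_upper_bound` paid heap traffic for range tombstones that start at or past the bound. A minimal sketch of that access pattern, mirroring the new unit test in this PR; the database path and `main()` scaffolding are illustrative, not part of the change:

```cpp
#include <cassert>
#include <memory>
#include <string>

#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/upper_bound_demo", &db);
  assert(s.ok());

  // One point key below the bound, one range tombstone entirely above it.
  s = db->Put(rocksdb::WriteOptions(), "a", "bar");
  assert(s.ok());
  s = db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "b",
                      "bz");
  assert(s.ok());

  // Exclusive upper bound "az": tombstone [b, bz) starts past it, so after
  // this fix it never enters the merging iterator's heap.
  std::string upper = "az";
  rocksdb::Slice upper_slice = upper;
  rocksdb::ReadOptions read_opts;
  read_opts.iterate_upper_bound = &upper_slice;

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(read_opts));
  for (it->Seek("a"); it->Valid(); it->Next()) {
    // Visits only "a".
  }
  assert(it->status().ok());

  delete db;
  return 0;
}
```

The visible results are identical before and after the fix; what changes is the internal work done per seek.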
db/db_impl/db_impl.cc: 2 additions, 1 deletion
@@ -1823,7 +1823,8 @@ InternalIterator* DBImpl::NewInternalIterator(
MergeIteratorBuilder merge_iter_builder(
&cfd->internal_comparator(), arena,
!read_options.total_order_seek &&
-          super_version->mutable_cf_options.prefix_extractor != nullptr);
+          super_version->mutable_cf_options.prefix_extractor != nullptr,
+      read_options.iterate_upper_bound);
// Collect iterator for mutable memtable
auto mem_iter = super_version->mem->NewIterator(read_options, arena);
Status s;
db/db_range_del_test.cc: 40 additions, 0 deletions
@@ -2756,6 +2756,46 @@ TEST_F(DBRangeDelTest, RefreshMemtableIter) {
ASSERT_OK(iter->Refresh());
}

+TEST_F(DBRangeDelTest, RangeTombstoneRespectIterateUpperBound) {
+  // Memtable: a, [b, bz)
+  // Do a Seek on `a` with iterate_upper_bound being az
+  // range tombstone [b, bz) should not be processed (added to and
+  // popped from the min_heap in MergingIterator).
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("a", "bar"));
+  ASSERT_OK(
+      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "b", "bz"));
+
+  // I could not find a cleaner way to test this without relying on
+  // implementation detail. Tried to test the value of
+  // `internal_range_del_reseek_count` but that did not work
+  // since BlockBasedTable iterator becomes !Valid() when point key
+  // is out of bound and that reseek only happens when a point key
+  // is covered by some range tombstone.
+  SyncPoint::GetInstance()->SetCallBack("MergeIterator::PopDeleteRangeStart",
+                                        [](void*) {
+                                          // there should not be any range
+                                          // tombstone in the heap.
+                                          FAIL();
+                                        });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ReadOptions read_opts;
+  std::string upper_bound = "az";
+  Slice upper_bound_slice = upper_bound;
+  read_opts.iterate_upper_bound = &upper_bound_slice;
+  std::unique_ptr<Iterator> iter{db_->NewIterator(read_opts)};
+  iter->Seek("a");
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), "a");
+  iter->Next();
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_OK(iter->status());
+}

#endif // ROCKSDB_LITE

} // namespace ROCKSDB_NAMESPACE
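The sync-point hook this test leans on pairs a named marker in production code with a callback registered from the test; the marker is the `TEST_SYNC_POINT_CALLBACK` added to `PopDeleteRangeStart` in `table/merging_iterator.cc` below. A sketch of the idiom, including the teardown calls the test fixture typically performs on its behalf (the point name matches the diff; the rest is the stock pattern):

```cpp
// In the code under test, compiled out of release builds:
//   TEST_SYNC_POINT_CALLBACK("MergeIterator::PopDeleteRangeStart", nullptr);

// In the test: register a callback keyed by the marker's name.
SyncPoint::GetInstance()->SetCallBack(
    "MergeIterator::PopDeleteRangeStart", [](void* /*arg*/) {
      // Firing at all means a range tombstone start key reached the heap.
      FAIL();
    });
SyncPoint::GetInstance()->EnableProcessing();

// ... exercise the iterator ...

// Standard teardown so later tests are unaffected.
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
```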
include/rocksdb/options.h: 1 addition, 1 deletion
@@ -1487,7 +1487,7 @@ struct ReadOptions {
const Slice* iterate_lower_bound;

// "iterate_upper_bound" defines the extent up to which the forward iterator
-  // can returns entries. Once the bound is reached, Valid() will be false.
+  // can return entries. Once the bound is reached, Valid() will be false.
// "iterate_upper_bound" is exclusive ie the bound value is
// not a valid entry. If prefix_extractor is not null:
// 1. If options.auto_prefix_mode = true, iterate_upper_bound will be used
table/merging_iterator.cc: 43 additions, 10 deletions
@@ -117,14 +117,16 @@ class MergingIterator : public InternalIterator {
public:
MergingIterator(const InternalKeyComparator* comparator,
InternalIterator** children, int n, bool is_arena_mode,
-                  bool prefix_seek_mode)
+                  bool prefix_seek_mode,
+                  const Slice* iterate_upper_bound = nullptr)
: is_arena_mode_(is_arena_mode),
prefix_seek_mode_(prefix_seek_mode),
direction_(kForward),
comparator_(comparator),
current_(nullptr),
minHeap_(comparator_),
-        pinned_iters_mgr_(nullptr) {
+        pinned_iters_mgr_(nullptr),
+        iterate_upper_bound_(iterate_upper_bound) {
children_.resize(n);
for (int i = 0; i < n; i++) {
children_[i].level = i;
@@ -202,11 +204,26 @@ class MergingIterator : public InternalIterator {
assert(!range_tombstone_iters_.empty() &&
range_tombstone_iters_[level]->Valid());
if (start_key) {
-      pinned_heap_item_[level].SetTombstoneKey(
-          range_tombstone_iters_[level]->start_key());
+      ParsedInternalKey pik = range_tombstone_iters_[level]->start_key();
+      // iterate_upper_bound does not have timestamp
+      if (iterate_upper_bound_ &&
+          comparator_->user_comparator()->CompareWithoutTimestamp(
+              pik.user_key, true /* a_has_ts */, *iterate_upper_bound_,
+              false /* b_has_ts */) >= 0) {
+        if (replace_top) {
+          // replace_top implies this range tombstone iterator is still in
+          // minHeap_ and at the top.
+          minHeap_.pop();
+        }
+        return;
+      }
+      pinned_heap_item_[level].SetTombstoneKey(std::move(pik));
pinned_heap_item_[level].type = HeapItem::DELETE_RANGE_START;
assert(active_.count(level) == 0);
} else {
+      // allow end key to go over upper bound (if present) since start key is
+      // before upper bound and the range tombstone could still cover a
+      // range before upper bound.
pinned_heap_item_[level].SetTombstoneKey(
range_tombstone_iters_[level]->end_key());
pinned_heap_item_[level].type = HeapItem::DELETE_RANGE_END;
@@ -251,6 +268,7 @@ class MergingIterator : public InternalIterator {
void PopDeleteRangeStart() {
while (!minHeap_.empty() &&
minHeap_.top()->type == HeapItem::DELETE_RANGE_START) {
+      TEST_SYNC_POINT_CALLBACK("MergeIterator::PopDeleteRangeStart", nullptr);
// insert end key of this range tombstone and updates active_
InsertRangeTombstoneToMinHeap(
minHeap_.top()->level, false /* start_key */, true /* replace_top */);
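The pop-and-reinsert choreography around `PopDeleteRangeStart` is easier to see stripped of RocksDB plumbing. A toy, self-contained sketch of the START/END heap trick; it ignores sequence numbers, which the real coverage check also consults, and every name in it is illustrative:

```cpp
#include <iostream>
#include <queue>
#include <set>
#include <string>
#include <vector>

enum class Type { kPoint, kRangeStart, kRangeEnd };
struct Item {
  std::string key;
  Type type;
  int level;
};
struct MinOrder {
  bool operator()(const Item& a, const Item& b) const { return a.key > b.key; }
};

int main() {
  // Level 0 owns tombstone [b, bz); level 1 owns point keys a, bq, c.
  std::priority_queue<Item, std::vector<Item>, MinOrder> min_heap;
  std::set<int> active;  // levels whose tombstone covers the current position
  min_heap.push({"b", Type::kRangeStart, 0});
  for (std::string k : {"a", "bq", "c"}) {
    min_heap.push({k, Type::kPoint, 1});
  }

  while (!min_heap.empty()) {
    Item top = min_heap.top();
    min_heap.pop();
    if (top.type == Type::kRangeStart) {
      // PopDeleteRangeStart: swap the start key for the end key and mark
      // the level active, i.e. inside the tombstone.
      active.insert(top.level);
      min_heap.push({"bz", Type::kRangeEnd, top.level});
    } else if (top.type == Type::kRangeEnd) {
      active.erase(top.level);  // left the tombstone's range
    } else if (active.empty()) {
      std::cout << top.key << '\n';  // prints a and c; bq is covered
    }
  }
  return 0;
}
```

The fix in this PR amounts to never pushing the `kRangeStart` item at all when its key is already at or past `iterate_upper_bound`.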
@@ -573,6 +591,10 @@
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
PinnedIteratorsManager* pinned_iters_mgr_;

+  // Used to bound range tombstones. For point keys, DBIter and SSTable iterator
+  // take care of boundary checking.
+  const Slice* iterate_upper_bound_;

// In forward direction, process a child that is not in the min heap.
// If valid, add to the min heap. Otherwise, check status.
void AddToMinHeapOrCheckStatus(HeapItem*);
@@ -634,9 +656,19 @@ void MergingIterator::SeekImpl(const Slice& target, size_t starting_level,
for (size_t level = 0; level < starting_level; ++level) {
if (range_tombstone_iters_[level] &&
range_tombstone_iters_[level]->Valid()) {
-      assert(static_cast<bool>(active_.count(level)) ==
-             (pinned_heap_item_[level].type == HeapItem::DELETE_RANGE_END));
-      minHeap_.push(&pinned_heap_item_[level]);
+      // use an iterator on active_ if performance becomes an issue here
+      if (active_.count(level) > 0) {
+        assert(pinned_heap_item_[level].type == HeapItem::DELETE_RANGE_END);
+        // if it was active, then start key must be within upper_bound,
+        // so we can add to minHeap_ directly.
+        minHeap_.push(&pinned_heap_item_[level]);
+      } else {
+        // this takes care of checking iterate_upper_bound, but with an extra
+        // key comparison if range_tombstone_iters_[level] was already out of
+        // bound. Consider using a new HeapItem type or some flag to remember
+        // boundary checking result.
+        InsertRangeTombstoneToMinHeap(level);
+      }
} else {
assert(!active_.count(level));
}
@@ -1280,11 +1312,12 @@ InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
}

MergeIteratorBuilder::MergeIteratorBuilder(
-    const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
+    const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode,
+    const Slice* iterate_upper_bound)
: first_iter(nullptr), use_merging_iter(false), arena(a) {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
-  merge_iter =
-      new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
+  merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true,
+                                         prefix_seek_mode, iterate_upper_bound);
}

MergeIteratorBuilder::~MergeIteratorBuilder() {
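Net effect of the changes above: before a tombstone's start key may enter the min-heap, its user key is compared against the bound once. A distilled sketch of that predicate, simplified to a plain user-key comparison (the real code calls `CompareWithoutTimestamp` because the parsed start key carries a timestamp while `iterate_upper_bound` does not):

```cpp
#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"

// Sketch: true when range tombstone [start, end) cannot cover any key below
// the exclusive upper bound, so it can be skipped outright. Only the start
// key is tested; a tombstone that starts below the bound is kept even if its
// end key exceeds the bound, because it may still cover keys the iterator
// will visit.
bool TombstoneOutOfBound(const rocksdb::Comparator* user_cmp,
                         const rocksdb::Slice& start,
                         const rocksdb::Slice& upper_bound) {
  return user_cmp->Compare(start, upper_bound) >= 0;
}
```

That asymmetry is why the `else` branch above deliberately lets end keys go past the bound.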
table/merging_iterator.h: 2 additions, 1 deletion
@@ -45,7 +45,8 @@ class MergeIteratorBuilder {
// comparator: the comparator used in merging comparator
// arena: where the merging iterator needs to be allocated from.
explicit MergeIteratorBuilder(const InternalKeyComparator* comparator,
-                               Arena* arena, bool prefix_seek_mode = false);
+                               Arena* arena, bool prefix_seek_mode = false,
+                               const Slice* iterate_upper_bound = nullptr);
~MergeIteratorBuilder();

// Add iter to the merging iterator.
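Because the new parameter defaults to `nullptr`, every existing call site compiles unchanged; only `DBImpl::NewInternalIterator` opts in. A sketch of both call shapes, assuming the internal headers named below; `icmp`, `arena`, and `read_opts` are stand-ins for caller state:

```cpp
#include "db/dbformat.h"             // InternalKeyComparator
#include "memory/arena.h"            // Arena
#include "rocksdb/options.h"         // ReadOptions
#include "table/merging_iterator.h"  // MergeIteratorBuilder

namespace ROCKSDB_NAMESPACE {

void BuildBothWays(const InternalKeyComparator& icmp, Arena* arena,
                   const ReadOptions& read_opts) {
  // Unchanged caller: defaulted arguments, tombstones unbounded as before.
  MergeIteratorBuilder plain_builder(&icmp, arena);

  // Opted-in caller, as in DBImpl::NewInternalIterator: tombstones starting
  // at or past the bound never reach the merging iterator's min-heap.
  MergeIteratorBuilder bounded_builder(&icmp, arena,
                                       /*prefix_seek_mode=*/false,
                                       read_opts.iterate_upper_bound);
}

}  // namespace ROCKSDB_NAMESPACE
```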
