Skip to content

Commit

Permalink
Support Iterator::Refresh(snapshot) with range tombstones.
Browse files Browse the repository at this point in the history
Add Iterator::Refresh(snapshot) to the same snapshot to stress test.
  • Loading branch information
cbi42 committed Aug 17, 2023
1 parent ddb5ed4 commit bda87ac
Show file tree
Hide file tree
Showing 19 changed files with 240 additions and 57 deletions.
24 changes: 13 additions & 11 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void ArenaWrappedDBIter::Init(

Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }

Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
Expand All @@ -73,6 +73,10 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
// If we recreate a new internal iterator below (NewInternalIterator()),
// we will pass in read_options_. We need to make sure it
// has the right snapshot.
read_options_.snapshot = snapshot;
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
auto reinit_internal_iter = [&]() {
Expand All @@ -82,19 +86,18 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
new (&arena_) Arena();

SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SequenceNumber latest_seq = GetSeqNum(db_impl_, snap);
// TODO: understand read_callback_
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
if (read_callback_) {
read_callback_->Refresh(latest_seq);
read_callback_->Refresh(read_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
sv->current, latest_seq,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
allow_refresh_);

InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, latest_seq,
read_options_, cfd_, sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
};
Expand All @@ -103,13 +106,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
reinit_internal_iter();
break;
} else {
SequenceNumber latest_seq = GetSeqNum(db_impl_, snap);
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
// Refresh range-tombstones in MemTable
if (!read_options_.ignore_range_deletions) {
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
auto t = sv->mem->NewRangeTombstoneIterator(
read_options_, latest_seq, false /* immutable_memtable */);
read_options_, read_seq, false /* immutable_memtable */);
if (!t || t->empty()) {
// If memtable_range_tombstone_iter_ points to a non-empty tombstone
// iterator, then it means sv->mem is not the memtable that
Expand Down Expand Up @@ -139,9 +142,6 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
}
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
}
// Refresh latest sequence number
db_iter_->set_sequence(latest_seq);
db_iter_->set_valid(false);
// Check again if the latest super version number is changed
uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
if (latest_sv_number != cur_sv_number) {
Expand All @@ -150,6 +150,8 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
cur_sv_number = latest_sv_number;
continue;
}
db_iter_->set_sequence(read_seq);
db_iter_->set_valid(false);
break;
}
}
Expand Down
1 change: 1 addition & 0 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class DBIter final : public Iterator {
if (read_callback_) {
read_callback_->Refresh(s);
}
iter_.SetRangeDelReadSeqno(s);
}
void set_valid(bool v) { valid_ = v; }

Expand Down
94 changes: 76 additions & 18 deletions db/db_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2418,34 +2418,92 @@ TEST_P(DBIteratorTest, Refresh) {
}

TEST_P(DBIteratorTest, RefreshWithSnapshot) {
ASSERT_OK(Put("x", "y"));
// L1 file, uses LevelIterator internally
ASSERT_OK(Put(Key(0), "val0"));
ASSERT_OK(Put(Key(5), "val5"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

// L0 file, uses table iterator internally
ASSERT_OK(Put(Key(1), "val1"));
ASSERT_OK(Put(Key(4), "val4"));
ASSERT_OK(Flush());

// Memtable
ASSERT_OK(Put(Key(2), "val2"));
ASSERT_OK(Put(Key(3), "val3"));
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(Put(Key(2), "new val"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
Key(7)));
const Snapshot* snapshot2 = db_->GetSnapshot();

ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(0));

ReadOptions options;
options.snapshot = snapshot;
Iterator* iter = NewIterator(options);
ASSERT_OK(Put(Key(6), "val6"));
ASSERT_OK(iter->status());

iter->Seek(Slice("a"));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("x")), 0);
iter->Next();
ASSERT_FALSE(iter->Valid());
auto verify_iter = [&](int start, int end, bool new_key2 = false) {
for (int i = start; i < end; ++i) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(i));
if (i == 2 && new_key2) {
ASSERT_EQ(iter->value(), "new val");
} else {
ASSERT_EQ(iter->value(), "val" + std::to_string(i));
}
iter->Next();
}
};

ASSERT_OK(Put("c", "d"));
for (int j = 0; j < 2; j++) {
iter->Seek(Key(1));
verify_iter(1, 3);
// Refresh to same snapshot
iter->Refresh(snapshot);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->Seek(Key(3));
verify_iter(3, 6);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Refresh to a newer snapshot
iter->Refresh(snapshot2);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->SeekToFirst();
verify_iter(0, 4, /*new_key2=*/true);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Refresh to an older snapshot
iter->Refresh(snapshot);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->Seek(Key(3));
verify_iter(3, 6);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Refresh to no snapshot
iter->Refresh();
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->Seek(Key(2));
verify_iter(2, 4, /*new_key2=*/true);
verify_iter(6, 7);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Change LSM shape, new SuperVersion is created.
ASSERT_OK(Flush());

iter->Seek(Slice("a"));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("x")), 0);
iter->Next();
ASSERT_FALSE(iter->Valid());
// Refresh back to original snapshot
iter->Refresh(snapshot);
}

ASSERT_OK(iter->status());
Status s = iter->Refresh();
ASSERT_TRUE(s.ok());
s = iter->Refresh(snapshot);
ASSERT_TRUE(s.ok());
db_->ReleaseSnapshot(snapshot);
delete iter;
db_->ReleaseSnapshot(snapshot);
db_->ReleaseSnapshot(snapshot2);
ASSERT_OK(db_->Close());
}

TEST_P(DBIteratorTest, CreationFailure) {
Expand Down
48 changes: 48 additions & 0 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3511,6 +3511,54 @@ TEST_F(DBRangeDelTest, MemtableMaxRangeDeletions) {
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(3, NumTableFilesAtLevel(0));
}

TEST_F(DBRangeDelTest, ReleaseSnapshotAfterIteratorCreation) {
// Test that range tombstone code path in LevelIterator
// does access ReadOptions::snapshot after Iterator creation.
//
// Put some data in L2 so that range tombstone in L1 will not be dropped.
ASSERT_OK(Put(Key(0), "v"));
ASSERT_OK(Put(Key(100), "v"));
ASSERT_OK(Flush());
MoveFilesToLevel(2);

// two L1 file with range del
ASSERT_OK(Put(Key(1), "v"));
ASSERT_OK(Put(Key(2), "v"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(4)));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

ASSERT_OK(Put(Key(5), "v"));
ASSERT_OK(Put(Key(6), "v"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5),
Key(6)));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

ASSERT_EQ(2, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(2));

const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions ro;
ro.snapshot = snapshot;

Iterator* iter = db_->NewIterator(ro);
db_->ReleaseSnapshot(snapshot);

iter->Seek(Key(1));
std::vector<int> expected_keys{1, 2, 6, 100};
for (int i : expected_keys) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(i));
iter->Next();
}
ASSERT_TRUE(!iter->Valid() && iter->status().ok());

delete iter;
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
4 changes: 4 additions & 0 deletions db/range_del_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class TruncatedRangeDelIterator {
const InternalKeyComparator* icmp, const InternalKey* smallest,
const InternalKey* largest);

void SetRangeDelReadSeqno(SequenceNumber read_seqno) {
iter_->SetRangeDelReadSeqno(read_seqno);
}

bool Valid() const;

void Next() { iter_->TopNext(); }
Expand Down
4 changes: 4 additions & 0 deletions db/range_tombstone_fragmenter.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);

void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
upper_bound_ = read_seqno;
}

void SeekToFirst() override;
void SeekToLast() override;

Expand Down
6 changes: 4 additions & 2 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ InternalIterator* TableCache::NewIterator(
size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
uint8_t block_protection_bytes_per_key,
uint8_t block_protection_bytes_per_key, const SequenceNumber* read_seqno,
TruncatedRangeDelIterator** range_del_iter) {
PERF_TIMER_GUARD(new_table_iterator_nanos);

Expand Down Expand Up @@ -269,7 +269,9 @@ InternalIterator* TableCache::NewIterator(
if (s.ok() && !options.ignore_range_deletions) {
if (range_del_iter != nullptr) {
auto new_range_del_iter =
table_reader->NewRangeTombstoneIterator(options);
read_seqno ? table_reader->NewRangeTombstoneIterator(
*read_seqno, options.timestamp)
: table_reader->NewRangeTombstoneIterator(options);
if (new_range_del_iter == nullptr || new_range_del_iter->empty()) {
delete new_range_del_iter;
*range_del_iter = nullptr;
Expand Down
3 changes: 3 additions & 0 deletions db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class TableCache {
// not cached), depending on the CF options
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
// @param range_del_read_seqno If non-nullptr, will be used to create
// *range_del_iter.
InternalIterator* NewIterator(
const ReadOptions& options, const FileOptions& toptions,
const InternalKeyComparator& internal_comparator,
Expand All @@ -97,6 +99,7 @@ class TableCache {
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
uint8_t protection_bytes_per_key,
const SequenceNumber* range_del_read_seqno = nullptr,
TruncatedRangeDelIterator** range_del_iter = nullptr);

// If a seek to internal key "k" in specified file finds an entry,
Expand Down
Loading

0 comments on commit bda87ac

Please sign in to comment.