From bda87acf77c68385f8178c0810b4b004edb1d1e7 Mon Sep 17 00:00:00 2001
From: Changyu Bi
Date: Thu, 17 Aug 2023 13:59:44 -0700
Subject: [PATCH] Support Iterator::Refresh(snapshot) with range tombstones.

Also stress test Iterator::Refresh(snapshot), including refreshing to the
iterator's current snapshot.
---
 db/arena_wrapped_db_iter.cc                   | 24 ++---
 db/db_iter.h                                  |  1 +
 db/db_iterator_test.cc                        | 94 +++++++++++++++----
 db/db_range_del_test.cc                       | 48 ++++++++++
 db/range_del_aggregator.h                     |  4 +
 db/range_tombstone_fragmenter.h               |  4 +
 db/table_cache.cc                             |  6 +-
 db/table_cache.h                              |  3 +
 db/version_set.cc                             | 53 ++++++-----
 db_stress_tool/db_stress_test_base.cc         |  8 ++
 table/block_based/block_based_table_reader.cc | 10 ++
 table/block_based/block_based_table_reader.h  |  3 +
 table/internal_iterator.h                     | 11 +++
 table/iterator_wrapper.h                      |  5 +
 table/merging_iterator.cc                     | 12 +++
 table/table_reader.h                          |  6 ++
 .../new_features/iterator-refresh-snapshot.md |  1 +
 .../transactions/write_prepared_txn_db.cc     |  2 +-
 .../transactions/write_unprepared_txn_db.cc   |  2 +-
 19 files changed, 240 insertions(+), 57 deletions(-)
 create mode 100644 unreleased_history/new_features/iterator-refresh-snapshot.md

diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc
index 728d9f161468..e1c271bf6565 100644
--- a/db/arena_wrapped_db_iter.cc
+++ b/db/arena_wrapped_db_iter.cc
@@ -64,7 +64,7 @@ void ArenaWrappedDBIter::Init(
 
 Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }
 
-Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
+Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
   if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
     return Status::NotSupported("Creating renew iterator is not allowed.");
   }
@@ -73,6 +73,10 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
   // correct behavior. Will be corrected automatically when we take a snapshot
   // here for the case of WritePreparedTxnDB.
   uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
+  // If we recreate the internal iterator below (NewInternalIterator()),
+  // we will pass in read_options_. We need to make sure it
+  // has the right snapshot.
+  read_options_.snapshot = snapshot;
   TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
   TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
   auto reinit_internal_iter = [&]() {
@@ -82,19 +86,18 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
     new (&arena_) Arena();
 
     SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
-    SequenceNumber latest_seq = GetSeqNum(db_impl_, snap);
-    // TODO: understand read_callback_
+    SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
     if (read_callback_) {
-      read_callback_->Refresh(latest_seq);
+      read_callback_->Refresh(read_seq);
     }
     Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
-         sv->current, latest_seq,
+         sv->current, read_seq,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
          allow_refresh_);
 
     InternalIterator* internal_iter = db_impl_->NewInternalIterator(
-        read_options_, cfd_, sv, &arena_, latest_seq,
+        read_options_, cfd_, sv, &arena_, read_seq,
         /* allow_unprepared_value */ true, /* db_iter */ this);
     SetIterUnderDBIter(internal_iter);
   };
@@ -103,13 +106,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
       reinit_internal_iter();
       break;
     } else {
-      SequenceNumber latest_seq = GetSeqNum(db_impl_, snap);
+      SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
       // Refresh range-tombstones in MemTable
       if (!read_options_.ignore_range_deletions) {
         SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
         TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
         auto t = sv->mem->NewRangeTombstoneIterator(
-            read_options_, latest_seq, false /* immutable_memtable */);
+            read_options_, read_seq, false /* immutable_memtable */);
         if (!t || t->empty()) {
           // If memtable_range_tombstone_iter_ points to a non-empty tombstone
           // iterator, then it means sv->mem is not the memtable that
@@ -139,9 +142,6 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
         }
         db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
       }
-      // Refresh latest sequence number
-      db_iter_->set_sequence(latest_seq);
-      db_iter_->set_valid(false);
       // Check again if the latest super version number is changed
       uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
       if (latest_sv_number != cur_sv_number) {
@@ -150,6 +150,8 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snap) {
         cur_sv_number = latest_sv_number;
         continue;
       }
+      db_iter_->set_sequence(read_seq);
+      db_iter_->set_valid(false);
       break;
     }
   }
diff --git a/db/db_iter.h b/db/db_iter.h
index 163da32650cb..e45da9dd1bdc 100644
--- a/db/db_iter.h
+++ b/db/db_iter.h
@@ -209,6 +209,7 @@ class DBIter final : public Iterator {
     if (read_callback_) {
      read_callback_->Refresh(s);
     }
+    iter_.SetRangeDelReadSeqno(s);
   }
 
   void set_valid(bool v) { valid_ = v; }
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc
index 6f959e921391..ab39f3257c5a 100644
--- a/db/db_iterator_test.cc
+++ b/db/db_iterator_test.cc
@@ -2418,34 +2418,92 @@ TEST_P(DBIteratorTest, Refresh) {
 }
 
 TEST_P(DBIteratorTest, RefreshWithSnapshot) {
-  ASSERT_OK(Put("x", "y"));
+  // L1 file, uses LevelIterator internally
+  ASSERT_OK(Put(Key(0), "val0"));
+  ASSERT_OK(Put(Key(5), "val5"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  // L0 file, uses table iterator internally
+  ASSERT_OK(Put(Key(1), "val1"));
+  ASSERT_OK(Put(Key(4), "val4"));
+  ASSERT_OK(Flush());
+
+  // Memtable
+  ASSERT_OK(Put(Key(2), "val2"));
+  ASSERT_OK(Put(Key(3), "val3"));
   const Snapshot* snapshot = db_->GetSnapshot();
+  ASSERT_OK(Put(Key(2), "new val"));
+
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
+                             Key(7)));
+  const Snapshot* snapshot2 = db_->GetSnapshot();
+
+  ASSERT_EQ(1, NumTableFilesAtLevel(1));
+  ASSERT_EQ(1, NumTableFilesAtLevel(0));
+
   ReadOptions options;
   options.snapshot = snapshot;
   Iterator* iter = NewIterator(options);
+  ASSERT_OK(Put(Key(6), "val6"));
   ASSERT_OK(iter->status());
 
-  iter->Seek(Slice("a"));
-  ASSERT_TRUE(iter->Valid());
-  ASSERT_EQ(iter->key().compare(Slice("x")), 0);
-  iter->Next();
-  ASSERT_FALSE(iter->Valid());
+  auto verify_iter = [&](int start, int end, bool new_key2 = false) {
+    for (int i = start; i < end; ++i) {
+      ASSERT_OK(iter->status());
+      ASSERT_TRUE(iter->Valid());
+      ASSERT_EQ(iter->key(), Key(i));
+      if (i == 2 && new_key2) {
+        ASSERT_EQ(iter->value(), "new val");
+      } else {
+        ASSERT_EQ(iter->value(), "val" + std::to_string(i));
+      }
+      iter->Next();
+    }
+  };
 
-  ASSERT_OK(Put("c", "d"));
+  for (int j = 0; j < 2; j++) {
+    iter->Seek(Key(1));
+    verify_iter(1, 3);
+    // Refresh to the same snapshot
+    iter->Refresh(snapshot);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->Seek(Key(3));
+    verify_iter(3, 6);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Refresh to a newer snapshot
+    iter->Refresh(snapshot2);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->SeekToFirst();
+    verify_iter(0, 4, /*new_key2=*/true);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Refresh to an older snapshot
+    iter->Refresh(snapshot);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->Seek(Key(3));
+    verify_iter(3, 6);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Refresh to no snapshot
+    iter->Refresh();
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->Seek(Key(2));
+    verify_iter(2, 4, /*new_key2=*/true);
+    verify_iter(6, 7);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Change the LSM shape so that a new SuperVersion is created.
+    ASSERT_OK(Flush());
 
-  iter->Seek(Slice("a"));
-  ASSERT_TRUE(iter->Valid());
-  ASSERT_EQ(iter->key().compare(Slice("x")), 0);
-  iter->Next();
-  ASSERT_FALSE(iter->Valid());
+    // Refresh back to the original snapshot
+    iter->Refresh(snapshot);
+  }
 
-  ASSERT_OK(iter->status());
-  Status s = iter->Refresh();
-  ASSERT_TRUE(s.ok());
-  s = iter->Refresh(snapshot);
-  ASSERT_TRUE(s.ok());
-  db_->ReleaseSnapshot(snapshot);
   delete iter;
+  db_->ReleaseSnapshot(snapshot);
+  db_->ReleaseSnapshot(snapshot2);
+  ASSERT_OK(db_->Close());
 }
 
 TEST_P(DBIteratorTest, CreationFailure) {
diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc
index bb75592c7e6b..ad3e417dbd47 100644
--- a/db/db_range_del_test.cc
+++ b/db/db_range_del_test.cc
@@ -3511,6 +3511,54 @@ TEST_F(DBRangeDelTest, MemtableMaxRangeDeletions) {
   ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   ASSERT_EQ(3, NumTableFilesAtLevel(0));
 }
+
+TEST_F(DBRangeDelTest, ReleaseSnapshotAfterIteratorCreation) {
+  // Test that the range tombstone code path in LevelIterator
+  // does not access ReadOptions::snapshot after Iterator creation.
+  //
+  // Put some data in L2 so that the range tombstones in L1 will not be dropped.
+  ASSERT_OK(Put(Key(0), "v"));
+  ASSERT_OK(Put(Key(100), "v"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+
+  // two L1 files with range deletions
+  ASSERT_OK(Put(Key(1), "v"));
+  ASSERT_OK(Put(Key(2), "v"));
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
+                             Key(4)));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  ASSERT_OK(Put(Key(5), "v"));
+  ASSERT_OK(Put(Key(6), "v"));
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5),
+                             Key(6)));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  ASSERT_EQ(2, NumTableFilesAtLevel(1));
+  ASSERT_EQ(1, NumTableFilesAtLevel(2));
+
+  const Snapshot* snapshot = db_->GetSnapshot();
+  ReadOptions ro;
+  ro.snapshot = snapshot;
+
+  Iterator* iter = db_->NewIterator(ro);
+  db_->ReleaseSnapshot(snapshot);
+
+  iter->Seek(Key(1));
+  std::vector<int> expected_keys{1, 2, 6, 100};
+  for (int i : expected_keys) {
+    ASSERT_OK(iter->status());
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ(iter->key(), Key(i));
+    iter->Next();
+  }
+  ASSERT_TRUE(!iter->Valid() && iter->status().ok());
+
+  delete iter;
+}
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h
index dc1e730389b2..f7fa87af40dd 100644
--- a/db/range_del_aggregator.h
+++ b/db/range_del_aggregator.h
@@ -36,6 +36,10 @@ class TruncatedRangeDelIterator {
       const InternalKeyComparator* icmp, const InternalKey* smallest,
       const InternalKey* largest);
 
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) {
+    iter_->SetRangeDelReadSeqno(read_seqno);
+  }
+
   bool Valid() const;
 
   void Next() { iter_->TopNext(); }
diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h
index 8c7d98297280..ce631d495e67 100644
--- a/db/range_tombstone_fragmenter.h
+++ b/db/range_tombstone_fragmenter.h
@@ -148,6 +148,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
       const InternalKeyComparator& icmp, SequenceNumber upper_bound,
       const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);
 
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
+    upper_bound_ = read_seqno;
+  }
+
   void SeekToFirst() override;
   void SeekToLast() override;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index bdbb47a2fe18..680af56a9481 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -220,7 +220,7 @@ InternalIterator* TableCache::NewIterator(
     size_t max_file_size_for_l0_meta_pin,
     const InternalKey* smallest_compaction_key,
     const InternalKey* largest_compaction_key, bool allow_unprepared_value,
-    uint8_t block_protection_bytes_per_key,
+    uint8_t block_protection_bytes_per_key, const SequenceNumber* read_seqno,
     TruncatedRangeDelIterator** range_del_iter) {
   PERF_TIMER_GUARD(new_table_iterator_nanos);
 
@@ -269,7 +269,9 @@ InternalIterator* TableCache::NewIterator(
   if (s.ok() && !options.ignore_range_deletions) {
     if (range_del_iter != nullptr) {
       auto new_range_del_iter =
-          table_reader->NewRangeTombstoneIterator(options);
+          read_seqno ? table_reader->NewRangeTombstoneIterator(
+                           *read_seqno, options.timestamp)
+                     : table_reader->NewRangeTombstoneIterator(options);
       if (new_range_del_iter == nullptr || new_range_del_iter->empty()) {
         delete new_range_del_iter;
         *range_del_iter = nullptr;
diff --git a/db/table_cache.h b/db/table_cache.h
index 39e41cc6c916..67d36d8051a6 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -86,6 +86,8 @@ class TableCache {
   // not cached), depending on the CF options
   // @param skip_filters Disables loading/accessing the filter block
   // @param level The level this table is at, -1 for "not set / don't know"
+  // @param range_del_read_seqno If non-nullptr, will be used to create
+  // *range_del_iter.
   InternalIterator* NewIterator(
       const ReadOptions& options, const FileOptions& toptions,
       const InternalKeyComparator& internal_comparator,
@@ -97,6 +99,7 @@ class TableCache {
       const InternalKey* smallest_compaction_key,
       const InternalKey* largest_compaction_key, bool allow_unprepared_value,
       uint8_t protection_bytes_per_key,
+      const SequenceNumber* range_del_read_seqno = nullptr,
       TruncatedRangeDelIterator** range_del_iter = nullptr);
 
   // If a seek to internal key "k" in specified file finds an entry,
diff --git a/db/version_set.cc b/db/version_set.cc
index c5057028d621..e5430748f0b7 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -956,18 +956,21 @@ class LevelIterator final : public InternalIterator {
         flevel_(flevel),
         prefix_extractor_(prefix_extractor),
         file_read_hist_(file_read_hist),
-        should_sample_(should_sample),
         caller_(caller),
-        skip_filters_(skip_filters),
-        allow_unprepared_value_(allow_unprepared_value),
         file_index_(flevel_->num_files),
-        level_(level),
         range_del_agg_(range_del_agg),
         pinned_iters_mgr_(nullptr),
         compaction_boundaries_(compaction_boundaries),
-        is_next_read_sequential_(false),
-        block_protection_bytes_per_key_(block_protection_bytes_per_key),
         range_tombstone_iter_(nullptr),
+        read_seq_(read_options.snapshot
+                      ? read_options.snapshot->GetSequenceNumber()
+                      : kMaxSequenceNumber),
+        level_(level),
+        block_protection_bytes_per_key_(block_protection_bytes_per_key),
+        should_sample_(should_sample),
+        skip_filters_(skip_filters),
+        allow_unprepared_value_(allow_unprepared_value),
+        is_next_read_sequential_(false),
         to_return_sentinel_(false) {
     // Empty level is not supported.
     assert(flevel_ != nullptr && flevel_->num_files > 0);
@@ -1111,7 +1114,7 @@ class LevelIterator final : public InternalIterator {
         /*arena=*/nullptr, skip_filters_, level_,
         /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
         largest_compaction_key, allow_unprepared_value_,
-        block_protection_bytes_per_key_, range_tombstone_iter_);
+        block_protection_bytes_per_key_, &read_seq_, range_tombstone_iter_);
   }
 
   // Check if current file being fully within iterate_lower_bound.
@@ -1141,13 +1144,8 @@ class LevelIterator final : public InternalIterator {
   const std::shared_ptr<const SliceTransform>& prefix_extractor_;
 
   HistogramImpl* file_read_hist_;
-  bool should_sample_;
   TableReaderCaller caller_;
-  bool skip_filters_;
-  bool allow_unprepared_value_;
-  bool may_be_out_of_lower_bound_ = true;
   size_t file_index_;
-  int level_;
   RangeDelAggregator* range_del_agg_;
   IteratorWrapper file_iter_;  // May be nullptr
   PinnedIteratorsManager* pinned_iters_mgr_;
@@ -1156,10 +1154,6 @@ class LevelIterator final : public InternalIterator {
   // tombstones.
   const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
 
-  bool is_next_read_sequential_;
-
-  uint8_t block_protection_bytes_per_key_;
-
   // This is set when this level iterator is used under a merging iterator
   // that processes range tombstones. range_tombstone_iter_ points to where the
   // merging iterator stores the range tombstones iterator for this level. When
@@ -1176,20 +1170,29 @@ class LevelIterator final : public InternalIterator {
   // *range_tombstone_iter_ points to range tombstones of the current SST file
   TruncatedRangeDelIterator** range_tombstone_iter_;
 
-  // Whether next/prev key is a sentinel key.
-  bool to_return_sentinel_ = false;
   // The sentinel key to be returned
   Slice sentinel_;
-  // Sets flags for if we should return the sentinel key next.
-  // The condition for returning sentinel is reaching the end of current
-  // file_iter_: !Valid() && status.().ok().
-  void TrySetDeleteRangeSentinel(const Slice& boundary_key);
-  void ClearSentinel() { to_return_sentinel_ = false; }
+
+  SequenceNumber read_seq_;
+  int level_;
+  uint8_t block_protection_bytes_per_key_;
+  bool should_sample_;
+  bool skip_filters_;
+  bool allow_unprepared_value_;
+  bool may_be_out_of_lower_bound_ = true;
+  bool is_next_read_sequential_;
 
   // Set in Seek() when a prefix seek reaches end of the current file,
   // and the next file has a different prefix. SkipEmptyFileForward()
   // will not move to next file when this flag is set.
   bool prefix_exhausted_ = false;
+
+  // Whether next/prev key is a sentinel key.
+  bool to_return_sentinel_ = false;
+
+  // Sets flags for if we should return the sentinel key next.
+  // The condition for returning the sentinel is reaching the end of the
+  // current file_iter_: !Valid() && status().ok().
+  void TrySetDeleteRangeSentinel(const Slice& boundary_key);
+  void ClearSentinel() { to_return_sentinel_ = false; }
 };
 
 void LevelIterator::TrySetDeleteRangeSentinel(const Slice& boundary_key) {
@@ -2005,7 +2008,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
           /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
           /*smallest_compaction_key=*/nullptr,
           /*largest_compaction_key=*/nullptr, allow_unprepared_value,
-          mutable_cf_options_.block_protection_bytes_per_key, &tombstone_iter);
+          mutable_cf_options_.block_protection_bytes_per_key,
+          /*range_del_read_seqno=*/nullptr, &tombstone_iter);
       if (read_options.ignore_range_deletions) {
         merge_iter_builder->AddIterator(table_iter);
       } else {
@@ -6958,6 +6962,7 @@ InternalIterator* VersionSet::MakeInputIterator(
             /*largest_compaction_key=*/nullptr,
             /*allow_unprepared_value=*/false,
             c->mutable_cf_options()->block_protection_bytes_per_key,
+            /*range_del_read_seqno=*/nullptr,
             /*range_del_iter=*/&range_tombstone_iter);
         range_tombstones.emplace_back(range_tombstone_iter, nullptr);
       }
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index bd71cb2a62e8..f6c92dde4e3f 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -1339,6 +1339,14 @@ Status StressTest::TestIterate(ThreadState* thread,
   const bool support_seek_first_or_last = expect_total_order;
 
+  // Write-prepared and Write-unprepared do not support Refresh() yet.
+  if (!(FLAGS_use_txn && FLAGS_txn_write_policy != 0 /* write committed */) &&
+      thread->rand.OneIn(4)) {
+    Status s = iter->Refresh(snapshot_guard.snapshot());
+    assert(s.ok());
+    op_logs += "Refresh ";
+  }
+
   LastIterateOp last_op;
   if (support_seek_first_or_last && thread->rand.OneIn(100)) {
     iter->SeekToFirst();
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index bebeece759ab..df554bc923d8 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -1907,6 +1907,16 @@ FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
       snapshot, read_options.timestamp);
 }
 
+FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
+    SequenceNumber read_seqno, const Slice* timestamp) {
+  if (rep_->fragmented_range_dels == nullptr) {
+    return nullptr;
+  }
+  return new FragmentedRangeTombstoneIterator(rep_->fragmented_range_dels,
+                                              rep_->internal_comparator,
+                                              read_seqno, timestamp);
+}
+
 bool BlockBasedTable::FullFilterKeyMayMatch(
     FilterBlockReader* filter, const Slice& internal_key, const bool no_io,
     const SliceTransform* prefix_extractor, GetContext* get_context,
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index 0b5fe1cb802c..1afdc24a95c6 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -138,6 +138,9 @@ class BlockBasedTable : public TableReader {
   FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
       const ReadOptions& read_options) override;
 
+  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
+      SequenceNumber read_seqno, const Slice* timestamp) override;
+
   // @param skip_filters Disables loading/accessing the filter block
   Status Get(const ReadOptions& readOptions, const Slice& key,
              GetContext* get_context, const SliceTransform* prefix_extractor,
diff --git a/table/internal_iterator.h b/table/internal_iterator.h
index 8015ed635112..a95fc3bdf813 100644
--- a/table/internal_iterator.h
+++ b/table/internal_iterator.h
@@ -43,6 +43,17 @@ class InternalIteratorBase : public Cleanable {
 
   virtual ~InternalIteratorBase() {}
 
+  // Set the sequence number to use for range tombstone visibility: this
+  // iterator will only process range tombstones with sequence number
+  // <= `read_seqno`. No-op for most child classes.
+  // For range tombstone iterators (TruncatedRangeDelIterator,
+  // FragmentedRangeTombstoneIterator), only range tombstones with sequence
+  // number <= `read_seqno` will be returned. For LevelIterator, it may open
+  // new table files and create new range tombstone iterators during scanning;
+  // it will use `read_seqno` as the sequence number when creating those new
+  // range tombstone iterators.
+  virtual void SetRangeDelReadSeqno(SequenceNumber /* read_seqno */) {}
+
   // An iterator is either positioned at a key/value pair, or
   // not valid. This method returns true iff the iterator is valid.
   // Always returns false if !status().ok().
diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h
index 17abef4ac796..3e6f9c1ae5f5 100644
--- a/table/iterator_wrapper.h
+++ b/table/iterator_wrapper.h
@@ -30,6 +30,11 @@ class IteratorWrapperBase {
   }
   ~IteratorWrapperBase() {}
   InternalIteratorBase<TValue>* iter() const { return iter_; }
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) {
+    if (iter_) {
+      iter_->SetRangeDelReadSeqno(read_seqno);
+    }
+  }
 
   // Set the underlying Iterator to _iter and return
   // previous underlying Iterator.
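
Note on the hunks above: db_iter.h, iterator_wrapper.h, and (below) merging_iterator.cc form a single propagation chain. DBIter::set_sequence() pushes the refreshed sequence number through IteratorWrapper into MergingIterator, which fans it out to the child LevelIterators and per-level range tombstone iterators. A minimal, self-contained sketch of that pattern, using illustrative stand-in names rather than RocksDB types:

```cpp
// Illustration only: stand-in classes sketching the SetRangeDelReadSeqno()
// propagation introduced by this patch; these are not RocksDB types.
#include <cstdint>
#include <memory>
#include <vector>

using SequenceNumber = uint64_t;

class IterBase {
 public:
  virtual ~IterBase() = default;
  // Default is a no-op, mirroring InternalIteratorBase above: most iterators
  // do not care about range tombstone visibility.
  virtual void SetRangeDelReadSeqno(SequenceNumber /*read_seqno*/) {}
};

// Mirrors FragmentedRangeTombstoneIterator: after the call, only tombstones
// with seqno <= read_seqno remain visible.
class TombstoneIter : public IterBase {
 public:
  void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
    upper_bound_ = read_seqno;
  }

 private:
  SequenceNumber upper_bound_ = 0;
};

// Mirrors MergingIterator: fans the new read seqno out to child iterators
// (e.g. a LevelIterator, which remembers it for lazily opened files) and to
// the per-level range tombstone iterators.
class MergingIter : public IterBase {
 public:
  void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
    for (auto& child : children_) {
      child->SetRangeDelReadSeqno(read_seqno);
    }
    for (auto& tombstone : tombstone_iters_) {
      if (tombstone) {
        tombstone->SetRangeDelReadSeqno(read_seqno);
      }
    }
  }

 private:
  std::vector<std::unique_ptr<IterBase>> children_;
  std::vector<std::unique_ptr<TombstoneIter>> tombstone_iters_;
};
```

The no-op default keeps the hook free for iterators that never touch range tombstones; only iterators whose tombstone visibility depends on the read snapshot override it.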
diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc
index 0fa3fcd3ebe4..b5c3a4f95e6e 100644
--- a/table/merging_iterator.cc
+++ b/table/merging_iterator.cc
@@ -135,6 +135,18 @@ class MergingIterator : public InternalIterator {
     status_.PermitUncheckedError();
   }
 
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
+    for (auto& child : children_) {
+      // This should only be needed for LevelIterator (iterators from L1+).
+      child.iter.SetRangeDelReadSeqno(read_seqno);
+    }
+    for (auto& child : range_tombstone_iters_) {
+      if (child) {
+        child->SetRangeDelReadSeqno(read_seqno);
+      }
+    }
+  }
+
   bool Valid() const override { return current_ != nullptr && status_.ok(); }
 
   Status status() const override { return status_; }
diff --git a/table/table_reader.h b/table/table_reader.h
index 53c522052936..87610f4fed1d 100644
--- a/table/table_reader.h
+++ b/table/table_reader.h
@@ -60,11 +60,17 @@ class TableReader {
       size_t compaction_readahead_size = 0,
       bool allow_unprepared_value = false) = 0;
 
+  // read_options.snapshot needs to outlive this call.
   virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
       const ReadOptions& /*read_options*/) {
     return nullptr;
   }
 
+  virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
+      SequenceNumber /* read_seqno */, const Slice* /* timestamp */) {
+    return nullptr;
+  }
+
   // Given a key, return an approximate byte offset in the file where
   // the data for that key begins (or would begin if the key were
   // present in the file). The returned value is in terms of file
diff --git a/unreleased_history/new_features/iterator-refresh-snapshot.md b/unreleased_history/new_features/iterator-refresh-snapshot.md
new file mode 100644
index 000000000000..f8a0e7b431f4
--- /dev/null
+++ b/unreleased_history/new_features/iterator-refresh-snapshot.md
@@ -0,0 +1 @@
+Add a new iterator API `Iterator::Refresh(const Snapshot *)` that allows an iterator to be refreshed while using the input snapshot to read.
\ No newline at end of file
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index b53d7720be17..5f17247e4809 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -390,7 +390,7 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& _read_options,
     read_options.io_activity = Env::IOActivity::kDBIterator;
   }
   constexpr bool expose_blob_index = false;
-  constexpr bool allow_refresh = true;
+  constexpr bool allow_refresh = false;
   std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
   SequenceNumber snapshot_seq = kMaxSequenceNumber;
   SequenceNumber min_uncommitted = 0;
diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc
index 2b8d40732547..973aa49fd3a9 100644
--- a/utilities/transactions/write_unprepared_txn_db.cc
+++ b/utilities/transactions/write_unprepared_txn_db.cc
@@ -401,7 +401,7 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options,
   }
   // TODO(lth): Refactor so that this logic is shared with WritePrepared.
   constexpr bool expose_blob_index = false;
-  constexpr bool allow_refresh = true;
+  constexpr bool allow_refresh = false;
   std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
   SequenceNumber snapshot_seq = kMaxSequenceNumber;
   SequenceNumber min_uncommitted = 0;
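
Usage note: a hedged sketch of how a caller might use the API this patch adds, assuming a build that contains it. The function name is illustrative and error handling is abbreviated:

```cpp
// Sketch of Iterator::Refresh(const Snapshot*) usage on a RocksDB build
// containing this patch. RefreshIteratorToSnapshot is a hypothetical helper.
#include <cassert>
#include <memory>

#include "rocksdb/db.h"

void RefreshIteratorToSnapshot(rocksdb::DB* db) {
  const rocksdb::Snapshot* snap1 = db->GetSnapshot();
  rocksdb::ReadOptions ro;
  ro.snapshot = snap1;
  std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(ro));

  // ... concurrent writes may land here, including DeleteRange() ...

  const rocksdb::Snapshot* snap2 = db->GetSnapshot();
  // Re-pin the existing iterator to snap2 instead of allocating a new one.
  // As the tests above assert, the iterator is !Valid() right after
  // Refresh() and must be repositioned before use.
  rocksdb::Status s = iter->Refresh(snap2);
  assert(s.ok());
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    // Reads as of snap2; range tombstones newer than snap2 are ignored.
  }
  assert(iter->status().ok());

  // Destroy the iterator before releasing the snapshots, matching the order
  // used in the tests above.
  iter.reset();
  db->ReleaseSnapshot(snap1);
  db->ReleaseSnapshot(snap2);
}
```

Refresh(snapshot) re-pins the existing iterator without recreating it unless the SuperVersion changed, and the no-argument Refresh() refreshes to the latest state; as the last two hunks show, write-prepared and write-unprepared transaction DBs now explicitly disallow Refresh().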