diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc
index 561c1e7cc89..9a3208eb72c 100644
--- a/db/compact_files_test.cc
+++ b/db/compact_files_test.cc
@@ -141,7 +141,6 @@ TEST_F(CompactFilesTest, FilterContext) {
   db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   usleep(10000);  // Wait for compaction start.
   ASSERT_EQ(expected_context->start_key, Slice("1"));
-  ASSERT_EQ(expected_context->end_key, Slice("99"));
   ASSERT_EQ(expected_context->is_end_key_inclusive, 1);
   ASSERT_EQ(expected_context->file_numbers[0], 11);
   ASSERT_EQ(*compaction_count.get(), 2);
diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index 135018f5148..1d3406b4082 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -172,10 +172,11 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
     // Hack: pass internal key to BlobIndexCompactionFilter since it needs
     // to get sequence number.
     Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
+    SequenceNumber seqno = ikey_.sequence;
     {
       StopWatchNano timer(env_, report_detailed_time_);
-      filter = compaction_filter_->FilterV2(
-          compaction_->level(), filter_key, value_type, value_,
+      filter = compaction_filter_->FilterV3(
+          compaction_->level(), filter_key, seqno, value_type, value_,
           &compaction_filter_value_, compaction_filter_skip_until_.rep());
       iter_stats_.total_filter_time +=
           env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
@@ -185,7 +186,7 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                cmp_->Compare(*compaction_filter_skip_until_.rep(),
                              ikey_.user_key) <= 0) {
       // Can't skip to a key smaller than the current one.
-      // Keep the key as per FilterV2 documentation.
+      // Keep the key as per FilterV3 documentation.
       filter = CompactionFilter::Decision::kKeep;
     }
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 92e004c23ac..10ce051515e 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -195,6 +195,11 @@ class DBIter final: public Iterator {
       return iter_.value();
     }
   }
+  bool seqno(SequenceNumber* no) const override {
+    assert(valid_);
+    *no = ikey_.sequence;
+    return true;
+  }
   Status status() const override {
     if (status_.ok()) {
       return iter_.status();
@@ -1602,6 +1607,9 @@ inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
 inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
 inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
 inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
+inline bool ArenaWrappedDBIter::seqno(SequenceNumber* no) const {
+  return db_iter_->seqno(no);
+}
 inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
 bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); }
 inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
diff --git a/db/db_iter.h b/db/db_iter.h
index 6a4bf8a5507..84244f03460 100644
--- a/db/db_iter.h
+++ b/db/db_iter.h
@@ -89,6 +89,7 @@ class ArenaWrappedDBIter : public Iterator {
   virtual void Prev() override;
   virtual Slice key() const override;
   virtual Slice value() const override;
+  virtual bool seqno(SequenceNumber* no) const override;
   virtual Status status() const override;
   virtual Status Refresh() override;
   bool IsBlob() const;
diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h
index 0318387c193..c6851cf6998 100644
--- a/include/rocksdb/compaction_filter.h
+++ b/include/rocksdb/compaction_filter.h
@@ -130,6 +130,30 @@ class CompactionFilter {
     return false;
   }
 
+  // Almost same as FilterV3, except won't pass out sequence numbers.
+  virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
+                            const Slice& existing_value, std::string* new_value,
+                            std::string* /*skip_until*/) const {
+    switch (value_type) {
+      case ValueType::kValue: {
+        bool value_changed = false;
+        bool rv = Filter(level, key, existing_value, new_value, &value_changed);
+        if (rv) {
+          return Decision::kRemove;
+        }
+        return value_changed ? Decision::kChangeValue : Decision::kKeep;
+      }
+      case ValueType::kMergeOperand: {
+        bool rv = FilterMergeOperand(level, key, existing_value);
+        return rv ? Decision::kRemove : Decision::kKeep;
+      }
+      case ValueType::kBlobIndex:
+        return Decision::kKeep;
+    }
+    assert(false);
+    return Decision::kKeep;
+  }
+
   // An extended API. Called for both values and merge operands.
   // Allows changing value and skipping ranges of keys.
   // The default implementation uses Filter() and FilterMergeOperand().
@@ -169,27 +193,14 @@
   // is a write conflict and may allow a Transaction to Commit that should have
   // failed. Instead, it is better to implement any Merge filtering inside the
   // MergeOperator.
-  virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
+  //
+  // Note: for kTypeBlobIndex, `key` is internal key instead of user key.
+  virtual Decision FilterV3(int level, const Slice& key,
+                            SequenceNumber /*seqno*/, ValueType value_type,
                             const Slice& existing_value, std::string* new_value,
-                            std::string* /*skip_until*/) const {
-    switch (value_type) {
-      case ValueType::kValue: {
-        bool value_changed = false;
-        bool rv = Filter(level, key, existing_value, new_value, &value_changed);
-        if (rv) {
-          return Decision::kRemove;
-        }
-        return value_changed ? Decision::kChangeValue : Decision::kKeep;
-      }
-      case ValueType::kMergeOperand: {
-        bool rv = FilterMergeOperand(level, key, existing_value);
-        return rv ? Decision::kRemove : Decision::kKeep;
-      }
-      case ValueType::kBlobIndex:
-        return Decision::kKeep;
-    }
-    assert(false);
-    return Decision::kKeep;
+                            std::string* skip_until) const {
+    return FilterV2(level, key, value_type, existing_value, new_value,
+                    skip_until);
   }
 
   // This function is deprecated. Snapshots will always be ignored for
diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h
index e99b434a019..8c595ce1edb 100644
--- a/include/rocksdb/iterator.h
+++ b/include/rocksdb/iterator.h
@@ -22,6 +22,7 @@
 #include "rocksdb/cleanable.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
+#include "rocksdb/types.h"
 
 namespace rocksdb {
 
@@ -78,6 +79,11 @@ class Iterator : public Cleanable {
   // REQUIRES: Valid()
   virtual Slice value() const = 0;
 
+  // Return the sequence number for the current entry if it's available.
+  // Return false if it's not available.
+  // REQUIRES: Valid()
+  virtual bool seqno(SequenceNumber* /*seqno*/) const { return false; }
+
   // If an error has occurred, return it. Else return an ok status.
   // If non-blocking IO is requested and this operation cannot be
   // satisfied without doing some IO, then this returns Status::Incomplete().
diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h
index 1565c670b18..35b811bef0c 100644
--- a/utilities/blob_db/blob_db_iterator.h
+++ b/utilities/blob_db/blob_db_iterator.h
@@ -111,6 +111,8 @@ class BlobDBIterator : public Iterator {
     return value_;
   }
 
+  bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); }
+
   // Iterator::Refresh() not supported.
 
  private:
diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h
index af4040089ea..e11deade703 100644
--- a/utilities/ttl/db_ttl_impl.h
+++ b/utilities/ttl/db_ttl_impl.h
@@ -143,6 +143,8 @@ class TtlIterator : public Iterator {
     return trimmed_value;
   }
 
+  bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); }
+
   Status status() const override { return iter_->status(); }
 
  private:
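
For reference, here is a minimal sketch of how a user-defined filter might consume the sequence number that `FilterV3()` now receives from the compaction iterator. Only the `FilterV3()` signature comes from the patch above; the class name `SeqnoAwareFilter` and the `cutoff_` member are hypothetical names used purely for illustration, not part of this change.

```cpp
// Sketch only (not part of the patch): a CompactionFilter subclass overriding
// the new FilterV3() entry point. "SeqnoAwareFilter" and "cutoff_" are
// hypothetical; the signature matches the declaration added above.
#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"
#include "rocksdb/types.h"

class SeqnoAwareFilter : public rocksdb::CompactionFilter {
 public:
  explicit SeqnoAwareFilter(rocksdb::SequenceNumber cutoff) : cutoff_(cutoff) {}

  const char* Name() const override { return "SeqnoAwareFilter"; }

  Decision FilterV3(int /*level*/, const rocksdb::Slice& /*key*/,
                    rocksdb::SequenceNumber seqno, ValueType value_type,
                    const rocksdb::Slice& /*existing_value*/,
                    std::string* /*new_value*/,
                    std::string* /*skip_until*/) const override {
    // Leave merge operands and blob indexes alone; only plain values are
    // considered in this sketch.
    if (value_type != ValueType::kValue) {
      return Decision::kKeep;
    }
    // Drop entries written before the (hypothetical) cutoff sequence number;
    // keep everything newer.
    return seqno < cutoff_ ? Decision::kRemove : Decision::kKeep;
  }

 private:
  rocksdb::SequenceNumber cutoff_;
};
```

On the read path, the new `Iterator::seqno()` hook is opt-in: callers should only use the output parameter when the call returns true, and the base-class default keeps returning false for iterators that do not expose sequence numbers.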