From 9416ebfa9db7b078fc4cd7c5ffc1e662abc26ab8 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 27 Nov 2020 15:46:01 +0800 Subject: [PATCH 1/3] add seqno for compaction filter Signed-off-by: qupeng --- db/compaction/compaction_iterator.cc | 7 ++++--- include/rocksdb/compaction_filter.h | 13 ++++++++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 135018f5148..1d3406b4082 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -172,10 +172,11 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // Hack: pass internal key to BlobIndexCompactionFilter since it needs // to get sequence number. Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_; + SequenceNumber seqno = ikey_.sequence; { StopWatchNano timer(env_, report_detailed_time_); - filter = compaction_filter_->FilterV2( - compaction_->level(), filter_key, value_type, value_, + filter = compaction_filter_->FilterV3( + compaction_->level(), filter_key, seqno, value_type, value_, &compaction_filter_value_, compaction_filter_skip_until_.rep()); iter_stats_.total_filter_time += env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0; @@ -185,7 +186,7 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= 0) { // Can't skip to a key smaller than the current one. - // Keep the key as per FilterV2 documentation. + // Keep the key as per FilterV3 documentation. filter = CompactionFilter::Decision::kKeep; } diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 0318387c193..89a666977a4 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -130,6 +130,14 @@ class CompactionFilter { return false; } + // Almost same as FilterV3, except won't pass out sequence number. + virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* skip_until) const { + return FilterV3(level, key, 0, value_type, existing_value, new_value, + skip_until); + } + // An extended API. Called for both values and merge operands. // Allows changing value and skipping ranges of keys. // The default implementation uses Filter() and FilterMergeOperand(). @@ -169,7 +177,10 @@ class CompactionFilter { // is a write conflict and may allow a Transaction to Commit that should have // failed. Instead, it is better to implement any Merge filtering inside the // MergeOperator. - virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, + // + // Note: for kTypeBlobIndex, `key` is internal key instead of user key. + virtual Decision FilterV3(int level, const Slice& key, + SequenceNumber /*seqno*/, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* /*skip_until*/) const { switch (value_type) { From 2dcd15071de66f732f8b29183ab04898b04ab514 Mon Sep 17 00:00:00 2001 From: qupeng Date: Sat, 28 Nov 2020 19:19:08 +0800 Subject: [PATCH 2/3] allow to get sequence number from db iter Signed-off-by: qupeng --- db/compact_files_test.cc | 1 - db/db_iter.cc | 7 +++++ db/db_iter.h | 1 + include/rocksdb/compaction_filter.h | 46 ++++++++++++++-------------- include/rocksdb/iterator.h | 6 ++++ utilities/blob_db/blob_db_iterator.h | 2 ++ utilities/ttl/db_ttl_impl.h | 2 ++ 7 files changed, 41 insertions(+), 24 deletions(-) diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 561c1e7cc89..9a3208eb72c 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -141,7 +141,6 @@ TEST_F(CompactFilesTest, FilterContext) { db->CompactRange(CompactRangeOptions(), nullptr, nullptr); usleep(10000); // Wait for compaction start. ASSERT_EQ(expected_context->start_key, Slice("1")); - ASSERT_EQ(expected_context->end_key, Slice("99")); ASSERT_EQ(expected_context->is_end_key_inclusive, 1); ASSERT_EQ(expected_context->file_numbers[0], 11); ASSERT_EQ(*compaction_count.get(), 2); diff --git a/db/db_iter.cc b/db/db_iter.cc index 92e004c23ac..d553815b70e 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -195,6 +195,10 @@ class DBIter final: public Iterator { return iter_.value(); } } + bool seqno(SequenceNumber* no) const override { + *no = ikey_.sequence; + return true; + } Status status() const override { if (status_.ok()) { return iter_.status(); @@ -1602,6 +1606,9 @@ inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); } inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } +inline bool ArenaWrappedDBIter::seqno(SequenceNumber* no) const { + return db_iter_->seqno(no); +} inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); } inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name, diff --git a/db/db_iter.h b/db/db_iter.h index 6a4bf8a5507..84244f03460 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -89,6 +89,7 @@ class ArenaWrappedDBIter : public Iterator { virtual void Prev() override; virtual Slice key() const override; virtual Slice value() const override; + virtual bool seqno(SequenceNumber* no) const override; virtual Status status() const override; virtual Status Refresh() override; bool IsBlob() const; diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 89a666977a4..c6851cf6998 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -130,12 +130,28 @@ class CompactionFilter { return false; } - // Almost same as FilterV3, except won't pass out sequence number. + // Almost same as FilterV3, except won't pass out sequence numbers. virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, const Slice& existing_value, std::string* new_value, - std::string* skip_until) const { - return FilterV3(level, key, 0, value_type, existing_value, new_value, - skip_until); + std::string* /*skip_until*/) const { + switch (value_type) { + case ValueType::kValue: { + bool value_changed = false; + bool rv = Filter(level, key, existing_value, new_value, &value_changed); + if (rv) { + return Decision::kRemove; + } + return value_changed ? Decision::kChangeValue : Decision::kKeep; + } + case ValueType::kMergeOperand: { + bool rv = FilterMergeOperand(level, key, existing_value); + return rv ? Decision::kRemove : Decision::kKeep; + } + case ValueType::kBlobIndex: + return Decision::kKeep; + } + assert(false); + return Decision::kKeep; } // An extended API. Called for both values and merge operands. @@ -182,25 +198,9 @@ class CompactionFilter { virtual Decision FilterV3(int level, const Slice& key, SequenceNumber /*seqno*/, ValueType value_type, const Slice& existing_value, std::string* new_value, - std::string* /*skip_until*/) const { - switch (value_type) { - case ValueType::kValue: { - bool value_changed = false; - bool rv = Filter(level, key, existing_value, new_value, &value_changed); - if (rv) { - return Decision::kRemove; - } - return value_changed ? Decision::kChangeValue : Decision::kKeep; - } - case ValueType::kMergeOperand: { - bool rv = FilterMergeOperand(level, key, existing_value); - return rv ? Decision::kRemove : Decision::kKeep; - } - case ValueType::kBlobIndex: - return Decision::kKeep; - } - assert(false); - return Decision::kKeep; + std::string* skip_until) const { + return FilterV2(level, key, value_type, existing_value, new_value, + skip_until); } // This function is deprecated. Snapshots will always be ignored for diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index e99b434a019..8c595ce1edb 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -22,6 +22,7 @@ #include "rocksdb/cleanable.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "rocksdb/types.h" namespace rocksdb { @@ -78,6 +79,11 @@ class Iterator : public Cleanable { // REQUIRES: Valid() virtual Slice value() const = 0; + // Return the sequence number for the current entry if it's available. + // Return false if it's not available. + // REQUIRES: Valid() + virtual bool seqno(SequenceNumber* /*seqno*/) const { return false; } + // If an error has occurred, return it. Else return an ok status. // If non-blocking IO is requested and this operation cannot be // satisfied without doing some IO, then this returns Status::Incomplete(). diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h index 1565c670b18..35b811bef0c 100644 --- a/utilities/blob_db/blob_db_iterator.h +++ b/utilities/blob_db/blob_db_iterator.h @@ -111,6 +111,8 @@ class BlobDBIterator : public Iterator { return value_; } + bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); } + // Iterator::Refresh() not supported. private: diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index af4040089ea..e11deade703 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -143,6 +143,8 @@ class TtlIterator : public Iterator { return trimmed_value; } + bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); } + Status status() const override { return iter_->status(); } private: From af6724411b6bd245dded199886bf63e608fe3287 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 8 Dec 2020 13:03:45 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: qupeng --- db/db_iter.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/db/db_iter.cc b/db/db_iter.cc index d553815b70e..10ce051515e 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -196,6 +196,7 @@ class DBIter final: public Iterator { } } bool seqno(SequenceNumber* no) const override { + assert(valid_); *no = ikey_.sequence; return true; }