diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h index 678ea3e78d7..2ac74635f60 100644 --- a/db/arena_wrapped_db_iter.h +++ b/db/arena_wrapped_db_iter.h @@ -73,6 +73,9 @@ class ArenaWrappedDBIter : public Iterator { Slice key() const override { return db_iter_->key(); } Slice value() const override { return db_iter_->value(); } const WideColumns& columns() const override { return db_iter_->columns(); } + inline bool seqno(SequenceNumber* no) const override { + return db_iter_->seqno(no); + } Status status() const override { return db_iter_->status(); } Slice timestamp() const override { return db_iter_->timestamp(); } bool IsBlob() const { return db_iter_->IsBlob(); } diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 2d53f2b992d..1e73211e247 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -3,7 +3,6 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). - #include #include #include @@ -11,6 +10,7 @@ #include "db/db_impl/db_impl.h" #include "port/port.h" +#include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "test_util/sync_point.h" @@ -60,6 +60,79 @@ class FlushedFileCollector : public EventListener { std::mutex mutex_; }; +class TestFilterFactory : public CompactionFilterFactory { + public: + std::shared_ptr context_; + std::shared_ptr compaction_count_; + + TestFilterFactory(std::shared_ptr context, + std::shared_ptr compaction_count) { + this->context_ = context; + this->compaction_count_ = compaction_count; + } + + ~TestFilterFactory() {} + + const char* Name() const { return "TestFilterFactory"; } + + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) { + context_->start_key = context.start_key; + context_->end_key = context.end_key; + context_->is_end_key_inclusive = context.is_end_key_inclusive; + context_->input_table_properties = context.input_table_properties; + *compaction_count_.get() += 1; + return nullptr; + } +}; + +TEST_F(CompactFilesTest, FilterContext) { + Options options; + // to trigger compaction more easily + const int kWriteBufferSize = 10000; + const int kLevel0Trigger = 10; + options.create_if_missing = true; + options.compaction_style = kCompactionStyleLevel; + // Small slowdown and stop trigger for experimental purpose. + options.level0_slowdown_writes_trigger = 20; + options.level0_stop_writes_trigger = 20; + options.write_buffer_size = kWriteBufferSize; + options.level0_file_num_compaction_trigger = kLevel0Trigger; + options.compression = kNoCompression; + + std::shared_ptr expected_context( + new CompactionFilter::Context); + std::shared_ptr compaction_count(new int(0)); + CompactionFilterFactory* factory = + new TestFilterFactory(expected_context, compaction_count); + options.compaction_filter_factory = + std::shared_ptr(factory); + + DB* db = nullptr; + DestroyDB(db_name_, options); + Status s = DB::Open(options, db_name_, &db); + assert(s.ok()); + assert(db); + + // `Flush` is different from `Compaction`. + db->Put(WriteOptions(), std::to_string(1), ""); + db->Put(WriteOptions(), std::to_string(51), ""); + db->Flush(FlushOptions()); + db->Put(WriteOptions(), std::to_string(50), ""); + db->Put(WriteOptions(), std::to_string(99), ""); + db->Flush(FlushOptions()); + ASSERT_EQ(*compaction_count.get(), 0); + + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); + usleep(10000); // Wait for compaction start. + ASSERT_EQ(expected_context->start_key, Slice("1")); + ASSERT_EQ(expected_context->is_end_key_inclusive, 1); + ASSERT_EQ(expected_context->input_table_properties.size(), 2); + ASSERT_EQ(*compaction_count.get(), 1); + + delete (db); +} + TEST_F(CompactFilesTest, L0ConflictsFiles) { Options options; // to trigger compaction more easily @@ -485,6 +558,87 @@ TEST_F(CompactFilesTest, GetCompactionJobInfo) { delete db; } +TEST_F(CompactFilesTest, IsWriteStalled) { + class SlowFilter : public CompactionFilter { + public: + SlowFilter(std::atomic* would_block) { would_block_ = would_block; } + + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + while (would_block_->load(std::memory_order_relaxed)) { + usleep(10000); + } + return false; + } + + const char* Name() const override { return "SlowFilter"; } + + private: + std::atomic* would_block_; + }; + + Options options; + options.create_if_missing = true; + options.delayed_write_rate = 1; + + ColumnFamilyOptions cf_options; + cf_options.level0_slowdown_writes_trigger = 12; + cf_options.level0_stop_writes_trigger = 15; + cf_options.write_buffer_size = 1024 * 1024; + + std::atomic compaction_would_block; + compaction_would_block.store(true, std::memory_order_relaxed); + cf_options.compaction_filter = new SlowFilter(&compaction_would_block); + + std::vector cfds; + cfds.push_back(ColumnFamilyDescriptor("default", cf_options)); + + DB* db = nullptr; + std::vector handles; + DestroyDB(db_name_, options); + + Status s = DB::Open(options, db_name_, cfds, &handles, &db); + assert(s.ok()); + assert(db); + + int flushed_l0_files = 0; + for (; flushed_l0_files < 100;) { + WriteBatch wb; + for (int j = 0; j < 100; ++j) { + char key[16]; + bzero(key, 16); + sprintf(key, "foo%.2d", j); + ASSERT_OK(wb.Put(handles[0], key, "bar")); + } + + WriteOptions wopts; + wopts.no_slowdown = true; + s = db->Write(wopts, &wb); + if (s.ok()) { + FlushOptions fopts; + fopts.allow_write_stall = true; + ASSERT_OK(db->Flush(fopts)); + ++flushed_l0_files; + } else { + ASSERT_EQ(s.code(), Status::Code::kIncomplete); + break; + } + } + + // The write loop must be terminated by write stall. + ASSERT_EQ(flushed_l0_files, 12); + uint64_t stalled = false; + db->GetIntProperty(handles[0], "rocksdb.is-write-stalled", &stalled); + ASSERT_TRUE(stalled); + + compaction_would_block.store(false, std::memory_order_relaxed); + for (size_t i = 0; i < handles.size(); ++i) { + delete handles[i]; + } + delete (db); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { @@ -492,4 +646,3 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } - diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index bbab8f79fb5..cda2feaa9f7 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -812,7 +812,8 @@ uint64_t Compaction::OutputFilePreallocationSize() const { preallocation_size + (preallocation_size / 10)); } -std::unique_ptr Compaction::CreateCompactionFilter() const { +std::unique_ptr Compaction::CreateCompactionFilter( + std::optional start, std::optional end) const { if (!cfd_->ioptions()->compaction_filter_factory) { return nullptr; } @@ -830,6 +831,12 @@ std::unique_ptr Compaction::CreateCompactionFilter() const { context.column_family_id = cfd_->GetID(); context.reason = TableFileCreationReason::kCompaction; context.input_table_properties = GetInputTableProperties(); + context.is_bottommost_level = bottommost_level_; + context.start_key = start == std::nullopt ? GetSmallestUserKey() + : ExtractUserKey(start.value()); + context.end_key = + end == std::nullopt ? GetLargestUserKey() : ExtractUserKey(end.value()); + context.is_end_key_inclusive = end == std::nullopt; if (context.input_table_properties.empty()) { ROCKS_LOG_WARN( immutable_options_.info_log, diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 50c75f70b22..4c209f8a258 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -311,7 +311,8 @@ class Compaction { void ResetNextCompactionIndex(); // Create a CompactionFilter from compaction_filter_factory - std::unique_ptr CreateCompactionFilter() const; + std::unique_ptr CreateCompactionFilter( + std::optional start, std::optional end) const; // Create a SstPartitioner from sst_partitioner_factory std::unique_ptr CreateSstPartitioner() const; diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 85d1c039bd3..99d2b726273 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -229,17 +229,20 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, } if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && - ikey_.type != kTypeWideColumnEntity) { + ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) { return true; } CompactionFilter::Decision decision = CompactionFilter::Decision::kUndetermined; - CompactionFilter::ValueType value_type = - ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue - : ikey_.type == kTypeBlobIndex - ? CompactionFilter::ValueType::kBlobIndex - : CompactionFilter::ValueType::kWideColumnEntity; + CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue; + if (ikey_.type == kTypeBlobIndex) { + value_type = CompactionFilter::ValueType::kBlobIndex; + } else if (ikey_.type == kTypeWideColumnEntity) { + value_type = CompactionFilter::ValueType::kWideColumnEntity; + } else if (ikey_.type == kTypeDeletion) { + value_type = CompactionFilter::ValueType::kDeletion; + } // Hack: pass internal key to BlobIndexCompactionFilter since it needs // to get sequence number. @@ -277,7 +280,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // For integrated BlobDB impl, CompactionIterator reads blob value. // For Stacked BlobDB impl, the corresponding CompactionFilter's - // FilterV2 method should read the blob value. + // FilterV4 method should read the blob value. BlobIndex blob_index; Status s = blob_index.DecodeFrom(value_); if (!s.ok()) { @@ -337,9 +340,9 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, existing_col = &existing_columns; } - decision = compaction_filter_->FilterV3( - level_, filter_key, value_type, existing_val, existing_col, - &compaction_filter_value_, &new_columns, + decision = compaction_filter_->UnsafeFilter( + level_, filter_key, ikey_.sequence, value_type, existing_val, + existing_col, &compaction_filter_value_, &new_columns, compaction_filter_skip_until_.rep()); } @@ -348,10 +351,10 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, } if (decision == CompactionFilter::Decision::kUndetermined) { - // Should not reach here, since FilterV2/FilterV3 should never return - // kUndetermined. + // Should not reach here, since FilterV2/FilterV3/FilterV4 should never + // return kUndetermined. status_ = Status::NotSupported( - "FilterV2/FilterV3 should never return kUndetermined"); + "FilterV2/FilterV3/FilterV4 should never return kUndetermined"); validity_info_.Invalidate(); return false; } @@ -360,7 +363,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= 0) { // Can't skip to a key smaller than the current one. - // Keep the key as per FilterV2/FilterV3 documentation. + // Keep the key as per FilterV2/FilterV3/FilterV4 documentation. decision = CompactionFilter::Decision::kKeep; } diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 699e629693d..1cb78c33f50 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -719,6 +719,42 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) { ASSERT_EQ("cv1cv2", c_iter_->value().ToString()); } +TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) { + struct Filter : public CompactionFilter { + Decision UnsafeFilter( + int /*level*/, const Slice& key, SequenceNumber /*seq*/, ValueType t, + const Slice* /*existing_value*/, const WideColumns*, + std::string* /*new_value*/, + std::vector>* /*new_columns*/, + std::string* skip_until) const override { + if (t == ValueType::kDeletion) { + *skip_until = key.ToString(); + skip_until->back() += 1; + filtered += 1; + return Decision::kRemoveAndSkipUntil; + } + return Decision::kKeep; + } + + const char* Name() const override { + return "CompactionIteratorTest.SingleDelete::Filter"; + } + mutable size_t filtered = 0; + }; + + Filter filter; + InitIterators( + {test::KeyStr("a", 70, kTypeDeletion), test::KeyStr("a", 50, kTypeValue), + test::KeyStr("c", 70, kTypeDeletion), + test::KeyStr("c", 50, kTypeDeletion)}, + {"", "a", "", ""}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, + nullptr, &filter); + + c_iter_->SeekToFirst(); + ASSERT_TRUE(!c_iter_->Valid()); + ASSERT_EQ(filter.filtered, 2); +} + // In bottommost level, values earlier than earliest snapshot can be output // with sequence = 0. TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) { diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 99b099759db..457b4351200 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1095,7 +1095,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { std::unique_ptr compaction_filter_from_factory = nullptr; if (compaction_filter == nullptr) { compaction_filter_from_factory = - sub_compact->compaction->CreateCompactionFilter(); + sub_compact->compaction->CreateCompactionFilter(sub_compact->start, + sub_compact->end); compaction_filter = compaction_filter_from_factory.get(); } if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) { diff --git a/db/db_iter.h b/db/db_iter.h index bbcf9ceb6dd..abb7ab191dc 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -170,6 +170,11 @@ class DBIter final : public Iterator { return wide_columns_; } + bool seqno(SequenceNumber* no) const override { + assert(valid_); + *no = ikey_.sequence; + return true; + } Status status() const override { if (status_.ok()) { return iter_.status(); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 6ef4b430236..b4180b959f2 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -305,6 +305,7 @@ static const std::string num_running_flushes = "num-running-flushes"; static const std::string actual_delayed_write_rate = "actual-delayed-write-rate"; static const std::string is_write_stopped = "is-write-stopped"; +static const std::string is_write_stalled = "is-write-stalled"; static const std::string estimate_oldest_key_time = "estimate-oldest-key-time"; static const std::string block_cache_capacity = "block-cache-capacity"; static const std::string block_cache_usage = "block-cache-usage"; @@ -408,6 +409,8 @@ const std::string DB::Properties::kActualDelayedWriteRate = rocksdb_prefix + actual_delayed_write_rate; const std::string DB::Properties::kIsWriteStopped = rocksdb_prefix + is_write_stopped; +const std::string DB::Properties::kIsWriteStalled = + rocksdb_prefix + is_write_stalled; const std::string DB::Properties::kEstimateOldestKeyTime = rocksdb_prefix + estimate_oldest_key_time; const std::string DB::Properties::kBlockCacheCapacity = @@ -586,6 +589,9 @@ const UnorderedMap {DB::Properties::kIsWriteStopped, {false, nullptr, &InternalStats::HandleIsWriteStopped, nullptr, nullptr}}, + {DB::Properties::kIsWriteStalled, + {false, nullptr, &InternalStats::HandleIsWriteStalled, nullptr, + nullptr}}, {DB::Properties::kEstimateOldestKeyTime, {false, nullptr, &InternalStats::HandleEstimateOldestKeyTime, nullptr, nullptr}}, @@ -1463,6 +1469,12 @@ bool InternalStats::HandleIsWriteStopped(uint64_t* value, DBImpl* db, return true; } +bool InternalStats::HandleIsWriteStalled(uint64_t* value, DBImpl* db, + Version* /*version*/) { + *value = db->write_controller().NeedsDelay() ? 1 : 0; + return true; +} + bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/, Version* /*version*/) { // TODO(yiwu): The property is currently available for fifo compaction diff --git a/db/internal_stats.h b/db/internal_stats.h index 85c1a6bb1e9..465167024a4 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -833,6 +833,7 @@ class InternalStats { bool HandleActualDelayedWriteRate(uint64_t* value, DBImpl* db, Version* version); bool HandleIsWriteStopped(uint64_t* value, DBImpl* db, Version* version); + bool HandleIsWriteStalled(uint64_t* value, DBImpl* db, Version* version); bool HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* db, Version* version); bool HandleBlockCacheCapacity(uint64_t* value, DBImpl* db, Version* version); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 908878a5fae..19926ae08c5 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -69,6 +69,10 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, io_options, nullptr); } + // The buffer initialization code previously in ctor. + if (buf_.Capacity() == 0) { + buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); + } // See whether we need to enlarge the buffer to avoid the flush if (buf_.Capacity() - buf_.CurrentSize() < left) { for (size_t cap = buf_.Capacity(); @@ -183,8 +187,8 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, return AssertFalseAndGetStatusForPrevError(); } assert(pad_bytes < kDefaultPageSize); - size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); + size_t left = std::min(pad_bytes, cap); size_t pad_start = buf_.CurrentSize(); // Assume pad_bytes is small compared to buf_ capacity. So we always diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index aac0f59491e..30c278cfb9a 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -202,7 +202,8 @@ class WritableFileWriter { TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", reinterpret_cast(max_buffer_size_)); buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); - buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); + // Moved to `Append` to reduce memory usage of unused writer. + // buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); std::for_each(listeners.begin(), listeners.end(), [this](const std::shared_ptr& e) { if (e->ShouldBeNotifiedOnFileIO()) { diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 1784f2329ac..fb8a86e34cc 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -16,9 +16,11 @@ #include "rocksdb/customizable.h" #include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" #include "rocksdb/table_properties.h" #include "rocksdb/types.h" #include "rocksdb/wide_columns.h" +#include "types.h" namespace ROCKSDB_NAMESPACE { @@ -67,6 +69,8 @@ class CompactionFilter : public Customizable { kBlobIndex, // Wide-column entity kWideColumnEntity, + // Only used by TiKV's region range filter. + kDeletion, }; // Potential decisions that can be returned by the compaction filter's @@ -164,6 +168,14 @@ class CompactionFilter : public Customizable { // The lowest level among all the input files (if any) used in table // creation int input_start_level = kUnknownStartLevel; + // Whether output files are in bottommost level or not. + bool is_bottommost_level; + + // The range of the compaction. + Slice start_key; + Slice end_key; + bool is_end_key_inclusive; + // The column family that will contain the created table file. uint32_t column_family_id; // Reason this table file is being created. @@ -251,6 +263,10 @@ class CompactionFilter : public Customizable { case ValueType::kBlobIndex: return Decision::kKeep; + case ValueType::kDeletion: + // Should never be reached. + assert(false); + return Decision::kKeep; default: assert(false); @@ -300,6 +316,32 @@ class CompactionFilter : public Customizable { skip_until); } + virtual Decision FilterV4( + int level, const Slice& key, SequenceNumber, ValueType value_type, + const Slice* existing_value, const WideColumns* existing_columns, + std::string* new_value, + std::vector>* new_columns, + std::string* skip_until) const { + return FilterV3(level, key, value_type, existing_value, existing_columns, + new_value, new_columns, skip_until); + } + + // This interface is reserved for TiKV's region range filter. Only this + // interface can accept `value_type=kTypeDeletion`. + virtual Decision UnsafeFilter( + int level, const Slice& key, SequenceNumber seqno, ValueType value_type, + const Slice* existing_value, const WideColumns* existing_columns, + std::string* new_value, + std::vector>* new_columns, + std::string* skip_until) const { + if (value_type != ValueType::kDeletion) { + return FilterV4(level, key, seqno, value_type, existing_value, + existing_columns, new_value, new_columns, skip_until); + } else { + return Decision::kKeep; + } + } + // Internal (BlobDB) use only. Do not override in application code. virtual BlobDecision PrepareBlobOutput(const Slice& /* key */, const Slice& /* existing_value */, diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 5ae73182b60..f41a491cc88 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1215,6 +1215,9 @@ class DB { // "rocksdb.is-write-stopped" - Return 1 if write has been stopped. static const std::string kIsWriteStopped; + // "rocksdb.is-write-stalled" - Return 1 if write has been stalled. + static const std::string kIsWriteStalled; + // "rocksdb.estimate-oldest-key-time" - returns an estimation of // oldest key timestamp in the DB. Currently only available for // FIFO compaction with @@ -1323,6 +1326,7 @@ class DB { // "rocksdb.num-running-flushes" // "rocksdb.actual-delayed-write-rate" // "rocksdb.is-write-stopped" + // "rocksdb.is-write-stalled" // "rocksdb.estimate-oldest-key-time" // "rocksdb.block-cache-capacity" // "rocksdb.block-cache-usage" diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 8568dd2588c..017e556ce7c 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -23,6 +23,7 @@ #include "rocksdb/cleanable.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "rocksdb/types.h" #include "rocksdb/wide_columns.h" namespace ROCKSDB_NAMESPACE { @@ -102,6 +103,11 @@ class Iterator : public Cleanable { return kNoWideColumns; } + // Return the sequence number for the current entry if it's available. + // Return false if it's not available. + // REQUIRES: Valid() + virtual bool seqno(SequenceNumber* /*seqno*/) const { return false; } + // If an error has occurred, return it. Else return an ok status. // If non-blocking IO is requested and this operation cannot be // satisfied without doing some IO, then this returns Status::Incomplete(). diff --git a/tikv-rocksdb-patches/0003-Compaction-filter-optimization.patch b/tikv-rocksdb-patches/0003-Compaction-filter-optimization.patch new file mode 100644 index 00000000000..2f10800e543 --- /dev/null +++ b/tikv-rocksdb-patches/0003-Compaction-filter-optimization.patch @@ -0,0 +1,735 @@ +From 79426b4f279ed8ca1eddf3da5ceff735ffec520c Mon Sep 17 00:00:00 2001 +From: qupeng +Date: Tue, 2 Jun 2020 14:45:08 +0800 +Subject: [PATCH 3/3] Compaction filter optimization + +compaction_filter: add bottommost_level into context (#160) + +Signed-off-by: qupeng +Signed-off-by: tabokie + +add range for compaction filter context (#192) + +* add range for compaction filter context + +Signed-off-by: qupeng +Signed-off-by: tabokie + +allow no_io for VersionSet::GetTableProperties (#211) + +* allow no_io for VersionSet::GetTableProperties + +Signed-off-by: qupeng +Signed-off-by: tabokie + +expose seqno from compaction filter and iterator (#215) + +This PR supports to access `seqno` for every key/value pairs in compaction filter or iterator. +It's helpful to enhance GC in compaction filter in TiKV. + +Signed-off-by: qupeng +Signed-off-by: tabokie + +allow to query DB stall status (#226) + +This PR adds a new property is-write-stalled to query whether the column family is in write stall or not. + +In TiKV there is a compaction filter used for GC, in which DB::write is called. So if we can query whether the DB instance is stalled or not, we can skip to create more compaction filter instances to save some resources. + +Signed-off-by: qupeng +Signed-off-by: tabokie + +Fix compatibilty issue with Titan + +Signed-off-by: v01dstar + +filter deletion in compaction filter (#344) + +And delay the buffer initialization of writable file to first actual write. + +--------- + +Signed-off-by: tabokie + +Adjustments for compaptibilty with 8.10.facebook + +Signed-off-by: v01dstar + +Adjust tikv related changes with upstream + +Signed-off-by: v01dstar +--- + db/arena_wrapped_db_iter.h | 3 + + db/compact_files_test.cc | 157 +++++++++++++++++++++- + db/compaction/compaction.cc | 9 +- + db/compaction/compaction.h | 3 +- + db/compaction/compaction_iterator.cc | 31 +++-- + db/compaction/compaction_iterator_test.cc | 36 +++++ + db/compaction/compaction_job.cc | 3 +- + db/db_iter.h | 5 + + db/internal_stats.cc | 12 ++ + db/internal_stats.h | 1 + + file/writable_file_writer.cc | 6 +- + file/writable_file_writer.h | 3 +- + include/rocksdb/compaction_filter.h | 42 ++++++ + include/rocksdb/db.h | 4 + + include/rocksdb/iterator.h | 6 + + utilities/blob_db/blob_db_iterator.h | 2 + + utilities/ttl/db_ttl_impl.h | 2 + + 17 files changed, 304 insertions(+), 21 deletions(-) + +diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h +index 678ea3e78..2ac74635f 100644 +--- a/db/arena_wrapped_db_iter.h ++++ b/db/arena_wrapped_db_iter.h +@@ -73,6 +73,9 @@ class ArenaWrappedDBIter : public Iterator { + Slice key() const override { return db_iter_->key(); } + Slice value() const override { return db_iter_->value(); } + const WideColumns& columns() const override { return db_iter_->columns(); } ++ inline bool seqno(SequenceNumber* no) const override { ++ return db_iter_->seqno(no); ++ } + Status status() const override { return db_iter_->status(); } + Slice timestamp() const override { return db_iter_->timestamp(); } + bool IsBlob() const { return db_iter_->IsBlob(); } +diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc +index 2d53f2b99..1e73211e2 100644 +--- a/db/compact_files_test.cc ++++ b/db/compact_files_test.cc +@@ -3,7 +3,6 @@ + // COPYING file in the root directory) and Apache 2.0 License + // (found in the LICENSE.Apache file in the root directory). + +- + #include + #include + #include +@@ -11,6 +10,7 @@ + + #include "db/db_impl/db_impl.h" + #include "port/port.h" ++#include "rocksdb/compaction_filter.h" + #include "rocksdb/db.h" + #include "rocksdb/env.h" + #include "test_util/sync_point.h" +@@ -60,6 +60,79 @@ class FlushedFileCollector : public EventListener { + std::mutex mutex_; + }; + ++class TestFilterFactory : public CompactionFilterFactory { ++ public: ++ std::shared_ptr context_; ++ std::shared_ptr compaction_count_; ++ ++ TestFilterFactory(std::shared_ptr context, ++ std::shared_ptr compaction_count) { ++ this->context_ = context; ++ this->compaction_count_ = compaction_count; ++ } ++ ++ ~TestFilterFactory() {} ++ ++ const char* Name() const { return "TestFilterFactory"; } ++ ++ std::unique_ptr CreateCompactionFilter( ++ const CompactionFilter::Context& context) { ++ context_->start_key = context.start_key; ++ context_->end_key = context.end_key; ++ context_->is_end_key_inclusive = context.is_end_key_inclusive; ++ context_->input_table_properties = context.input_table_properties; ++ *compaction_count_.get() += 1; ++ return nullptr; ++ } ++}; ++ ++TEST_F(CompactFilesTest, FilterContext) { ++ Options options; ++ // to trigger compaction more easily ++ const int kWriteBufferSize = 10000; ++ const int kLevel0Trigger = 10; ++ options.create_if_missing = true; ++ options.compaction_style = kCompactionStyleLevel; ++ // Small slowdown and stop trigger for experimental purpose. ++ options.level0_slowdown_writes_trigger = 20; ++ options.level0_stop_writes_trigger = 20; ++ options.write_buffer_size = kWriteBufferSize; ++ options.level0_file_num_compaction_trigger = kLevel0Trigger; ++ options.compression = kNoCompression; ++ ++ std::shared_ptr expected_context( ++ new CompactionFilter::Context); ++ std::shared_ptr compaction_count(new int(0)); ++ CompactionFilterFactory* factory = ++ new TestFilterFactory(expected_context, compaction_count); ++ options.compaction_filter_factory = ++ std::shared_ptr(factory); ++ ++ DB* db = nullptr; ++ DestroyDB(db_name_, options); ++ Status s = DB::Open(options, db_name_, &db); ++ assert(s.ok()); ++ assert(db); ++ ++ // `Flush` is different from `Compaction`. ++ db->Put(WriteOptions(), std::to_string(1), ""); ++ db->Put(WriteOptions(), std::to_string(51), ""); ++ db->Flush(FlushOptions()); ++ db->Put(WriteOptions(), std::to_string(50), ""); ++ db->Put(WriteOptions(), std::to_string(99), ""); ++ db->Flush(FlushOptions()); ++ ASSERT_EQ(*compaction_count.get(), 0); ++ ++ db->CompactRange(CompactRangeOptions(), nullptr, nullptr); ++ usleep(10000); // Wait for compaction start. ++ ASSERT_EQ(expected_context->start_key, Slice("1")); ++ ASSERT_EQ(expected_context->is_end_key_inclusive, 1); ++ ASSERT_EQ(expected_context->input_table_properties.size(), 2); ++ ASSERT_EQ(*compaction_count.get(), 1); ++ ++ delete (db); ++} ++ + TEST_F(CompactFilesTest, L0ConflictsFiles) { + Options options; + // to trigger compaction more easily +@@ -485,6 +558,87 @@ TEST_F(CompactFilesTest, GetCompactionJobInfo) { + delete db; + } + ++TEST_F(CompactFilesTest, IsWriteStalled) { ++ class SlowFilter : public CompactionFilter { ++ public: ++ SlowFilter(std::atomic* would_block) { would_block_ = would_block; } ++ ++ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, ++ std::string* /*new_value*/, ++ bool* /*value_changed*/) const override { ++ while (would_block_->load(std::memory_order_relaxed)) { ++ usleep(10000); ++ } ++ return false; ++ } ++ ++ const char* Name() const override { return "SlowFilter"; } ++ ++ private: ++ std::atomic* would_block_; ++ }; ++ ++ Options options; ++ options.create_if_missing = true; ++ options.delayed_write_rate = 1; ++ ++ ColumnFamilyOptions cf_options; ++ cf_options.level0_slowdown_writes_trigger = 12; ++ cf_options.level0_stop_writes_trigger = 15; ++ cf_options.write_buffer_size = 1024 * 1024; ++ ++ std::atomic compaction_would_block; ++ compaction_would_block.store(true, std::memory_order_relaxed); ++ cf_options.compaction_filter = new SlowFilter(&compaction_would_block); ++ ++ std::vector cfds; ++ cfds.push_back(ColumnFamilyDescriptor("default", cf_options)); ++ ++ DB* db = nullptr; ++ std::vector handles; ++ DestroyDB(db_name_, options); ++ ++ Status s = DB::Open(options, db_name_, cfds, &handles, &db); ++ assert(s.ok()); ++ assert(db); ++ ++ int flushed_l0_files = 0; ++ for (; flushed_l0_files < 100;) { ++ WriteBatch wb; ++ for (int j = 0; j < 100; ++j) { ++ char key[16]; ++ bzero(key, 16); ++ sprintf(key, "foo%.2d", j); ++ ASSERT_OK(wb.Put(handles[0], key, "bar")); ++ } ++ ++ WriteOptions wopts; ++ wopts.no_slowdown = true; ++ s = db->Write(wopts, &wb); ++ if (s.ok()) { ++ FlushOptions fopts; ++ fopts.allow_write_stall = true; ++ ASSERT_OK(db->Flush(fopts)); ++ ++flushed_l0_files; ++ } else { ++ ASSERT_EQ(s.code(), Status::Code::kIncomplete); ++ break; ++ } ++ } ++ ++ // The write loop must be terminated by write stall. ++ ASSERT_EQ(flushed_l0_files, 12); ++ uint64_t stalled = false; ++ db->GetIntProperty(handles[0], "rocksdb.is-write-stalled", &stalled); ++ ASSERT_TRUE(stalled); ++ ++ compaction_would_block.store(false, std::memory_order_relaxed); ++ for (size_t i = 0; i < handles.size(); ++i) { ++ delete handles[i]; ++ } ++ delete (db); ++} ++ + } // namespace ROCKSDB_NAMESPACE + + int main(int argc, char** argv) { +@@ -492,4 +646,3 @@ int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); + } +- +diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc +index bbab8f79f..cda2feaa9 100644 +--- a/db/compaction/compaction.cc ++++ b/db/compaction/compaction.cc +@@ -812,7 +812,8 @@ uint64_t Compaction::OutputFilePreallocationSize() const { + preallocation_size + (preallocation_size / 10)); + } + +-std::unique_ptr Compaction::CreateCompactionFilter() const { ++std::unique_ptr Compaction::CreateCompactionFilter( ++ std::optional start, std::optional end) const { + if (!cfd_->ioptions()->compaction_filter_factory) { + return nullptr; + } +@@ -830,6 +831,12 @@ std::unique_ptr Compaction::CreateCompactionFilter() const { + context.column_family_id = cfd_->GetID(); + context.reason = TableFileCreationReason::kCompaction; + context.input_table_properties = GetInputTableProperties(); ++ context.is_bottommost_level = bottommost_level_; ++ context.start_key = start == std::nullopt ? GetSmallestUserKey() ++ : ExtractUserKey(start.value()); ++ context.end_key = ++ end == std::nullopt ? GetLargestUserKey() : ExtractUserKey(end.value()); ++ context.is_end_key_inclusive = end == std::nullopt; + if (context.input_table_properties.empty()) { + ROCKS_LOG_WARN( + immutable_options_.info_log, +diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h +index 50c75f70b..4c209f8a2 100644 +--- a/db/compaction/compaction.h ++++ b/db/compaction/compaction.h +@@ -311,7 +311,8 @@ class Compaction { + void ResetNextCompactionIndex(); + + // Create a CompactionFilter from compaction_filter_factory +- std::unique_ptr CreateCompactionFilter() const; ++ std::unique_ptr CreateCompactionFilter( ++ std::optional start, std::optional end) const; + + // Create a SstPartitioner from sst_partitioner_factory + std::unique_ptr CreateSstPartitioner() const; +diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc +index 85d1c039b..99d2b7262 100644 +--- a/db/compaction/compaction_iterator.cc ++++ b/db/compaction/compaction_iterator.cc +@@ -229,17 +229,20 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, + } + + if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && +- ikey_.type != kTypeWideColumnEntity) { ++ ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) { + return true; + } + + CompactionFilter::Decision decision = + CompactionFilter::Decision::kUndetermined; +- CompactionFilter::ValueType value_type = +- ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue +- : ikey_.type == kTypeBlobIndex +- ? CompactionFilter::ValueType::kBlobIndex +- : CompactionFilter::ValueType::kWideColumnEntity; ++ CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue; ++ if (ikey_.type == kTypeBlobIndex) { ++ value_type = CompactionFilter::ValueType::kBlobIndex; ++ } else if (ikey_.type == kTypeWideColumnEntity) { ++ value_type = CompactionFilter::ValueType::kWideColumnEntity; ++ } else if (ikey_.type == kTypeDeletion) { ++ value_type = CompactionFilter::ValueType::kDeletion; ++ } + + // Hack: pass internal key to BlobIndexCompactionFilter since it needs + // to get sequence number. +@@ -277,7 +280,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, + + // For integrated BlobDB impl, CompactionIterator reads blob value. + // For Stacked BlobDB impl, the corresponding CompactionFilter's +- // FilterV2 method should read the blob value. ++ // FilterV4 method should read the blob value. + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value_); + if (!s.ok()) { +@@ -337,9 +340,9 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, + existing_col = &existing_columns; + } + +- decision = compaction_filter_->FilterV3( +- level_, filter_key, value_type, existing_val, existing_col, +- &compaction_filter_value_, &new_columns, ++ decision = compaction_filter_->UnsafeFilter( ++ level_, filter_key, ikey_.sequence, value_type, existing_val, ++ existing_col, &compaction_filter_value_, &new_columns, + compaction_filter_skip_until_.rep()); + } + +@@ -348,10 +351,10 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, + } + + if (decision == CompactionFilter::Decision::kUndetermined) { +- // Should not reach here, since FilterV2/FilterV3 should never return +- // kUndetermined. ++ // Should not reach here, since FilterV2/FilterV3/FilterV4 should never ++ // return kUndetermined. + status_ = Status::NotSupported( +- "FilterV2/FilterV3 should never return kUndetermined"); ++ "FilterV2/FilterV3/FilterV4 should never return kUndetermined"); + validity_info_.Invalidate(); + return false; + } +@@ -360,7 +363,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, + cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= + 0) { + // Can't skip to a key smaller than the current one. +- // Keep the key as per FilterV2/FilterV3 documentation. ++ // Keep the key as per FilterV2/FilterV3/FilterV4 documentation. + decision = CompactionFilter::Decision::kKeep; + } + +diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc +index 699e62969..1cb78c33f 100644 +--- a/db/compaction/compaction_iterator_test.cc ++++ b/db/compaction/compaction_iterator_test.cc +@@ -719,6 +719,42 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) { + ASSERT_EQ("cv1cv2", c_iter_->value().ToString()); + } + ++TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) { ++ struct Filter : public CompactionFilter { ++ Decision UnsafeFilter( ++ int /*level*/, const Slice& key, SequenceNumber /*seq*/, ValueType t, ++ const Slice* /*existing_value*/, const WideColumns*, ++ std::string* /*new_value*/, ++ std::vector>* /*new_columns*/, ++ std::string* skip_until) const override { ++ if (t == ValueType::kDeletion) { ++ *skip_until = key.ToString(); ++ skip_until->back() += 1; ++ filtered += 1; ++ return Decision::kRemoveAndSkipUntil; ++ } ++ return Decision::kKeep; ++ } ++ ++ const char* Name() const override { ++ return "CompactionIteratorTest.SingleDelete::Filter"; ++ } ++ mutable size_t filtered = 0; ++ }; ++ ++ Filter filter; ++ InitIterators( ++ {test::KeyStr("a", 70, kTypeDeletion), test::KeyStr("a", 50, kTypeValue), ++ test::KeyStr("c", 70, kTypeDeletion), ++ test::KeyStr("c", 50, kTypeDeletion)}, ++ {"", "a", "", ""}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, ++ nullptr, &filter); ++ ++ c_iter_->SeekToFirst(); ++ ASSERT_TRUE(!c_iter_->Valid()); ++ ASSERT_EQ(filter.filtered, 2); ++} ++ + // In bottommost level, values earlier than earliest snapshot can be output + // with sequence = 0. + TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) { +diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc +index 99b099759..457b43512 100644 +--- a/db/compaction/compaction_job.cc ++++ b/db/compaction/compaction_job.cc +@@ -1095,7 +1095,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { + std::unique_ptr compaction_filter_from_factory = nullptr; + if (compaction_filter == nullptr) { + compaction_filter_from_factory = +- sub_compact->compaction->CreateCompactionFilter(); ++ sub_compact->compaction->CreateCompactionFilter(sub_compact->start, ++ sub_compact->end); + compaction_filter = compaction_filter_from_factory.get(); + } + if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) { +diff --git a/db/db_iter.h b/db/db_iter.h +index bbcf9ceb6..abb7ab191 100644 +--- a/db/db_iter.h ++++ b/db/db_iter.h +@@ -170,6 +170,11 @@ class DBIter final : public Iterator { + return wide_columns_; + } + ++ bool seqno(SequenceNumber* no) const override { ++ assert(valid_); ++ *no = ikey_.sequence; ++ return true; ++ } + Status status() const override { + if (status_.ok()) { + return iter_.status(); +diff --git a/db/internal_stats.cc b/db/internal_stats.cc +index 6ef4b4302..b4180b959 100644 +--- a/db/internal_stats.cc ++++ b/db/internal_stats.cc +@@ -305,6 +305,7 @@ static const std::string num_running_flushes = "num-running-flushes"; + static const std::string actual_delayed_write_rate = + "actual-delayed-write-rate"; + static const std::string is_write_stopped = "is-write-stopped"; ++static const std::string is_write_stalled = "is-write-stalled"; + static const std::string estimate_oldest_key_time = "estimate-oldest-key-time"; + static const std::string block_cache_capacity = "block-cache-capacity"; + static const std::string block_cache_usage = "block-cache-usage"; +@@ -408,6 +409,8 @@ const std::string DB::Properties::kActualDelayedWriteRate = + rocksdb_prefix + actual_delayed_write_rate; + const std::string DB::Properties::kIsWriteStopped = + rocksdb_prefix + is_write_stopped; ++const std::string DB::Properties::kIsWriteStalled = ++ rocksdb_prefix + is_write_stalled; + const std::string DB::Properties::kEstimateOldestKeyTime = + rocksdb_prefix + estimate_oldest_key_time; + const std::string DB::Properties::kBlockCacheCapacity = +@@ -586,6 +589,9 @@ const UnorderedMap + {DB::Properties::kIsWriteStopped, + {false, nullptr, &InternalStats::HandleIsWriteStopped, nullptr, + nullptr}}, ++ {DB::Properties::kIsWriteStalled, ++ {false, nullptr, &InternalStats::HandleIsWriteStalled, nullptr, ++ nullptr}}, + {DB::Properties::kEstimateOldestKeyTime, + {false, nullptr, &InternalStats::HandleEstimateOldestKeyTime, nullptr, + nullptr}}, +@@ -1463,6 +1469,12 @@ bool InternalStats::HandleIsWriteStopped(uint64_t* value, DBImpl* db, + return true; + } + ++bool InternalStats::HandleIsWriteStalled(uint64_t* value, DBImpl* db, ++ Version* /*version*/) { ++ *value = db->write_controller().NeedsDelay() ? 1 : 0; ++ return true; ++} ++ + bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/, + Version* /*version*/) { + // TODO(yiwu): The property is currently available for fifo compaction +diff --git a/db/internal_stats.h b/db/internal_stats.h +index 85c1a6bb1..465167024 100644 +--- a/db/internal_stats.h ++++ b/db/internal_stats.h +@@ -833,6 +833,7 @@ class InternalStats { + bool HandleActualDelayedWriteRate(uint64_t* value, DBImpl* db, + Version* version); + bool HandleIsWriteStopped(uint64_t* value, DBImpl* db, Version* version); ++ bool HandleIsWriteStalled(uint64_t* value, DBImpl* db, Version* version); + bool HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* db, + Version* version); + bool HandleBlockCacheCapacity(uint64_t* value, DBImpl* db, Version* version); +diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc +index 908878a5f..19926ae08 100644 +--- a/file/writable_file_writer.cc ++++ b/file/writable_file_writer.cc +@@ -69,6 +69,10 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, + io_options, nullptr); + } + ++ // The buffer initialization code previously in ctor. ++ if (buf_.Capacity() == 0) { ++ buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); ++ } + // See whether we need to enlarge the buffer to avoid the flush + if (buf_.Capacity() - buf_.CurrentSize() < left) { + for (size_t cap = buf_.Capacity(); +@@ -183,8 +187,8 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, + return AssertFalseAndGetStatusForPrevError(); + } + assert(pad_bytes < kDefaultPageSize); +- size_t left = pad_bytes; + size_t cap = buf_.Capacity() - buf_.CurrentSize(); ++ size_t left = std::min(pad_bytes, cap); + size_t pad_start = buf_.CurrentSize(); + + // Assume pad_bytes is small compared to buf_ capacity. So we always +diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h +index aac0f5949..30c278cfb 100644 +--- a/file/writable_file_writer.h ++++ b/file/writable_file_writer.h +@@ -202,7 +202,8 @@ class WritableFileWriter { + TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", + reinterpret_cast(max_buffer_size_)); + buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); +- buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); ++ // Moved to `Append` to reduce memory usage of unused writer. ++ // buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); + std::for_each(listeners.begin(), listeners.end(), + [this](const std::shared_ptr& e) { + if (e->ShouldBeNotifiedOnFileIO()) { +diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h +index 1784f2329..fb8a86e34 100644 +--- a/include/rocksdb/compaction_filter.h ++++ b/include/rocksdb/compaction_filter.h +@@ -16,9 +16,11 @@ + + #include "rocksdb/customizable.h" + #include "rocksdb/rocksdb_namespace.h" ++#include "rocksdb/slice.h" + #include "rocksdb/table_properties.h" + #include "rocksdb/types.h" + #include "rocksdb/wide_columns.h" ++#include "types.h" + + namespace ROCKSDB_NAMESPACE { + +@@ -67,6 +69,8 @@ class CompactionFilter : public Customizable { + kBlobIndex, + // Wide-column entity + kWideColumnEntity, ++ // Only used by TiKV's region range filter. ++ kDeletion, + }; + + // Potential decisions that can be returned by the compaction filter's +@@ -164,6 +168,14 @@ class CompactionFilter : public Customizable { + // The lowest level among all the input files (if any) used in table + // creation + int input_start_level = kUnknownStartLevel; ++ // Whether output files are in bottommost level or not. ++ bool is_bottommost_level; ++ ++ // The range of the compaction. ++ Slice start_key; ++ Slice end_key; ++ bool is_end_key_inclusive; ++ + // The column family that will contain the created table file. + uint32_t column_family_id; + // Reason this table file is being created. +@@ -251,6 +263,10 @@ class CompactionFilter : public Customizable { + + case ValueType::kBlobIndex: + return Decision::kKeep; ++ case ValueType::kDeletion: ++ // Should never be reached. ++ assert(false); ++ return Decision::kKeep; + + default: + assert(false); +@@ -300,6 +316,32 @@ class CompactionFilter : public Customizable { + skip_until); + } + ++ virtual Decision FilterV4( ++ int level, const Slice& key, SequenceNumber, ValueType value_type, ++ const Slice* existing_value, const WideColumns* existing_columns, ++ std::string* new_value, ++ std::vector>* new_columns, ++ std::string* skip_until) const { ++ return FilterV3(level, key, value_type, existing_value, existing_columns, ++ new_value, new_columns, skip_until); ++ } ++ ++ // This interface is reserved for TiKV's region range filter. Only this ++ // interface can accept `value_type=kTypeDeletion`. ++ virtual Decision UnsafeFilter( ++ int level, const Slice& key, SequenceNumber seqno, ValueType value_type, ++ const Slice* existing_value, const WideColumns* existing_columns, ++ std::string* new_value, ++ std::vector>* new_columns, ++ std::string* skip_until) const { ++ if (value_type != ValueType::kDeletion) { ++ return FilterV4(level, key, seqno, value_type, existing_value, ++ existing_columns, new_value, new_columns, skip_until); ++ } else { ++ return Decision::kKeep; ++ } ++ } ++ + // Internal (BlobDB) use only. Do not override in application code. + virtual BlobDecision PrepareBlobOutput(const Slice& /* key */, + const Slice& /* existing_value */, +diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h +index 5ae73182b..f41a491cc 100644 +--- a/include/rocksdb/db.h ++++ b/include/rocksdb/db.h +@@ -1215,6 +1215,9 @@ class DB { + // "rocksdb.is-write-stopped" - Return 1 if write has been stopped. + static const std::string kIsWriteStopped; + ++ // "rocksdb.is-write-stalled" - Return 1 if write has been stalled. ++ static const std::string kIsWriteStalled; ++ + // "rocksdb.estimate-oldest-key-time" - returns an estimation of + // oldest key timestamp in the DB. Currently only available for + // FIFO compaction with +@@ -1323,6 +1326,7 @@ class DB { + // "rocksdb.num-running-flushes" + // "rocksdb.actual-delayed-write-rate" + // "rocksdb.is-write-stopped" ++ // "rocksdb.is-write-stalled" + // "rocksdb.estimate-oldest-key-time" + // "rocksdb.block-cache-capacity" + // "rocksdb.block-cache-usage" +diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h +index 8568dd258..017e556ce 100644 +--- a/include/rocksdb/iterator.h ++++ b/include/rocksdb/iterator.h +@@ -23,6 +23,7 @@ + #include "rocksdb/cleanable.h" + #include "rocksdb/slice.h" + #include "rocksdb/status.h" ++#include "rocksdb/types.h" + #include "rocksdb/wide_columns.h" + + namespace ROCKSDB_NAMESPACE { +@@ -102,6 +103,11 @@ class Iterator : public Cleanable { + return kNoWideColumns; + } + ++ // Return the sequence number for the current entry if it's available. ++ // Return false if it's not available. ++ // REQUIRES: Valid() ++ virtual bool seqno(SequenceNumber* /*seqno*/) const { return false; } ++ + // If an error has occurred, return it. Else return an ok status. + // If non-blocking IO is requested and this operation cannot be + // satisfied without doing some IO, then this returns Status::Incomplete(). +diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h +index 4898ddfd7..3e6f33ac3 100644 +--- a/utilities/blob_db/blob_db_iterator.h ++++ b/utilities/blob_db/blob_db_iterator.h +@@ -114,6 +114,8 @@ class BlobDBIterator : public Iterator { + return value_; + } + ++ bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); } ++ + // Iterator::Refresh() not supported. + + private: +diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h +index b125d79b0..5f57644a9 100644 +--- a/utilities/ttl/db_ttl_impl.h ++++ b/utilities/ttl/db_ttl_impl.h +@@ -144,6 +144,8 @@ class TtlIterator : public Iterator { + return trimmed_value; + } + ++ bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); } ++ + Status status() const override { return iter_->status(); } + + private: +-- +2.43.2 + diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h index 4898ddfd769..3e6f33ac3fc 100644 --- a/utilities/blob_db/blob_db_iterator.h +++ b/utilities/blob_db/blob_db_iterator.h @@ -114,6 +114,8 @@ class BlobDBIterator : public Iterator { return value_; } + bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); } + // Iterator::Refresh() not supported. private: diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index b125d79b067..5f57644a99a 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -144,6 +144,8 @@ class TtlIterator : public Iterator { return trimmed_value; } + bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); } + Status status() const override { return iter_->status(); } private: