From d6b024760d7b29046f7a648e8468e01020fdb214 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Tue, 12 Jun 2018 22:03:48 -0400 Subject: [PATCH] Relax VersionStorageInfo::GetOverlappingInputs check Do not consider the range tombstone sentinel key as causing 2 adjacent sstables in a level to overlap. When a range tombstone's end key is the largest key in an sstable, the sstable's end key is set to a "sentinel" value that is the smallest key in the next sstable with a sequence number of kMaxSequenceNumber. This "sentinel" is guaranteed to not overlap in internal-key space with the next sstable. Unfortunately, GetOverlappingFiles uses user-keys to determine overlap and was thus considering 2 adjacent sstables in a level to overlap if they were separated by this sentinel key. This in turn would cause compactions to be larger than necessary. This previous behavior of GetOverlappingInputs was necessary due to the following scenario: * Write a delete range [a, d). * After compaction, this deleted range may be added to multiple sst files: a.sst, b.sst, c.sst, but the boundaries of these sst files are [a, b), [b, c), [c, d). * When a.sst and b.sst reach the bottommost level, the delete range of the sst files will be dropped. * Write a new key in the range [a, c). * When the newly written key [a, c) reaches the bottommost level, its sequence number will be set to zero. * When the front c.sst compacts with the key in the range [a, c) the sequence number of that key is zero, and the key will be incorrectly dropped. That scenario no longer occurs because we are truncating range deletion tombstones to sstable boundaries when adding them to RangeDelAggregator. --- db/db_range_del_test.cc | 104 +++++++++++++++++++++++++-- db/dbformat.h | 16 ++++- db/dbformat_test.cc | 7 ++ db/version_set.cc | 156 ++++++++++++++++++++++++++-------------- db/version_set.h | 24 +++---- db/version_set_test.cc | 64 +++++++++++++++++ 6 files changed, 298 insertions(+), 73 deletions(-) diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index cba20cbceb7..dfcabf71a5c 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -935,11 +935,14 @@ TEST_F(DBRangeDelTest, MemtableBloomFilter) { } TEST_F(DBRangeDelTest, CompactionTreatsSplitInputLevelDeletionAtomically) { - // make sure compaction treats files containing a split range deletion in the - // input level as an atomic unit. I.e., compacting any input-level file(s) - // containing a portion of the range deletion causes all other input-level - // files containing portions of that same range deletion to be included in the - // compaction. + // This test originally verified that compaction treated files containing a + // split range deletion in the input level as an atomic unit. I.e., + // compacting any input-level file(s) containing a portion of the range + // deletion causes all other input-level files containing portions of that + // same range deletion to be included in the compaction. Range deletion + // tombstones are now truncated to sstable boundaries which removed the need + // for that behavior (which could lead to excessively large + // compactions). const int kNumFilesPerLevel = 4, kValueBytes = 4 << 10; Options options = CurrentOptions(); options.compression = kNoCompression; @@ -986,22 +989,111 @@ TEST_F(DBRangeDelTest, CompactionTreatsSplitInputLevelDeletionAtomically) { if (i == 0) { ASSERT_OK(db_->CompactFiles( CompactionOptions(), {meta.levels[1].files[0].name}, 2 /* level */)); + ASSERT_EQ(0, NumTableFilesAtLevel(1)); } else if (i == 1) { auto begin_str = Key(0), end_str = Key(1); Slice begin = begin_str, end = end_str; ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &begin, &end)); + ASSERT_EQ(3, NumTableFilesAtLevel(1)); } else if (i == 2) { ASSERT_OK(db_->SetOptions(db_->DefaultColumnFamily(), {{"max_bytes_for_level_base", "10000"}})); dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(1, NumTableFilesAtLevel(1)); } - ASSERT_EQ(0, NumTableFilesAtLevel(1)); ASSERT_GT(NumTableFilesAtLevel(2), 0); db_->ReleaseSnapshot(snapshot); } } +TEST_F(DBRangeDelTest, RangeTombstoneEndKeyAsSstableUpperBound) { + // Test the handling of the range-tombstone end-key as the + // upper-bound for an sstable. + + const int kNumFilesPerLevel = 2, kValueBytes = 4 << 10; + Options options = CurrentOptions(); + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = kNumFilesPerLevel; + options.memtable_factory.reset( + new SpecialSkipListFactory(2 /* num_entries_flush */)); + options.target_file_size_base = kValueBytes; + options.disable_auto_compactions = true; + + DestroyAndReopen(options); + + // Create an initial sstable at L2: + // [key000000#1,1, key000000#1,1] + ASSERT_OK(Put(Key(0), "")); + ASSERT_OK(db_->Flush(FlushOptions())); + MoveFilesToLevel(2); + ASSERT_EQ(1, NumTableFilesAtLevel(2)); + + // A snapshot protects the range tombstone from dropping due to + // becoming obsolete. + const Snapshot* snapshot = db_->GetSnapshot(); + db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), + Key(0), Key(2 * kNumFilesPerLevel)); + + // Create 2 additional sstables in L0. Note that the first sstable + // contains the range tombstone. + // [key000000#3,1, key000004#72057594037927935,15] + // [key000001#5,1, key000002#6,1] + Random rnd(301); + std::string value = RandomString(&rnd, kValueBytes); + for (int j = 0; j < kNumFilesPerLevel; ++j) { + // Give files overlapping key-ranges to prevent a trivial move when we + // compact from L0 to L1. + ASSERT_OK(Put(Key(j), value)); + ASSERT_OK(Put(Key(2 * kNumFilesPerLevel - 1 - j), value)); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(j + 1, NumTableFilesAtLevel(0)); + } + // Compact the 2 L0 sstables to L1, resulting in the following LSM. There + // are 2 sstables generated in L1 due to the target_file_size_base setting. + // L1: + // [key000000#3,1, key000002#72057594037927935,15] + // [key000002#6,1, key000004#72057594037927935,15] + // L2: + // [key000000#1,1, key000000#1,1] + MoveFilesToLevel(1); + ASSERT_EQ(2, NumTableFilesAtLevel(1)); + + { + // Compact the second sstable in L1: + // L1: + // [key000000#3,1, key000002#72057594037927935,15] + // L2: + // [key000000#1,1, key000000#1,1] + // [key000002#6,1, key000004#72057594037927935,15] + auto begin_str = Key(3); + const rocksdb::Slice begin = begin_str; + dbfull()->TEST_CompactRange(1, &begin, nullptr); + ASSERT_EQ(1, NumTableFilesAtLevel(1)); + ASSERT_EQ(2, NumTableFilesAtLevel(2)); + } + + { + // Compact the first sstable in L1. This should be copacetic, but + // was previously resulting in overlapping sstables in L2 due to + // mishandling of the range tombstone end-key when used as the + // largest key for an sstable. The resulting LSM structure should + // be: + // + // L2: + // [key000000#1,1, key000001#72057594037927935,15] + // [key000001#5,1, key000002#72057594037927935,15] + // [key000002#6,1, key000004#72057594037927935,15] + auto begin_str = Key(0); + const rocksdb::Slice begin = begin_str; + dbfull()->TEST_CompactRange(1, &begin, &begin); + ASSERT_EQ(0, NumTableFilesAtLevel(1)); + ASSERT_EQ(3, NumTableFilesAtLevel(2)); + } + + db_->ReleaseSnapshot(snapshot); +} + TEST_F(DBRangeDelTest, UnorderedTombstones) { // Regression test for #2752. Range delete tombstones between // different snapshot stripes are not stored in order, so the first diff --git a/db/dbformat.h b/db/dbformat.h index 52e668d1d0e..35147ff7040 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -135,10 +135,14 @@ inline Slice ExtractUserKey(const Slice& internal_key) { return Slice(internal_key.data(), internal_key.size() - 8); } -inline ValueType ExtractValueType(const Slice& internal_key) { +inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) { assert(internal_key.size() >= 8); const size_t n = internal_key.size(); - uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + return DecodeFixed64(internal_key.data() + n - 8); +} + +inline ValueType ExtractValueType(const Slice& internal_key) { + uint64_t num = ExtractInternalKeyFooter(internal_key); unsigned char c = num & 0xff; return static_cast(c); } @@ -601,9 +605,15 @@ struct RangeTombstone { return InternalKey(start_key_, seq_, kTypeRangeDeletion); } + // The tombstone end-key is exclusive, so we generate an internal-key here + // which has a similar property. Using kMaxSequenceNumber guarantees that + // the returned internal-key will compare less than any other internal-key + // with the same user-key. This in turn guarantees that the serialized + // end-key for a tombstone such as [a-b] will compare less than the key "b". + // // be careful to use SerializeEndKey(), allocates new memory InternalKey SerializeEndKey() const { - return InternalKey(end_key_, seq_, kTypeRangeDeletion); + return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion); } }; diff --git a/db/dbformat_test.cc b/db/dbformat_test.cc index d96b5757afd..0b16c13f573 100644 --- a/db/dbformat_test.cc +++ b/db/dbformat_test.cc @@ -192,6 +192,13 @@ TEST_F(FormatTest, UpdateInternalKey) { ASSERT_EQ(new_val_type, decoded.type); } +TEST_F(FormatTest, RangeTombstoneSerializeEndKey) { + RangeTombstone t("a", "b", 2); + InternalKey k("b", 3, kTypeValue); + const InternalKeyComparator cmp(BytewiseComparator()); + ASSERT_LT(cmp.Compare(t.SerializeEndKey(), k), 0); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 1ce801a9def..f22aafcd38f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2050,8 +2050,8 @@ void VersionStorageInfo::GetOverlappingInputs( *file_index = -1; } const Comparator* user_cmp = user_comparator_; - if (begin != nullptr && end != nullptr && level > 0) { - GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs, + if (level > 0) { + GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index, file_index); return; } @@ -2106,24 +2106,73 @@ void VersionStorageInfo::GetCleanInputsWithinInterval( return; } - Slice user_begin, user_end; const auto& level_files = level_files_brief_[level]; if (begin == nullptr) { - user_begin = ExtractUserKey(level_files.files[0].smallest_key); - } else { - user_begin = begin->user_key(); + begin = &level_files.files[0].file_metadata->smallest; } if (end == nullptr) { - user_end = ExtractUserKey( - level_files.files[level_files.num_files - 1].largest_key); - } else { - user_end = end->user_key(); + end = &level_files.files[level_files.num_files - 1].file_metadata->largest; } - GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs, + + GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index, file_index, true /* within_interval */); } +namespace { + +const uint64_t kRangeTombstoneSentinel = + PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion); + +// Utility for comparing sstable boundary keys. Returns -1 if either a or b is +// null which provides the property that a==null indicates a key that is less +// than any key and b==null indicates a key that is greater than any key. Note +// that the comparison is performed primarily on the user-key portion of the +// key. If the user-keys compare equal, an additional test is made to sort +// range tombstone sentinel keys before other keys with the same user-key. The +// result is that 2 user-keys will compare equal if they differ purely on +// their sequence number and value, but the range tombstone sentinel for that +// user-key will compare not equal. This is necessary because the range +// tombstone sentinel key is set as the largest key for an sstable even though +// that key never appears in the database. We don't want adjacent sstables to +// be considered overlapping if they are separated by the range tombstone +// sentinel. +int sstableKeyCompare(const Comparator* user_cmp, + const InternalKey& a, const InternalKey& b) { + auto c = user_cmp->Compare(a.user_key(), b.user_key()); + if (c != 0) { + return c; + } + auto a_footer = ExtractInternalKeyFooter(a.Encode()); + auto b_footer = ExtractInternalKeyFooter(b.Encode()); + if (a_footer == kRangeTombstoneSentinel) { + if (b_footer != kRangeTombstoneSentinel) { + return -1; + } + } else if (b_footer == kRangeTombstoneSentinel) { + return 1; + } + return 0; +} + +int sstableKeyCompare(const Comparator* user_cmp, + const InternalKey* a, const InternalKey& b) { + if (a == nullptr) { + return -1; + } + return sstableKeyCompare(user_cmp, *a, b); +} + +int sstableKeyCompare(const Comparator* user_cmp, + const InternalKey& a, const InternalKey* b) { + if (b == nullptr) { + return -1; + } + return sstableKeyCompare(user_cmp, a, *b); +} + +} // namespace + // Store in "*inputs" all files in "level" that overlap [begin,end] // Employ binary search to find at least one file that overlaps the // specified range. From that file, iterate backwards and @@ -2132,7 +2181,7 @@ void VersionStorageInfo::GetCleanInputsWithinInterval( // within range [begin, end]. "clean" means there is a boudnary // between the files in "*inputs" and the surrounding files void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch( - int level, const Slice& user_begin, const Slice& user_end, + int level, const InternalKey* begin, const InternalKey* end, std::vector* inputs, int hint_index, int* file_index, bool within_interval) const { assert(level > 0); @@ -2140,7 +2189,7 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch( int mid = 0; int max = static_cast(files_[level].size()) - 1; bool foundOverlap = false; - const Comparator* user_cmp = user_comparator_; + auto user_cmp = user_comparator_; // if the caller already knows the index of a file that has overlap, // then we can skip the binary search. @@ -2152,15 +2201,15 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch( while (!foundOverlap && min <= max) { mid = (min + max)/2; FdWithKeyRange* f = &(level_files_brief_[level].files[mid]); - const Slice file_start = ExtractUserKey(f->smallest_key); - const Slice file_limit = ExtractUserKey(f->largest_key); - if ((!within_interval && user_cmp->Compare(file_limit, user_begin) < 0) || - (within_interval && user_cmp->Compare(file_start, user_begin) < 0)) { + auto& smallest = f->file_metadata->smallest; + auto& largest = f->file_metadata->largest; + if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) || + (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) { min = mid + 1; } else if ((!within_interval && - user_cmp->Compare(user_end, file_start) < 0) || + sstableKeyCompare(user_cmp, smallest, end) > 0) || (within_interval && - user_cmp->Compare(user_end, file_limit) < 0)) { + sstableKeyCompare(user_cmp, largest, end) > 0)) { max = mid - 1; } else { foundOverlap = true; @@ -2179,10 +2228,10 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch( int start_index, end_index; if (within_interval) { - ExtendFileRangeWithinInterval(level, user_begin, user_end, mid, &start_index, - &end_index); + ExtendFileRangeWithinInterval(level, begin, end, mid, + &start_index, &end_index); } else { - ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid, + ExtendFileRangeOverlappingInterval(level, begin, end, mid, &start_index, &end_index); assert(end_index >= start_index); } @@ -2199,21 +2248,28 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch( // and forward to find all overlapping files. // Use FileLevel in searching, make it faster void VersionStorageInfo::ExtendFileRangeOverlappingInterval( - int level, const Slice& user_begin, const Slice& user_end, + int level, const InternalKey* begin, const InternalKey* end, unsigned int mid_index, int* start_index, int* end_index) const { - const Comparator* user_cmp = user_comparator_; + auto user_cmp = user_comparator_; const FdWithKeyRange* files = level_files_brief_[level].files; #ifndef NDEBUG { // assert that the file at mid_index overlaps with the range assert(mid_index < level_files_brief_[level].num_files); const FdWithKeyRange* f = &files[mid_index]; - const Slice fstart = ExtractUserKey(f->smallest_key); - const Slice flimit = ExtractUserKey(f->largest_key); - if (user_cmp->Compare(fstart, user_begin) >= 0) { - assert(user_cmp->Compare(fstart, user_end) <= 0); + auto& smallest = f->file_metadata->smallest; + auto& largest = f->file_metadata->largest; + if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) { + assert(sstableKeyCompare(user_cmp, smallest, end) <= 0); } else { - assert(user_cmp->Compare(flimit, user_begin) >= 0); + // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n", + // begin ? begin->DebugString().c_str() : "(null)", + // end ? end->DebugString().c_str() : "(null)", + // smallest->DebugString().c_str(), + // largest->DebugString().c_str(), + // sstableKeyCompare(user_cmp, smallest, begin), + // sstableKeyCompare(user_cmp, largest, begin)); + assert(sstableKeyCompare(user_cmp, begin, largest) <= 0); } } #endif @@ -2225,8 +2281,8 @@ void VersionStorageInfo::ExtendFileRangeOverlappingInterval( // check backwards from 'mid' to lower indices for (int i = mid_index; i >= 0 ; i--) { const FdWithKeyRange* f = &files[i]; - const Slice file_limit = ExtractUserKey(f->largest_key); - if (user_cmp->Compare(file_limit, user_begin) >= 0) { + auto& largest = f->file_metadata->largest; + if (sstableKeyCompare(user_cmp, begin, largest) <= 0) { *start_index = i; assert((count++, true)); } else { @@ -2237,8 +2293,8 @@ void VersionStorageInfo::ExtendFileRangeOverlappingInterval( for (unsigned int i = mid_index+1; i < level_files_brief_[level].num_files; i++) { const FdWithKeyRange* f = &files[i]; - const Slice file_start = ExtractUserKey(f->smallest_key); - if (user_cmp->Compare(file_start, user_end) <= 0) { + auto& smallest = f->file_metadata->smallest; + if (sstableKeyCompare(user_cmp, smallest, end) <= 0) { assert((count++, true)); *end_index = i; } else { @@ -2256,39 +2312,36 @@ void VersionStorageInfo::ExtendFileRangeOverlappingInterval( // the clean range required. // Use FileLevel in searching, make it faster void VersionStorageInfo::ExtendFileRangeWithinInterval( - int level, const Slice& user_begin, const Slice& user_end, + int level, const InternalKey* begin, const InternalKey* end, unsigned int mid_index, int* start_index, int* end_index) const { assert(level != 0); - const Comparator* user_cmp = user_comparator_; + auto* user_cmp = user_comparator_; const FdWithKeyRange* files = level_files_brief_[level].files; #ifndef NDEBUG { // assert that the file at mid_index is within the range assert(mid_index < level_files_brief_[level].num_files); const FdWithKeyRange* f = &files[mid_index]; - const Slice fstart = ExtractUserKey(f->smallest_key); - const Slice flimit = ExtractUserKey(f->largest_key); - assert(user_cmp->Compare(fstart, user_begin) >= 0 && - user_cmp->Compare(flimit, user_end) <= 0); + auto& smallest = f->file_metadata->smallest; + auto& largest = f->file_metadata->largest; + assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 && + sstableKeyCompare(user_cmp, largest, end) <= 0); } #endif - ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid_index, + ExtendFileRangeOverlappingInterval(level, begin, end, mid_index, start_index, end_index); int left = *start_index; int right = *end_index; // shrink from left to right while (left <= right) { - const Slice& first_key_in_range = ExtractUserKey(files[left].smallest_key); - if (user_cmp->Compare(first_key_in_range, user_begin) < 0) { + auto& smallest = files[left].file_metadata->smallest; + if (sstableKeyCompare(user_cmp, begin, smallest) > 0) { left++; continue; } if (left > 0) { // If not first file - const Slice& last_key_before = - ExtractUserKey(files[left - 1].largest_key); - if (user_cmp->Equal(first_key_in_range, last_key_before)) { - // The first user key in range overlaps with the previous file's last - // key + auto& largest = files[left - 1].file_metadata->largest; + if (sstableKeyCompare(user_cmp, smallest, largest) == 0) { left++; continue; } @@ -2297,16 +2350,15 @@ void VersionStorageInfo::ExtendFileRangeWithinInterval( } // shrink from right to left while (left <= right) { - const Slice last_key_in_range = ExtractUserKey(files[right].largest_key); - if (user_cmp->Compare(last_key_in_range, user_end) > 0) { + auto& largest = files[right].file_metadata->largest; + if (sstableKeyCompare(user_cmp, largest, end) > 0) { right--; continue; } if (right < static_cast(level_files_brief_[level].num_files) - 1) { // If not the last file - const Slice first_key_after = - ExtractUserKey(files[right + 1].smallest_key); - if (user_cmp->Equal(last_key_in_range, first_key_after)) { + auto& smallest = files[right + 1].file_metadata->smallest; + if (sstableKeyCompare(user_cmp, smallest, largest) == 0) { // The last user key in range overlaps with the next file's first key right--; continue; diff --git a/db/version_set.h b/db/version_set.h index 832857f6334..dd991b2dd9c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -197,8 +197,8 @@ class VersionStorageInfo { void GetOverlappingInputsRangeBinarySearch( int level, // level > 0 - const Slice& begin, // nullptr means before all keys - const Slice& end, // nullptr means after all keys + const InternalKey* begin, // nullptr means before all keys + const InternalKey* end, // nullptr means after all keys std::vector* inputs, int hint_index, // index of overlap file int* file_index, // return index of overlap file @@ -207,20 +207,20 @@ class VersionStorageInfo { void ExtendFileRangeOverlappingInterval( int level, - const Slice& begin, // nullptr means before all keys - const Slice& end, // nullptr means after all keys - unsigned int index, // start extending from this index - int* startIndex, // return the startIndex of input range - int* endIndex) // return the endIndex of input range + const InternalKey* begin, // nullptr means before all keys + const InternalKey* end, // nullptr means after all keys + unsigned int index, // start extending from this index + int* startIndex, // return the startIndex of input range + int* endIndex) // return the endIndex of input range const; void ExtendFileRangeWithinInterval( int level, - const Slice& begin, // nullptr means before all keys - const Slice& end, // nullptr means after all keys - unsigned int index, // start extending from this index - int* startIndex, // return the startIndex of input range - int* endIndex) // return the endIndex of input range + const InternalKey* begin, // nullptr means before all keys + const InternalKey* end, // nullptr means after all keys + unsigned int index, // start extending from this index + int* startIndex, // return the startIndex of input range + int* endIndex) // return the endIndex of input range const; // Returns true iff some file in the specified level overlaps diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 090e074cf0d..b1cbe652038 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -9,6 +9,7 @@ #include "db/version_set.h" #include "util/logging.h" +#include "util/string_util.h" #include "util/testharness.h" #include "util/testutil.h" @@ -137,6 +138,35 @@ class VersionStorageInfoTest : public testing::Test { f->num_deletions = 0; vstorage_.AddFile(level, f); } + + void Add(int level, uint32_t file_number, const InternalKey& smallest, + const InternalKey& largest, uint64_t file_size = 0) { + assert(level < vstorage_.num_levels()); + FileMetaData* f = new FileMetaData; + f->fd = FileDescriptor(file_number, 0, file_size); + f->smallest = smallest; + f->largest = largest; + f->compensated_file_size = file_size; + f->refs = 0; + f->num_entries = 0; + f->num_deletions = 0; + vstorage_.AddFile(level, f); + } + + std::string GetOverlappingFiles(int level, const InternalKey& begin, + const InternalKey& end) { + std::vector inputs; + vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs); + + std::string result; + for (size_t i = 0; i < inputs.size(); ++i) { + if (i > 0) { + result += ","; + } + AppendNumberTo(&result, inputs[i]->fd.GetNumber()); + } + return result; + } }; TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) { @@ -259,6 +289,40 @@ TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) { ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize()); } +TEST_F(VersionStorageInfoTest, GetOverlappingInputs) { + // Two files that overlap at the range deletion tombstone sentinel. + Add(1, 1U, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1); + Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1); + // Two files that overlap at the same user key. + Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1); + Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1); + // Two files that do not overlap. + Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1); + Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1); + vstorage_.UpdateNumNonEmptyLevels(); + vstorage_.GenerateLevelFilesBrief(); + + ASSERT_EQ("1,2", GetOverlappingFiles( + 1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue})); + ASSERT_EQ("1", GetOverlappingFiles( + 1, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion})); + ASSERT_EQ("2", GetOverlappingFiles( + 1, {"b", kMaxSequenceNumber, kTypeValue}, {"c", 0, kTypeValue})); + ASSERT_EQ("3,4", GetOverlappingFiles( + 1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue})); + ASSERT_EQ("3", GetOverlappingFiles( + 1, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeRangeDeletion})); + ASSERT_EQ("3,4", GetOverlappingFiles( + 1, {"e", kMaxSequenceNumber, kTypeValue}, {"f", 0, kTypeValue})); + ASSERT_EQ("3,4", GetOverlappingFiles( + 1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue})); + ASSERT_EQ("5", GetOverlappingFiles( + 1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue})); + ASSERT_EQ("6", GetOverlappingFiles( + 1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue})); +} + + class FindLevelFileTest : public testing::Test { public: LevelFilesBrief file_level_;