-
Notifications
You must be signed in to change notification settings - Fork 6.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix corruption with intra-L0 on ingested files #5958
Changes from 4 commits
1c28bb7
18c7b1a
45cd5f6
21f132b
0da1373
2b90d27
cf58bd0
950a7a4
caafdb3
d66cd3f
c83af5c
8b306fe
7d30d7a
eaae847
b311481
67bd957
febcccb
9af6cdb
e2ad348
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
#include "port/stack_trace.h" | ||
#include "rocksdb/concurrent_task_limiter.h" | ||
#include "rocksdb/experimental.h" | ||
#include "rocksdb/sst_file_writer.h" | ||
#include "rocksdb/utilities/convenience.h" | ||
#include "test_util/fault_injection_test_env.h" | ||
#include "test_util/sync_point.h" | ||
|
@@ -4658,6 +4659,7 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { | |
ASSERT_OK(dbfull()->TEST_WaitForCompact()); | ||
Close(); | ||
} | ||
|
||
TEST_F(DBCompactionTest, ConsistencyFailTest) { | ||
Options options = CurrentOptions(); | ||
DestroyAndReopen(options); | ||
|
@@ -4683,6 +4685,153 @@ TEST_F(DBCompactionTest, ConsistencyFailTest) { | |
ASSERT_NOK(Put("foo", "bar")); | ||
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | ||
} | ||
|
||
void IngestOneKeyValue(DBImpl* db, const std::string& key, const std::string& value, const Options& options) { | ||
ExternalSstFileInfo info; | ||
std::string f = test::PerThreadDBPath("sst_file" + key); | ||
EnvOptions env; | ||
rocksdb::SstFileWriter writer(env, options); | ||
auto s = writer.Open(f); | ||
ASSERT_OK(s); | ||
// ASSERT_OK(writer.Put(Key(), "")); | ||
ASSERT_OK(writer.Put(key, value)); | ||
|
||
ASSERT_OK(writer.Finish(&info)); | ||
IngestExternalFileOptions ingest_opt; | ||
|
||
ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt)); | ||
} | ||
|
||
TEST_P(DBCompactionTestWithParam, FlushAfterL0IntraCompactionCheckConsistencyFail) { | ||
Options options = CurrentOptions(); | ||
options.force_consistency_checks = true; | ||
options.compression = kNoCompression; | ||
options.level0_file_num_compaction_trigger = 5; | ||
options.max_background_compactions = 2; | ||
options.max_subcompactions = max_subcompactions_; | ||
DestroyAndReopen(options); | ||
|
||
const size_t kValueSize = 1 << 20; | ||
Random rnd(301); | ||
std::string value(RandomString(&rnd, kValueSize)); | ||
|
||
rocksdb::SyncPoint::GetInstance()->LoadDependency( | ||
{{"LevelCompactionPicker::PickCompactionBySize:0", | ||
"CompactionJob::Run():Start"}}); | ||
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | ||
|
||
// prevents trivial move | ||
for (int i = 0; i < 10; ++i) { | ||
ASSERT_OK(Put(Key(i), "")); // prevents trivial move | ||
} | ||
ASSERT_OK(Flush()); | ||
Compact("", Key(99)); | ||
ASSERT_EQ(0, NumTableFilesAtLevel(0)); | ||
|
||
// Flush 5 L0 sst. | ||
for (int i = 0; i < 5; ++i) { | ||
ASSERT_OK(Put(Key(i + 1), value)); | ||
ASSERT_OK(Flush()); | ||
} | ||
ASSERT_EQ(5, NumTableFilesAtLevel(0)); | ||
|
||
// Put one key, to make smallest log sequence number in this memtable is less than sst which would be ingested in next step. | ||
ASSERT_OK(Put(Key(0), "a")); | ||
|
||
ASSERT_EQ(5, NumTableFilesAtLevel(0)); | ||
|
||
// Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction. | ||
for (int i = 5; i < 10; i ++) { | ||
IngestOneKeyValue(dbfull(), Key(i), value, options); | ||
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); | ||
} | ||
|
||
// Put one key, to make biggest log sequence number in this memtable is bigger than sst which would be ingested in next step. | ||
ASSERT_OK(Put(Key(2), "b")); | ||
ASSERT_EQ(10, NumTableFilesAtLevel(0)); | ||
dbfull()->TEST_WaitForCompact(); | ||
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | ||
std::vector<std::vector<FileMetaData>> level_to_files; | ||
dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(), | ||
&level_to_files); | ||
ASSERT_GT(level_to_files[0].size(), 0); | ||
|
||
ASSERT_OK(Flush()); | ||
} | ||
|
||
TEST_P(DBCompactionTestWithParam, IntraL0CompactionAfterFlushCheckConsistencyFail) { | ||
Options options = CurrentOptions(); | ||
options.force_consistency_checks = true; | ||
options.compression = kNoCompression; | ||
options.level0_file_num_compaction_trigger = 5; | ||
options.max_background_compactions = 2; | ||
options.max_subcompactions = max_subcompactions_; | ||
options.write_buffer_size = 2 << 20; | ||
options.max_write_buffer_number = 6; | ||
DestroyAndReopen(options); | ||
|
||
const size_t kValueSize = 1 << 20; | ||
Random rnd(301); | ||
std::string value(RandomString(&rnd, kValueSize)); | ||
|
||
// prevents trivial move | ||
for (int i = 0; i < 10; ++i) { | ||
ASSERT_OK(Put(Key(i), "")); // prevents trivial move | ||
} | ||
ASSERT_OK(Flush()); | ||
Compact("", Key(99)); | ||
ASSERT_EQ(0, NumTableFilesAtLevel(0)); | ||
rocksdb::SyncPoint::GetInstance()->LoadDependency( | ||
{{"LevelCompactionPicker::PickCompactionBySize:0", | ||
"CompactionJob::Run():Start"}}); | ||
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | ||
|
||
// Make 6 L0 sst. | ||
for (int i = 0; i < 6; ++i) { | ||
if (i % 2 == 0) { | ||
IngestOneKeyValue(dbfull(), Key(i), value, options); | ||
} else { | ||
ASSERT_OK(Put(Key(i), value)); | ||
ASSERT_OK(Flush()); | ||
} | ||
} | ||
|
||
ASSERT_EQ(6, NumTableFilesAtLevel(0)); | ||
|
||
// Stop run flush job | ||
env_->SetBackgroundThreads(1, Env::HIGH); | ||
test::SleepingBackgroundTask sleeping_tasks; | ||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, | ||
&sleeping_tasks, Env::Priority::HIGH); | ||
sleeping_tasks.WaitUntilSleeping(); | ||
|
||
|
||
// Put many keys to make memtable request to flush | ||
for (int i = 0; i < 6; ++i) { | ||
ASSERT_OK(Put(Key(i), value+value)); | ||
} | ||
|
||
ASSERT_EQ(6, NumTableFilesAtLevel(0)); | ||
// ingest file to trigger IntraL0Compaction | ||
for (int i = 6; i < 10; i ++) { | ||
ASSERT_EQ(i, NumTableFilesAtLevel(0)); | ||
IngestOneKeyValue(dbfull(), Key(i), value, options); | ||
} | ||
ASSERT_EQ(10, NumTableFilesAtLevel(0)); | ||
|
||
// Wake up flush job | ||
sleeping_tasks.WakeUp(); | ||
sleeping_tasks.WaitUntilDone(); | ||
|
||
|
||
dbfull()->TEST_WaitForCompact(); | ||
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | ||
uint64_t error_count = 0; | ||
db_->GetIntProperty("rocksdb.background-errors", &error_count); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In these two tests, other than asserting no error, can we validate Get() the keys we inserted returns the correct results? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do either of these tests expose the corruption bug that is present before the fix? That is, if we set I modified this test setup so |
||
ASSERT_EQ(error_count, 0); | ||
} | ||
|
||
|
||
#endif // !defined(ROCKSDB_LITE) | ||
} // namespace rocksdb | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add several tests to cover more scenarios in FindIntraL0Compaction()? For example, when being_compacted shows up in L0? Also, it will be nice to directly cover the earliest_mem_seqno scenarios in tests here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have add a tests for
FindIntraL0Compaction
, which make sure that it would skip sst which of largest lsn is larger than earliest_seqno.