diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6621b20f64a9..97de4541db43 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2071,15 +2071,18 @@ class DBImpl : public DB { bool read_only, int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number, bool* corrupted_wal_found, - std::unordered_map* version_edits, bool* flushed); + std::unordered_map* version_edits, bool* flushed, + PredecessorWALInfo& predecessor_wal_info); void SetupLogFileProcessing(uint64_t wal_number); - Status InitializeLogReader(uint64_t wal_number, bool is_retry, - std::string& fname, bool* const old_log_record, - Status* const reporter_status, - DBOpenLogReporter* reporter, - std::unique_ptr& reader); + Status InitializeLogReader( + uint64_t wal_number, bool is_retry, std::string& fname, + + bool stop_replay_for_corruption, uint64_t min_wal_number, + const PredecessorWALInfo& predecessor_wal_info, + bool* const old_log_record, Status* const reporter_status, + DBOpenLogReporter* reporter, std::unique_ptr& reader); Status ProcessLogRecord( Slice record, const std::unique_ptr& reader, const UnorderedMap& running_ts_sz, uint64_t wal_number, @@ -2116,8 +2119,13 @@ class DBImpl : public DB { bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number, bool* corrupted_wal_found); - void FinishLogFileProcessing(SequenceNumber const* const next_sequence, - const Status& status); + Status UpdatePredecessorWALInfo(uint64_t wal_number, + const SequenceNumber* next_sequence, + const std::string& fname, + PredecessorWALInfo& predecessor_wal_info); + + void FinishLogFileProcessing(const Status& status, + const SequenceNumber* next_sequence); // Return `Status::Corruption()` when `stop_replay_for_corruption == true` and // exits inconsistency between SST and WAL data @@ -2309,7 +2317,8 @@ class DBImpl : public DB { const WriteOptions& write_options, log::Writer* log_writer, uint64_t* log_used, 
uint64_t* log_size, - LogFileNumberSize& log_file_number_size); + LogFileNumberSize& log_file_number_size, + SequenceNumber sequence); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, @@ -2554,6 +2563,7 @@ class DBImpl : public DB { IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, + const PredecessorWALInfo& predecessor_wal_info, log::Writer** new_log); // Validate self-consistency of DB options diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index d3c472640c6b..5adcb451665b 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -754,6 +754,11 @@ Status DBImpl::Recover( } } + if (!is_new_db && immutable_db_options_.track_and_verify_wals && + wal_files.empty()) { + return Status::Corruption("Opening an existing DB with no WAL files"); + } + if (immutable_db_options_.track_and_verify_wals_in_manifest) { if (!immutable_db_options_.best_efforts_recovery) { // Verify WALs in MANIFEST. 
@@ -1189,14 +1194,15 @@ Status DBImpl::ProcessLogFiles( bool stop_replay_for_corruption = false; bool flushed = false; uint64_t corrupted_wal_number = kMaxSequenceNumber; + PredecessorWALInfo predecessor_wal_info; for (auto wal_number : wal_numbers) { if (status.ok()) { - status = - ProcessLogFile(wal_number, min_wal_number, is_retry, read_only, - job_id, next_sequence, &stop_replay_for_corruption, - &stop_replay_by_wal_filter, &corrupted_wal_number, - corrupted_wal_found, version_edits, &flushed); + status = ProcessLogFile( + wal_number, min_wal_number, is_retry, read_only, job_id, + next_sequence, &stop_replay_for_corruption, + &stop_replay_by_wal_filter, &corrupted_wal_number, + corrupted_wal_found, version_edits, &flushed, predecessor_wal_info); } } @@ -1217,7 +1223,8 @@ Status DBImpl::ProcessLogFile( int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number, bool* corrupted_wal_found, - std::unordered_map* version_edits, bool* flushed) { + std::unordered_map* version_edits, bool* flushed, + PredecessorWALInfo& predecessor_wal_info) { assert(stop_replay_by_wal_filter); // Variable initialization starts @@ -1264,7 +1271,10 @@ Status DBImpl::ProcessLogFile( } Status init_status = InitializeLogReader( - wal_number, is_retry, fname, &old_log_record, &status, &reporter, reader); + wal_number, is_retry, fname, *stop_replay_for_corruption, min_wal_number, + predecessor_wal_info, &old_log_record, &status, &reporter, reader); + + // FIXME(hx235): Consolidate `!init_status.ok()` and `reader == nullptr` cases if (!init_status.ok()) { assert(status.ok()); status.PermitUncheckedError(); @@ -1272,6 +1282,8 @@ Status DBImpl::ProcessLogFile( } else if (reader == nullptr) { // TODO(hx235): remove this case since it's confusing assert(status.ok()); + // Fail initializing log reader for one log file with an ok status. + // Try next one. 
return status; } @@ -1311,13 +1323,19 @@ Status DBImpl::ProcessLogFile( "Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number, *next_sequence); + if (status.ok()) { + status = UpdatePredecessorWALInfo(wal_number, next_sequence, fname, + predecessor_wal_info); + } + if (!status.ok() || old_log_record) { status = HandleNonOkStatusOrOldLogRecord( wal_number, next_sequence, status, &old_log_record, stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found); } - FinishLogFileProcessing(next_sequence, status); + FinishLogFileProcessing(status, next_sequence); + return status; } @@ -1332,12 +1350,12 @@ void DBImpl::SetupLogFileProcessing(uint64_t wal_number) { static_cast(immutable_db_options_.wal_recovery_mode)); } -Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry, - std::string& fname, - bool* const old_log_record, - Status* const reporter_status, - DBOpenLogReporter* reporter, - std::unique_ptr& reader) { +Status DBImpl::InitializeLogReader( + uint64_t wal_number, bool is_retry, std::string& fname, + bool stop_replay_for_corruption, uint64_t min_wal_number, + const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record, + Status* const reporter_status, DBOpenLogReporter* reporter, + std::unique_ptr& reader) { assert(old_log_record); assert(reporter_status); assert(reporter); @@ -1375,9 +1393,11 @@ Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry, // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). 
- reader.reset(new log::Reader(immutable_db_options_.info_log, - std::move(file_reader), reporter, - true /*checksum*/, wal_number)); + reader.reset(new log::Reader( + immutable_db_options_.info_log, std::move(file_reader), reporter, + true /*checksum*/, wal_number, + immutable_db_options_.track_and_verify_wals, stop_replay_for_corruption, + min_wal_number, predecessor_wal_info)); return status; } @@ -1630,8 +1650,26 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord( return status; } } -void DBImpl::FinishLogFileProcessing(SequenceNumber const* const next_sequence, - const Status& status) { + +Status DBImpl::UpdatePredecessorWALInfo( + uint64_t wal_number, const SequenceNumber* next_sequence, + const std::string& fname, PredecessorWALInfo& predecessor_wal_info) { + uint64_t bytes; + + Status s = env_->GetFileSize(fname, &bytes); + if (!s.ok()) { + return s; + } + + assert(next_sequence); + predecessor_wal_info = PredecessorWALInfo( + wal_number, bytes, + *next_sequence == kMaxSequenceNumber ? 
0 : *next_sequence - 1); + return s; +} + +void DBImpl::FinishLogFileProcessing(const Status& status, + const SequenceNumber* next_sequence) { if (status.ok()) { assert(next_sequence); flush_scheduler_.Clear(); @@ -2193,6 +2231,7 @@ Status DB::OpenAndTrimHistory( IOStatus DBImpl::CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, + const PredecessorWALInfo& predecessor_wal_info, log::Writer** new_log) { IOStatus io_s; std::unique_ptr lfile; @@ -2236,9 +2275,15 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options, *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush, - immutable_db_options_.wal_compression); + immutable_db_options_.wal_compression, + immutable_db_options_.track_and_verify_wals); io_s = (*new_log)->AddCompressionTypeRecord(write_options); + if (io_s.ok()) { + io_s = (*new_log)->MaybeAddPredecessorWALInfo(write_options, + predecessor_wal_info); + } } + return io_s; } @@ -2331,8 +2376,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, log::Writer* new_log = nullptr; const size_t preallocate_block_size = impl->GetWalPreallocateBlockSize(max_write_buffer_size); + // TODO(hx235): Pass in the correct `predecessor_wal_info` for the first WAL + // created during DB open with predecessor WALs from previous DB session due + // to `avoid_flush_during_recovery == true`. This can protect the last WAL + // recovered. s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/, - preallocate_block_size, &new_log); + preallocate_block_size, + PredecessorWALInfo() /* predecessor_wal_info */, + &new_log); if (s.ok()) { // Prevent log files created by previous instance from being recycled. // They might be in alive_log_file_, and might get recycled otherwise. 
@@ -2367,7 +2418,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, assert(log_writer->get_log_number() == log_file_number_size.number); impl->mutex_.AssertHeld(); s = impl->WriteToWAL(empty_batch, write_options, log_writer, &log_used, - &log_size, log_file_number_size); + &log_size, log_file_number_size, recovered_seq); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(write_options, false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 9f627430d68a..39b03dc7e3ac 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1558,7 +1558,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, const WriteOptions& write_options, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size) { + LogFileNumberSize& log_file_number_size, + SequenceNumber sequence) { assert(log_size != nullptr); Slice log_entry = WriteBatchInternal::Contents(&merged_batch); @@ -1584,7 +1585,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, if (!io_s.ok()) { return io_s; } - io_s = log_writer->AddRecord(write_options, log_entry); + io_s = log_writer->AddRecord(write_options, log_entry, sequence); if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); @@ -1634,7 +1635,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, - &log_size, log_file_number_size); + &log_size, log_file_number_size, sequence); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1760,7 +1761,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, 
- &log_size, log_file_number_size); + &log_size, log_file_number_size, sequence); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -2443,10 +2444,19 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); mutex_.Unlock(); if (creating_new_log) { + PredecessorWALInfo info; + log_write_mutex_.Lock(); + if (!logs_.empty()) { + log::Writer* cur_log_writer = logs_.back().writer; + info = PredecessorWALInfo(cur_log_writer->get_log_number(), + cur_log_writer->file()->GetFileSize(), + cur_log_writer->GetLastSeqnoRecorded()); + } + log_write_mutex_.Unlock(); // TODO: Write buffer size passed in should be max of all CF's instead // of mutable_cf_options.write_buffer_size. io_s = CreateWAL(write_options, new_log_number, recycle_log_number, - preallocate_block_size, &new_log); + preallocate_block_size, info, &new_log); if (s.ok()) { s = io_s; } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 9ac09082a07c..3221a1d689e8 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1838,9 +1838,125 @@ class RecoveryTestHelper { } }; -class DBWALTestWithParams : public DBWALTestBase, - public ::testing::WithParamInterface< - std::tuple> { +TEST_F(DBWALTest, TrackAndVerifyWALsRecycleWAL) { + Options options = CurrentOptions(); + options.avoid_flush_during_shutdown = true; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.recycle_log_file_num = 1; + options.track_and_verify_wals = true; + + DestroyAndReopen(options); + + ASSERT_OK(Put("key_ignore", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore1", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore2", "wal_to_recycle")); + FlushOptions fo; + fo.wait = true; + ASSERT_OK(dbfull()->Flush(fo)); + + ASSERT_OK(Put("key_ignore", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore1", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore2", "wal_to_recycle")); + 
ASSERT_OK(dbfull()->Flush(fo)); + + // Stop background flush to avoid deleting any WAL + env_->SetBackgroundThreads(1, Env::HIGH); + test::SleepingBackgroundTask sleeping_task; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + // Recycle the first WAL + ASSERT_OK(Put("key1", "old_value")); + + // Create WAL hole + VectorWalPtr log_files; + ASSERT_OK(db_->GetSortedWalFiles(log_files)); + ASSERT_GE(log_files.size(), 1); + ASSERT_OK(test::TruncateFile( + options.env, LogFileName(dbname_, log_files.back()->LogNumber()), + 0 /* new_length */)); + + // Recycle the second WAL + ASSERT_OK(dbfull()->TEST_SwitchWAL()); + ASSERT_OK(Put("key1", "new_value")); + + Status s = TryReopen(options); + + ASSERT_OK(s); + + ASSERT_EQ("wal_to_recycle", Get("key_ignore2")); + ASSERT_EQ("NOT_FOUND", Get("key1")); + + Close(); +} + +class DBWALTrackAndVerifyWALsWithParamsTest + : public DBWALTestBase, + public ::testing::WithParamInterface { + public: + DBWALTrackAndVerifyWALsWithParamsTest() + : DBWALTestBase("/db_wal_track_and_verify_wals_with_params_test") {} +}; + +INSTANTIATE_TEST_CASE_P( + DBWALTrackAndVerifyWALsWithParamsTest, + DBWALTrackAndVerifyWALsWithParamsTest, + ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords, + WALRecoveryMode::kAbsoluteConsistency, + WALRecoveryMode::kPointInTimeRecovery, + WALRecoveryMode::kSkipAnyCorruptedRecords)); + +TEST_P(DBWALTrackAndVerifyWALsWithParamsTest, Basic) { + Options options = CurrentOptions(); + options.avoid_flush_during_shutdown = true; + options.track_and_verify_wals = true; + options.wal_recovery_mode = GetParam(); + + DestroyAndReopen(options); + + // Stop background flush to avoid deleting any WAL + env_->SetBackgroundThreads(1, Env::HIGH); + test::SleepingBackgroundTask sleeping_task; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + ASSERT_OK(Put("key1", "old_value")); + + // Create WAL hole + VectorWalPtr 
log_files; + ASSERT_OK(db_->GetSortedWalFiles(log_files)); + ASSERT_EQ(log_files.size(), 1); + ASSERT_OK(test::TruncateFile( + options.env, LogFileName(dbname_, log_files.back()->LogNumber()), + 0 /* new_length */)); + + ASSERT_OK(dbfull()->TEST_SwitchWAL()); + ASSERT_OK(Put("key1", "new_value")); + + Status s = TryReopen(options); + if (options.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { + ASSERT_OK(s); + ASSERT_EQ("NOT_FOUND", Get("key1")); + } else if (options.wal_recovery_mode == + WALRecoveryMode::kAbsoluteConsistency || + options.wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords) { + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find( + "Mismatched last sequence number recorded in the WAL") != + std::string::npos); + } else { + ASSERT_OK(s); + ASSERT_EQ("new_value", Get("key1")); + } + + Close(); +} + +class DBWALTestWithParams + : public DBWALTestBase, + public ::testing::WithParamInterface< + std::tuple> { public: DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {} }; @@ -1853,7 +1969,8 @@ INSTANTIATE_TEST_CASE_P( RecoveryTestHelper::kWALFilesCount, 1), ::testing::Values(CompressionType::kNoCompression, - CompressionType::kZSTD))); + CompressionType::kZSTD), + ::testing::Bool())); class DBWALTestWithParamsVaryingRecoveryMode : public DBWALTestBase, @@ -1891,6 +2008,7 @@ TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); const size_t row_count = RecoveryTestHelper::FillData(this, &options); // test checksum failure or parsing RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, @@ -1914,6 +2032,7 @@ TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { TEST_P(DBWALTestWithParams, kAbsoluteConsistency) { // Verify clean slate behavior Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); const size_t 
row_count = RecoveryTestHelper::FillData(this, &options); options.create_if_missing = false; ASSERT_OK(TryReopen(options)); @@ -2164,6 +2283,7 @@ TEST_P(DBWALTestWithParams, kPointInTimeRecovery) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); @@ -2221,6 +2341,7 @@ TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); diff --git a/db/dbformat.h b/db/dbformat.h index f3a9b9a1a523..02aa632d106d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -1179,4 +1179,54 @@ struct ParsedInternalKeyComparator { const InternalKeyComparator* cmp; }; +class PredecessorWALInfo { + public: + PredecessorWALInfo() + : log_number_(0), + size_bytes_(0), + last_seqno_recorded_(0), + initialized_(false) {} + + explicit PredecessorWALInfo(uint64_t log_number, uint64_t size_bytes, + SequenceNumber last_seqno_recorded) + : log_number_(log_number), + size_bytes_(size_bytes), + last_seqno_recorded_(last_seqno_recorded), + initialized_(true) {} + + uint64_t GetLogNumber() const { return log_number_; } + + uint64_t GetSizeBytes() const { return size_bytes_; } + + SequenceNumber GetLastSeqnoRecorded() const { return last_seqno_recorded_; } + + bool IsInitialized() const { return initialized_; } + + inline void EncodeTo(std::string* dst) const { + assert(dst != nullptr); + PutFixed64(dst, log_number_); + PutFixed64(dst, size_bytes_); + PutFixed64(dst, last_seqno_recorded_); + } + + inline Status DecodeFrom(Slice* src) { + if (!GetFixed64(src, &log_number_)) { + return Status::Corruption("Error decoding log number"); + } + if (!GetFixed64(src, &size_bytes_)) { + return 
Status::Corruption("Error decoding size bytes"); + } + if (!GetFixed64(src, &last_seqno_recorded_)) { + return Status::Corruption("Error decoding last seqno recorded"); + } + initialized_ = true; + return Status::OK(); + } + + private: + uint64_t log_number_; + uint64_t size_bytes_; + SequenceNumber last_seqno_recorded_; + bool initialized_; +}; } // namespace ROCKSDB_NAMESPACE diff --git a/db/log_format.h b/db/log_format.h index 9b691eeb5d7f..e213fa71921e 100644 --- a/db/log_format.h +++ b/db/log_format.h @@ -41,10 +41,14 @@ enum RecordType : uint8_t { // User-defined timestamp sizes kUserDefinedTimestampSizeType = 10, kRecyclableUserDefinedTimestampSizeType = 11, + + // For WAL verification + kPredecessorWALInfoType = 129, + kRecyclePredecessorWALInfoType = 130, }; // Unknown type of value with the 8-th bit set will be ignored constexpr uint8_t kRecordTypeSafeIgnoreMask = 1 << 7; -constexpr uint8_t kMaxRecordType = kRecyclableUserDefinedTimestampSizeType; +constexpr uint8_t kMaxRecordType = kRecyclePredecessorWALInfoType; constexpr unsigned int kBlockSize = 32768; diff --git a/db/log_reader.cc b/db/log_reader.cc index cae4fd7739d4..177e9be31226 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -24,7 +24,10 @@ Reader::Reporter::~Reporter() = default; Reader::Reader(std::shared_ptr info_log, std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num) + Reporter* reporter, bool checksum, uint64_t log_num, + bool track_and_verify_wals, bool stop_replay_for_corruption, + uint64_t min_wal_number_to_keep, + const PredecessorWALInfo& predecessor_wal_info) : info_log_(info_log), file_(std::move(_file)), reporter_(reporter), @@ -37,6 +40,10 @@ Reader::Reader(std::shared_ptr info_log, last_record_offset_(0), end_of_buffer_offset_(0), log_number_(log_num), + track_and_verify_wals_(track_and_verify_wals), + stop_replay_for_corruption_(stop_replay_for_corruption), + min_wal_number_to_keep_(min_wal_number_to_keep), + 
predecessor_wal_info_(predecessor_wal_info), recycled_(false), first_record_read_(false), compression_type_(kNoCompression), @@ -65,6 +72,9 @@ Reader::~Reader() { // // TODO krad: Evaluate if we need to move to a more strict mode where we // restrict the inconsistency to only the last log +// TODO (hx235): move `wal_recovery_mode` to be a member data like other +// information (e.g., `stop_replay_for_corruption`) to decide whether to +// check for and surface corruption in `ReadRecord()` bool Reader::ReadRecord(Slice* record, std::string* scratch, WALRecoveryMode wal_recovery_mode, uint64_t* record_checksum) { @@ -185,6 +195,23 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, } break; } + case kPredecessorWALInfoType: + case kRecyclePredecessorWALInfoType: { + prospective_record_offset = physical_record_offset; + scratch->clear(); + last_record_offset_ = prospective_record_offset; + + PredecessorWALInfo expected_predecessor_wal_info; + Status s = expected_predecessor_wal_info.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode PredecessorWALInfoType record"); + } else { + MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, + expected_predecessor_wal_info); + } + break; + } case kUserDefinedTimestampSizeType: case kRecyclableUserDefinedTimestampSizeType: { if (in_fragmented_record && !scratch->empty()) { @@ -329,6 +356,54 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, return false; } +void Reader::MaybeVerifyPredecessorWALInfo( + WALRecoveryMode wal_recovery_mode, Slice fragment, + const PredecessorWALInfo& expected_predecessor_wal_info) { + if (!track_and_verify_wals_ || + wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords || + stop_replay_for_corruption_) { + return; + } + uint64_t expected_predecessor_log_number = + expected_predecessor_wal_info.GetLogNumber(); + + // This is the first WAL recovered, thus no predecessor WAL info has been + // initialized + if 
(!predecessor_wal_info_.IsInitialized()) { + if (expected_predecessor_log_number >= min_wal_number_to_keep_) { + std::string reason = "Missing WAL of log number " + + std::to_string(expected_predecessor_log_number); + ReportCorruption(fragment.size(), reason.c_str()); + } + } else { + if (predecessor_wal_info_.GetLogNumber() != + expected_predecessor_log_number) { + std::string reason = "Missing WAL of log number " + + std::to_string(expected_predecessor_log_number); + ReportCorruption(fragment.size(), reason.c_str()); + } else if (predecessor_wal_info_.GetLastSeqnoRecorded() != + expected_predecessor_wal_info.GetLastSeqnoRecorded()) { + std::string reason = + "Mismatched last sequence number recorded in the WAL of log number " + + std::to_string(expected_predecessor_log_number) + ". Expected " + + std::to_string(expected_predecessor_wal_info.GetLastSeqnoRecorded()) + + ". Actual " + + std::to_string(predecessor_wal_info_.GetLastSeqnoRecorded()) + + ". (Last sequence number equal to 0 indicates no WAL records)"; + ReportCorruption(fragment.size(), reason.c_str()); + } else if (predecessor_wal_info_.GetSizeBytes() != + expected_predecessor_wal_info.GetSizeBytes()) { + std::string reason = + "Mismatched size of the WAL of log number " + + std::to_string(expected_predecessor_log_number) + ". Expected " + + std::to_string(expected_predecessor_wal_info.GetSizeBytes()) + + " bytes. 
Actual " + + std::to_string(predecessor_wal_info_.GetSizeBytes()) + " bytes."; + ReportCorruption(fragment.size(), reason.c_str()); + } + } +} + uint64_t Reader::LastRecordOffset() { return last_record_offset_; } uint64_t Reader::LastRecordEnd() { @@ -486,7 +561,8 @@ uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, int header_size = kHeaderSize; const bool is_recyclable_type = ((type >= kRecyclableFullType && type <= kRecyclableLastType) || - type == kRecyclableUserDefinedTimestampSizeType); + type == kRecyclableUserDefinedTimestampSizeType || + type == kRecyclePredecessorWALInfoType); if (is_recyclable_type) { header_size = kRecyclableHeaderSize; if (first_record_read_ && !recycled_) { @@ -551,6 +627,8 @@ uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, buffer_.remove_prefix(header_size + length); if (!uncompress_ || type == kSetCompressionType || + type == kPredecessorWALInfoType || + type == kRecyclePredecessorWALInfoType || type == kUserDefinedTimestampSizeType || type == kRecyclableUserDefinedTimestampSizeType) { *result = Slice(header + header_size, length); @@ -640,7 +718,9 @@ Status Reader::UpdateRecordedTimestampSize( } bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, - WALRecoveryMode /*unused*/, + WALRecoveryMode wal_recovery_mode + + , uint64_t* /* checksum */) { assert(record != nullptr); assert(scratch != nullptr); @@ -730,7 +810,24 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, } break; } + case kPredecessorWALInfoType: + case kRecyclePredecessorWALInfoType: { + fragments_.clear(); + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + PredecessorWALInfo expected_predecessor_wal_info; + Status s = expected_predecessor_wal_info.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode PredecessorWALInfoType record"); + } else { + 
MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, + expected_predecessor_wal_info); + } + break; + } case kUserDefinedTimestampSizeType: case kRecyclableUserDefinedTimestampSizeType: { if (in_fragmented_record_ && !scratch->empty()) { @@ -871,7 +968,8 @@ bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, const uint32_t length = a | (b << 8); int header_size = kHeaderSize; if ((type >= kRecyclableFullType && type <= kRecyclableLastType) || - type == kRecyclableUserDefinedTimestampSizeType) { + type == kRecyclableUserDefinedTimestampSizeType || + type == kRecyclePredecessorWALInfoType) { if (first_record_read_ && !recycled_) { // A recycled log should have started with a recycled record *fragment_type_or_err = kBadRecord; @@ -927,6 +1025,8 @@ bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, buffer_.remove_prefix(header_size + length); if (!uncompress_ || type == kSetCompressionType || + type == kPredecessorWALInfoType || + type == kRecyclePredecessorWALInfoType || type == kUserDefinedTimestampSizeType || type == kRecyclableUserDefinedTimestampSizeType) { *fragment = Slice(header + header_size, length); diff --git a/db/log_reader.h b/db/log_reader.h index a39f5b9cbb31..da4d03e4f889 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -58,9 +58,14 @@ class Reader { // live while this Reader is in use. // // If "checksum" is true, verify checksums if available. 
+ // TODO(hx235): separate WAL related parameters from general `Reader` + // parameters Reader(std::shared_ptr info_log, std::unique_ptr&& file, Reporter* reporter, - bool checksum, uint64_t log_num); + bool checksum, uint64_t log_num, bool track_and_verify_wals = false, + bool stop_replay_for_corruption = false, + uint64_t min_wal_number_to_keep = std::numeric_limits::max(), + const PredecessorWALInfo& predecessor_wal_info = PredecessorWALInfo()); // No copying allowed Reader(const Reader&) = delete; void operator=(const Reader&) = delete; @@ -148,6 +153,17 @@ class Reader { // which log number this is uint64_t const log_number_; + // See `Options::track_and_verify_wals` + bool track_and_verify_wals_; + // Below variables are used for WAL verification + // TODO(hx235): to revise `stop_replay_for_corruption_` in `LogReader` since + // we have `predecessor_wal_info_` to verify against the `PredecessorWALInfo` + // recorded in current WAL. If there is no WAL hole, we can revise + // `stop_replay_for_corruption_` to be false. 
+ bool stop_replay_for_corruption_; + uint64_t min_wal_number_to_keep_; + PredecessorWALInfo predecessor_wal_info_; + // Whether this is a recycled log file bool recycled_; @@ -211,14 +227,24 @@ class Reader { Status UpdateRecordedTimestampSize( const std::vector>& cf_to_ts_sz); + + void MaybeVerifyPredecessorWALInfo( + WALRecoveryMode wal_recovery_mode, Slice fragment, + const PredecessorWALInfo& expected_predecessor_wal_info); }; class FragmentBufferedReader : public Reader { public: - FragmentBufferedReader(std::shared_ptr info_log, - std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num) - : Reader(info_log, std::move(_file), reporter, checksum, log_num), + FragmentBufferedReader( + std::shared_ptr info_log, + std::unique_ptr&& _file, Reporter* reporter, + bool checksum, uint64_t log_num, bool verify_and_track_wals = false, + bool stop_replay_for_corruption = false, + uint64_t min_wal_number_to_keep = std::numeric_limits::max(), + const PredecessorWALInfo& predecessor_wal_info = PredecessorWALInfo()) + : Reader(info_log, std::move(_file), reporter, checksum, log_num, + verify_and_track_wals, stop_replay_for_corruption, + min_wal_number_to_keep, predecessor_wal_info), fragments_(), in_fragmented_record_(false) {} ~FragmentBufferedReader() override {} diff --git a/db/log_writer.cc b/db/log_writer.cc index f178d6281b50..ae4cb821184a 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -22,7 +22,7 @@ namespace ROCKSDB_NAMESPACE::log { Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, bool manual_flush, - CompressionType compression_type) + CompressionType compression_type, bool track_and_verify_wals) : dest_(std::move(dest)), block_offset_(0), log_number_(log_number), @@ -31,7 +31,9 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, header_size_(recycle_log_files ? 
kRecyclableHeaderSize : kHeaderSize), manual_flush_(manual_flush), compression_type_(compression_type), - compress_(nullptr) { + compress_(nullptr), + track_and_verify_wals_(track_and_verify_wals), + last_seqno_recorded_(0) { for (uint8_t i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); @@ -52,19 +54,12 @@ Writer::~Writer() { } IOStatus Writer::WriteBuffer(const WriteOptions& write_options) { - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. Skip writing buffer."); + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } IOOptions opts; - IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); + s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (!s.ok()) { return s; } @@ -92,17 +87,10 @@ bool Writer::PublishIfClosed() { } IOStatus Writer::AddRecord(const WriteOptions& write_options, - const Slice& slice) { - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. 
Skip writing buffer."); + const Slice& slice, const SequenceNumber& seqno) { + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } const char* ptr = slice.data(); size_t left = slice.size(); @@ -118,7 +106,6 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options, compress_start = true; } - IOStatus s; IOOptions opts; s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (s.ok()) { @@ -196,6 +183,10 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options, } } + if (s.ok()) { + last_seqno_recorded_ = std::max(last_seqno_recorded_, seqno); + } + return s; } @@ -208,23 +199,16 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) { return IOStatus::OK(); } - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. 
Skip writing buffer."); + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } CompressionTypeRecord record(compression_type_); std::string encode; record.EncodeTo(&encode); - IOStatus s = EmitPhysicalRecord(write_options, kSetCompressionType, - encode.data(), encode.size()); + s = EmitPhysicalRecord(write_options, kSetCompressionType, encode.data(), + encode.size()); if (s.ok()) { if (!manual_flush_) { IOOptions io_opts; @@ -251,6 +235,44 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) { return s; } +IOStatus Writer::MaybeAddPredecessorWALInfo(const WriteOptions& write_options, + const PredecessorWALInfo& info) { + IOStatus s = MaybeHandleSeenFileWriterError(); + + if (!s.ok()) { + return s; + } + + if (!track_and_verify_wals_ || !info.IsInitialized()) { + return IOStatus::OK(); + } + + std::string encode; + info.EncodeTo(&encode); + + s = MaybeSwitchToNewBlock(write_options, encode); + if (!s.ok()) { + return s; + } + + RecordType type = recycle_log_files_ ? kRecyclePredecessorWALInfoType + : kPredecessorWALInfoType; + s = EmitPhysicalRecord(write_options, type, encode.data(), encode.size()); + + if (!s.ok()) { + return s; + } + + if (!manual_flush_) { + IOOptions io_opts; + s = WritableFileWriter::PrepareIOOptions(write_options, io_opts); + if (s.ok()) { + s = dest_->Flush(io_opts); + } + } + return s; +} + IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( const WriteOptions& write_options, const UnorderedMap& cf_to_ts_sz) { @@ -275,22 +297,9 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType : kUserDefinedTimestampSizeType; - // If there's not enough space for this record, switch to a new block. 
- const int64_t leftover = kBlockSize - block_offset_; - if (leftover < header_size_ + (int)encoded.size()) { - IOOptions opts; - IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); - if (!s.ok()) { - return s; - } - - std::vector trailer(leftover, '\x00'); - s = dest_->Append(opts, Slice(trailer.data(), trailer.size())); - if (!s.ok()) { - return s; - } - - block_offset_ = 0; + IOStatus s = MaybeSwitchToNewBlock(write_options, encoded); + if (!s.ok()) { + return s; } return EmitPhysicalRecord(write_options, type, encoded.data(), @@ -313,7 +322,7 @@ IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options, uint32_t crc = type_crc_[t]; if (t < kRecyclableFullType || t == kSetCompressionType || - t == kUserDefinedTimestampSizeType) { + t == kPredecessorWALInfoType || t == kUserDefinedTimestampSizeType) { // Legacy record format assert(block_offset_ + kHeaderSize + n <= kBlockSize); header_size = kHeaderSize; @@ -352,4 +361,42 @@ IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options, return s; } +IOStatus Writer::MaybeHandleSeenFileWriterError() { + if (dest_->seen_error()) { +#ifndef NDEBUG + if (dest_->seen_injected_error()) { + std::stringstream msg; + msg << "Seen " << FaultInjectionTestFS::kInjected + << " error. Skip writing buffer."; + return IOStatus::IOError(msg.str()); + } +#endif // NDEBUG + return IOStatus::IOError("Seen error. Skip writing buffer."); + } + return IOStatus::OK(); +} + +IOStatus Writer::MaybeSwitchToNewBlock(const WriteOptions& write_options, + const std::string& content_to_write) { + IOStatus s; + const int64_t leftover = kBlockSize - block_offset_; + // If there's not enough space for this record, switch to a new block. 
+ if (leftover < header_size_ + (int)content_to_write.size()) { + IOOptions opts; + s = WritableFileWriter::PrepareIOOptions(write_options, opts); + if (!s.ok()) { + return s; + } + + std::vector trailer(leftover, '\x00'); + s = dest_->Append(opts, Slice(trailer.data(), trailer.size())); + if (!s.ok()) { + return s; + } + + block_offset_ = 0; + } + return s; +} + } // namespace ROCKSDB_NAMESPACE::log diff --git a/db/log_writer.h b/db/log_writer.h index 7cae52dd51c4..f7aef75197d5 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -13,6 +13,7 @@ #include #include +#include "db/dbformat.h" #include "db/log_format.h" #include "rocksdb/compression_type.h" #include "rocksdb/env.h" @@ -76,18 +77,24 @@ class Writer { // Create a writer that will append data to "*dest". // "*dest" must be initially empty. // "*dest" must remain live while this Writer is in use. + // TODO(hx235): separate WAL-related parameters from general `Writer` + // parameters explicit Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, bool manual_flush = false, - CompressionType compressionType = kNoCompression); + CompressionType compressionType = kNoCompression, + bool track_and_verify_wals = false); // No copying allowed Writer(const Writer&) = delete; void operator=(const Writer&) = delete; ~Writer(); - IOStatus AddRecord(const WriteOptions& write_options, const Slice& slice); + IOStatus AddRecord(const WriteOptions& write_options, const Slice& slice, + const SequenceNumber& seqno = 0); IOStatus AddCompressionTypeRecord(const WriteOptions& write_options); + IOStatus MaybeAddPredecessorWALInfo(const WriteOptions& write_options, + const PredecessorWALInfo& info); // If there are column families in `cf_to_ts_sz` not included in // `recorded_cf_to_ts_sz_` and its user-defined timestamp size is non-zero, @@ -116,6 +123,8 @@ class Writer { size_t TEST_block_offset() const { return block_offset_; } + SequenceNumber GetLastSeqnoRecorded() const { return last_seqno_recorded_; }; 
+ private: std::unique_ptr dest_; size_t block_offset_; // Current offset in block @@ -131,6 +140,11 @@ class Writer { IOStatus EmitPhysicalRecord(const WriteOptions& write_options, RecordType type, const char* ptr, size_t length); + IOStatus MaybeHandleSeenFileWriterError(); + + IOStatus MaybeSwitchToNewBlock(const WriteOptions& write_options, + const std::string& content_to_write); + // If true, it does not flush after each write. Instead it relies on the upper // layer to manually does the flush by calling ::WriteBuffer() bool manual_flush_; @@ -145,6 +159,11 @@ class Writer { // Since the user-defined timestamp size cannot be changed while the DB is // running, existing entry in this map cannot be updated. UnorderedMap recorded_cf_to_ts_sz_; + + // See `Options::track_and_verify_wals` + bool track_and_verify_wals_; + + SequenceNumber last_seqno_recorded_; }; } // namespace log diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 62c79ac0e5e8..0c1ef4e11369 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -421,6 +421,7 @@ DECLARE_int32(test_ingest_standalone_range_deletion_one_in); DECLARE_bool(allow_unprepared_value); DECLARE_string(file_temperature_age_thresholds); DECLARE_uint32(commit_bypass_memtable_one_in); +DECLARE_bool(track_and_verify_wals); constexpr long KB = 1024; constexpr int kRandomValueMaxFactor = 3; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index c49846b9f194..1859e6940fb9 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -847,6 +847,10 @@ DEFINE_bool(allow_unprepared_value, ROCKSDB_NAMESPACE::ReadOptions().allow_unprepared_value, "Allow lazy loading of values for range scans"); +DEFINE_bool(track_and_verify_wals, + ROCKSDB_NAMESPACE::Options().track_and_verify_wals, + "See Options::track_and_verify_wals"); + static bool ValidateInt32Percent(const char* flagname, int32_t value) { if 
(value < 0 || value > 100) { fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname, diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3f7b8e646216..a79aecda30ed 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -4117,6 +4117,7 @@ void InitializeOptionsFromFlags( options.level_compaction_dynamic_level_bytes = FLAGS_level_compaction_dynamic_level_bytes; options.track_and_verify_wals_in_manifest = true; + options.track_and_verify_wals = FLAGS_track_and_verify_wals; options.verify_sst_unique_id_in_manifest = FLAGS_verify_sst_unique_id_in_manifest; options.memtable_protection_bytes_per_key = diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b27f53b4a849..0a8ed2fe021a 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -632,6 +632,25 @@ struct DBOptions { // Default: false bool track_and_verify_wals_in_manifest = false; + // EXPERIMENTAL + // + // If true, various information about predecessor WAL will be recorded in the + // current WAL for verification on the predecessor WAL during WAL recovery. + // + // It verifies the following: + // 1. There exists at least some WAL in the DB + // - It's not compatible with `RepairDB()` since this option imposes a + // stricter requirement on WAL than a DB that went through `RepairDB()` + // can normally meet + // 2. There exists no WAL hole where new WAL data is present while some old + // WAL data is missing + // + // This is intended to be a better replacement for + // `track_and_verify_wals_in_manifest`. + // + // Default: false + bool track_and_verify_wals = false; + // If true, verifies the SST unique id between MANIFEST and actual file // each time an SST file is opened. This check ensures an SST file is not // overwritten or misplaced. 
A corruption error will be reported if mismatch diff --git a/options/db_options.cc b/options/db_options.cc index 967bb9b964a6..29e7632473a0 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -230,6 +230,10 @@ static std::unordered_map track_and_verify_wals_in_manifest), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"track_and_verify_wals", + {offsetof(struct ImmutableDBOptions, track_and_verify_wals), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"verify_sst_unique_id_in_manifest", {offsetof(struct ImmutableDBOptions, verify_sst_unique_id_in_manifest), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -716,6 +720,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) compaction_verify_record_count(options.compaction_verify_record_count), track_and_verify_wals_in_manifest( options.track_and_verify_wals_in_manifest), + track_and_verify_wals(options.track_and_verify_wals), verify_sst_unique_id_in_manifest( options.verify_sst_unique_id_in_manifest), env(options.env), @@ -820,6 +825,10 @@ void ImmutableDBOptions::Dump(Logger* log) const { " " "Options.track_and_verify_wals_in_manifest: %d", track_and_verify_wals_in_manifest); + ROCKS_LOG_HEADER(log, + " " + "Options.track_and_verify_wals: %d", + track_and_verify_wals); ROCKS_LOG_HEADER(log, " Options.verify_sst_unique_id_in_manifest: %d", verify_sst_unique_id_in_manifest); ROCKS_LOG_HEADER(log, " Options.env: %p", diff --git a/options/db_options.h b/options/db_options.h index ac76ea40d8eb..25318ec1a616 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -27,6 +27,7 @@ struct ImmutableDBOptions { bool flush_verify_memtable_count; bool compaction_verify_record_count; bool track_and_verify_wals_in_manifest; + bool track_and_verify_wals; bool verify_sst_unique_id_in_manifest; Env* env; std::shared_ptr rate_limiter; diff --git a/options/options_helper.cc b/options/options_helper.cc index 
f05d90f7c1c6..0e9b0199a443 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -71,6 +71,7 @@ void BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.compaction_verify_record_count; options.track_and_verify_wals_in_manifest = immutable_db_options.track_and_verify_wals_in_manifest; + options.track_and_verify_wals = immutable_db_options.track_and_verify_wals; options.verify_sst_unique_id_in_manifest = immutable_db_options.verify_sst_unique_id_in_manifest; options.env = immutable_db_options.env; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 6036d0513122..d5d92838308e 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -406,6 +406,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "flush_verify_memtable_count=true;" "compaction_verify_record_count=true;" "track_and_verify_wals_in_manifest=true;" + "track_and_verify_wals=true;" "verify_sst_unique_id_in_manifest=true;" "is_fd_close_on_exec=false;" "bytes_per_sync=4295013613;" diff --git a/options/options_test.cc b/options/options_test.cc index 2f9b12d3a1d8..ef5e62c3fed3 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -146,6 +146,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"max_total_wal_size", "33"}, @@ -329,6 +330,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.verify_sst_unique_id_in_manifest, true); ASSERT_EQ(new_db_opt.max_open_files, 32); ASSERT_EQ(new_db_opt.max_total_wal_size, static_cast(33)); @@ -901,6 
+903,7 @@ TEST_F(OptionsTest, OldInterfaceTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"daily_offpeak_time_utc", "06:30-23:30"}, @@ -916,6 +919,7 @@ TEST_F(OptionsTest, OldInterfaceTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.verify_sst_unique_id_in_manifest, true); ASSERT_EQ(new_db_opt.max_open_files, 32); db_options_map["unknown_option"] = "1"; @@ -2450,6 +2454,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"max_total_wal_size", "33"}, @@ -2638,6 +2643,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.max_open_files, 32); ASSERT_EQ(new_db_opt.max_total_wal_size, static_cast(33)); ASSERT_EQ(new_db_opt.use_fsync, true); diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 49ab2ebfd7d5..35884a7b3789 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -308,6 +308,7 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { db_opt->is_fd_close_on_exec = rnd->Uniform(2); db_opt->paranoid_checks = rnd->Uniform(2); db_opt->track_and_verify_wals_in_manifest = rnd->Uniform(2); + db_opt->track_and_verify_wals = rnd->Uniform(2); db_opt->verify_sst_unique_id_in_manifest = rnd->Uniform(2); db_opt->skip_stats_update_on_db_open = 
rnd->Uniform(2); db_opt->skip_checking_sst_file_sizes_on_db_open = rnd->Uniform(2); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f643552608f6..66aacd230988 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1808,6 +1808,8 @@ DEFINE_bool(build_info, false, DEFINE_bool(track_and_verify_wals_in_manifest, false, "If true, enable WAL tracking in the MANIFEST"); +DEFINE_bool(track_and_verify_wals, false, "See Options.track_and_verify_wals"); + namespace ROCKSDB_NAMESPACE { namespace { static Status CreateMemTableRepFactory( @@ -4721,6 +4723,7 @@ class Benchmark { options.allow_data_in_errors = FLAGS_allow_data_in_errors; options.track_and_verify_wals_in_manifest = FLAGS_track_and_verify_wals_in_manifest; + options.track_and_verify_wals = FLAGS_track_and_verify_wals; // Integrated BlobDB options.enable_blob_files = FLAGS_enable_blob_files; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index f9943a2c7649..59912dbe9daa 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -343,6 +343,7 @@ "universal_max_read_amp": lambda: random.choice([-1] * 3 + [0, 4, 10]), "paranoid_memory_checks": lambda: random.choice([0] * 7 + [1]), "allow_unprepared_value": lambda: random.choice([0, 1]), + "track_and_verify_wals": lambda: random.choice([0, 1]), } _TEST_DIR_ENV_VAR = "TEST_TMPDIR" # If TEST_TMPDIR_EXPECTED is not specified, default value will be TEST_TMPDIR diff --git a/unreleased_history/new_features/track_and_verify_wals_api.md b/unreleased_history/new_features/track_and_verify_wals_api.md new file mode 100644 index 000000000000..b0889d0f9bb9 --- /dev/null +++ b/unreleased_history/new_features/track_and_verify_wals_api.md @@ -0,0 +1 @@ +Provide a new option `track_and_verify_wals` to track and verify various information about WAL during WAL recovery. This is intended to be a better replacement for `track_and_verify_wals_in_manifest`.