diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 16a78ce89d7d..3afd4d441805 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1203,40 +1203,41 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, Status::Corruption("log record too small")); continue; } - // We create a new batch and initialize with a valid prot_info_ to store // the data checksums - std::unique_ptr batch(new WriteBatch()); + WriteBatch batch; + std::unique_ptr new_batch; - status = WriteBatchInternal::SetContents(batch.get(), record); + status = WriteBatchInternal::SetContents(&batch, record); if (!status.ok()) { return status; } const UnorderedMap& record_ts_sz = reader.GetRecordedTimestampSize(); - bool batch_updated = false; status = HandleWriteBatchTimestampSizeDifference( - batch.get(), running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &batch, - &batch_updated); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch); if (!status.ok()) { return status; } + + bool batch_updated = new_batch != nullptr; + WriteBatch* batch_to_use = batch_updated ? new_batch.get() : &batch; TEST_SYNC_POINT_CALLBACK( "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", - batch.get()); + batch_to_use); TEST_SYNC_POINT_CALLBACK( "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum", &record_checksum); status = WriteBatchInternal::UpdateProtectionInfo( - batch.get(), 8 /* bytes_per_key */, + batch_to_use, 8 /* bytes_per_key */, batch_updated ? nullptr : &record_checksum); if (!status.ok()) { return status; } - SequenceNumber sequence = WriteBatchInternal::Sequence(batch.get()); + SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use); if (immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { @@ -1257,7 +1258,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // and returns true. if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter, status, stop_replay_by_wal_filter, - *batch)) { + *batch_to_use)) { continue; } @@ -1268,7 +1269,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // That's why we set ignore missing column families to true bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( - batch.get(), column_family_memtables_.get(), &flush_scheduler_, + batch_to_use, column_family_memtables_.get(), &flush_scheduler_, &trim_history_scheduler_, true, wal_number, this, false /* concurrent_memtable_writes */, next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); @@ -2232,4 +2233,4 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } return s; } -} // namespace ROCKSDB_NAMESPACE +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 59ffbabf41cd..104ffd2a5c57 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -311,6 +311,229 @@ TEST_F(DBWALTest, Recover) { } while (ChangeWalOptions()); } +class DBWALTestWithTimestamp + : public DBBasicTestWithTimestampBase, + public testing::WithParamInterface { + public: + DBWALTestWithTimestamp() + : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {} + + Status CreateAndReopenWithTs(const std::vector& cfs, + const Options& ts_options, bool persist_udt, + bool avoid_flush_during_recovery = false) { + Options default_options = CurrentOptions(); + default_options.allow_concurrent_memtable_write = + persist_udt ? true : false; + DestroyAndReopen(default_options); + CreateColumnFamilies(cfs, ts_options); + return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt, + avoid_flush_during_recovery); + } + + Status ReopenColumnFamiliesWithTs(const std::vector& cfs, + Options ts_options, bool persist_udt, + bool avoid_flush_during_recovery = false) { + Options default_options = CurrentOptions(); + default_options.create_if_missing = false; + default_options.allow_concurrent_memtable_write = + persist_udt ? true : false; + default_options.avoid_flush_during_recovery = avoid_flush_during_recovery; + ts_options.create_if_missing = false; + + std::vector cf_options(cfs.size(), ts_options); + std::vector cfs_plus_default = cfs; + cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName); + cf_options.insert(cf_options.begin(), default_options); + Close(); + return TryReopenWithColumnFamilies(cfs_plus_default, cf_options); + } + + Status Put(uint32_t cf, const Slice& key, const Slice& ts, + const Slice& value) { + WriteOptions write_opts; + return db_->Put(write_opts, handles_[cf], key, ts, value); + } + + void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key, + const std::string& expected_value, + const std::string& expected_ts) { + std::string actual_value; + std::string actual_ts; + ASSERT_OK( + db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts)); + ASSERT_EQ(expected_value, actual_value); + ASSERT_EQ(expected_ts, actual_ts); + } +}; + +TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { + // Set up the option that enables user defined timestmp size. + std::string ts1; + PutFixed64(&ts1, 1); + Options ts_options; + ts_options.create_if_missing = true; + ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + // Test that user-defined timestamps are recovered from WAL regardless of + // the value of this flag because UDTs are saved in WAL nonetheless. + // We however need to explicitly disable flush during recovery by setting + // `avoid_flush_during_recovery=true` so that we can avoid timestamps getting + // stripped when the `persist_user_defined_timestamps` flag is false, so that + // all written timestamps are available for testing user-defined time travel + // read. + bool persist_udt = test::ShouldPersistUDT(GetParam()); + ts_options.persist_user_defined_timestamps = persist_udt; + bool avoid_flush_during_recovery = true; + + ReadOptions read_opts; + do { + Slice ts_slice = ts1; + read_opts.timestamp = &ts_slice; + ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt, + avoid_flush_during_recovery)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); + ASSERT_OK(Put(1, "foo", ts1, "v1")); + ASSERT_OK(Put(1, "baz", ts1, "v5")); + + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt, + avoid_flush_during_recovery)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); + // Do a timestamped read with ts1 after second reopen. + CheckGet(read_opts, 1, "foo", "v1", ts1); + CheckGet(read_opts, 1, "baz", "v5", ts1); + + // Write more value versions for key "foo" and "bar" before and after second + // reopen. + std::string ts2; + PutFixed64(&ts2, 2); + ASSERT_OK(Put(1, "bar", ts2, "v2")); + ASSERT_OK(Put(1, "foo", ts2, "v3")); + + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt, + avoid_flush_during_recovery)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); + std::string ts3; + PutFixed64(&ts3, 3); + ASSERT_OK(Put(1, "foo", ts3, "v4")); + + // All the key value pairs available for read: + // "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")] + // "bar" -> [(ts2, "v2")] + // "baz" -> [(ts1, "v5")] + // Do a timestamped read with ts1 after third reopen. + // read_opts.timestamp is set to ts1 for below reads + CheckGet(read_opts, 1, "foo", "v1", ts1); + std::string value; + ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound()); + CheckGet(read_opts, 1, "baz", "v5", ts1); + + // Do a timestamped read with ts2 after third reopen. + ts_slice = ts2; + // read_opts.timestamp is set to ts2 for below reads. + CheckGet(read_opts, 1, "foo", "v3", ts2); + CheckGet(read_opts, 1, "bar", "v2", ts2); + CheckGet(read_opts, 1, "baz", "v5", ts1); + + // Do a timestamped read with ts3 after third reopen. + ts_slice = ts3; + // read_opts.timestamp is set to ts3 for below reads. + CheckGet(read_opts, 1, "foo", "v4", ts3); + CheckGet(read_opts, 1, "bar", "v2", ts2); + CheckGet(read_opts, 1, "baz", "v5", ts1); + } while (ChangeWalOptions()); +} + +TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { + // Set up the option that enables user defined timestamp size. + std::string min_ts; + std::string write_ts; + PutFixed64(&min_ts, 0); + PutFixed64(&write_ts, 1); + Options ts_options; + ts_options.create_if_missing = true; + ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + bool persist_udt = test::ShouldPersistUDT(GetParam()); + ts_options.persist_user_defined_timestamps = persist_udt; + + std::string smallest_ukey_without_ts = "baz"; + std::string largest_ukey_without_ts = "foo"; + + ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt)); + // No flush, no sst files, because of no data. + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); + ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1")); + ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5")); + + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt)); + // Memtable recovered from WAL flushed because `avoid_flush_during_recovery` + // defaults to false, created one L0 file. + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U); + + std::vector> level_to_files; + dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files); + ASSERT_GT(level_to_files.size(), 1); + // L0 only has one SST file. + ASSERT_EQ(level_to_files[0].size(), 1); + auto meta = level_to_files[0][0]; + if (persist_udt) { + ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key()); + ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key()); + } else { + ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key()); + ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key()); + } +} + +// Param 0: test mode for the user-defined timestamp feature +INSTANTIATE_TEST_CASE_P( + P, DBWALTestWithTimestamp, + ::testing::Values( + test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp, + test::UserDefinedTimestampTestMode::kNormal)); + +TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) { + Options options; + options.create_if_missing = true; + options.comparator = BytewiseComparator(); + bool avoid_flush_during_recovery = true; + ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */, + avoid_flush_during_recovery)); + + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1")); + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5")); + + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + options.persist_user_defined_timestamps = false; + // Test handle timestamp size inconsistency in WAL when enabling user-defined + // timestamps. + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options, + false /* persist_udt */, + avoid_flush_during_recovery)); + + std::string ts; + PutFixed64(&ts, 0); + Slice ts_slice = ts; + ReadOptions read_opts; + read_opts.timestamp = &ts_slice; + // Pre-existing entries are treated as if they have the min timestamp. + CheckGet(read_opts, 1, "foo", "v1", ts); + CheckGet(read_opts, 1, "baz", "v5", ts); + ts.clear(); + PutFixed64(&ts, 1); + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2")); + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6")); + CheckGet(read_opts, 1, "foo", "v2", ts); + CheckGet(read_opts, 1, "baz", "v6", ts); + + options.comparator = BytewiseComparator(); + // Open the column family again with the UDT feature disabled. Test handle + // timestamp size inconsistency in WAL when disabling user-defined timestamps + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options, + true /* persist_udt */, + avoid_flush_during_recovery)); + ASSERT_EQ("v2", Get(1, "foo")); + ASSERT_EQ("v6", Get(1, "baz")); +} + TEST_F(DBWALTest, RecoverWithTableHandle) { do { Options options = CurrentOptions(); @@ -2440,4 +2663,4 @@ int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} +} \ No newline at end of file diff --git a/util/udt_util.cc b/util/udt_util.cc index 77dde74775d6..5f2b7117cb34 100644 --- a/util/udt_util.cc +++ b/util/udt_util.cc @@ -108,32 +108,27 @@ enum class ToggleUDT { kInvalidChange, }; -bool IsPrefix(const char* str, const char* prefix) { - return strncmp(str, prefix, strlen(prefix)) == 0; -} - ToggleUDT CompareComparator(const Comparator* new_comparator, const std::string& old_comparator_name) { static const char* kUDTSuffix = ".u64ts"; + static const Slice kSuffixSlice = kUDTSuffix; static const size_t kSuffixSize = 6; size_t ts_sz = new_comparator->timestamp_size(); (void)ts_sz; - const char* new_comparator_name = new_comparator->Name(); - size_t new_name_size = strlen(new_comparator_name); - size_t old_name_size = old_comparator_name.size(); - if (new_name_size == old_name_size && - IsPrefix(new_comparator_name, old_comparator_name.data())) { + Slice new_ucmp_name(new_comparator->Name()); + Slice old_ucmp_name(old_comparator_name); + if (new_ucmp_name.compare(old_ucmp_name) == 0) { return ToggleUDT::kUnchanged; } - if (new_name_size == old_name_size + kSuffixSize && - IsPrefix(new_comparator_name, old_comparator_name.data()) && - IsPrefix(new_comparator_name + old_name_size, kUDTSuffix)) { + if (new_ucmp_name.size() == old_ucmp_name.size() + kSuffixSize && + new_ucmp_name.starts_with(old_ucmp_name) && + new_ucmp_name.ends_with(kSuffixSlice)) { assert(ts_sz == 8); return ToggleUDT::kEnableUDT; } - if (new_name_size + kSuffixSize == old_name_size && - IsPrefix(old_comparator_name.data(), new_comparator_name) && - IsPrefix(old_comparator_name.data() + new_name_size, kUDTSuffix)) { + if (old_ucmp_name.size() == new_ucmp_name.size() + kSuffixSize && + old_ucmp_name.starts_with(new_ucmp_name) && + old_ucmp_name.ends_with(kSuffixSlice)) { assert(ts_sz == 0); return ToggleUDT::kDisableUDT; } @@ -276,7 +271,7 @@ Status HandleWriteBatchTimestampSizeDifference( const UnorderedMap& running_ts_sz, const UnorderedMap& record_ts_sz, TimestampSizeConsistencyMode check_mode, - std::unique_ptr* new_batch, bool* batch_updated) { + std::unique_ptr* new_batch) { // Quick path to bypass checking the WriteBatch. if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) { return Status::OK(); @@ -288,7 +283,6 @@ Status HandleWriteBatchTimestampSizeDifference( return status; } else if (need_recovery) { assert(new_batch); - assert(batch_updated); SequenceNumber sequence = WriteBatchInternal::Sequence(batch); TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz); status = batch->Iterate(&recovery_handler); @@ -297,7 +291,6 @@ Status HandleWriteBatchTimestampSizeDifference( } else { *new_batch = recovery_handler.TransferNewBatch(); WriteBatchInternal::SetSequence(new_batch->get(), sequence); - *batch_updated = true; } } return Status::OK(); @@ -347,4 +340,4 @@ Status ValidateUserDefinedTimestampsOptions( return Status::InvalidArgument( "Unsupported user defined timestamps settings change."); } -} // namespace ROCKSDB_NAMESPACE +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/util/udt_util.h b/util/udt_util.h index d2e8cfb63e34..555c11e0d37d 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -214,8 +214,7 @@ Status HandleWriteBatchTimestampSizeDifference( const UnorderedMap& running_ts_sz, const UnorderedMap& record_ts_sz, TimestampSizeConsistencyMode check_mode, - std::unique_ptr* new_batch = nullptr, - bool* batch_updated = nullptr); + std::unique_ptr* new_batch = nullptr); // This util function is used when opening an existing column family and // processing its VersionEdit. It does a sanity check for the column family's @@ -247,4 +246,4 @@ Status ValidateUserDefinedTimestampsOptions( const Comparator* new_comparator, const std::string& old_comparator_name, bool new_persist_udt, bool old_persist_udt, bool* mark_sst_files_has_no_udt); -} // namespace ROCKSDB_NAMESPACE +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc index 1f60891c2b65..346e538dc13e 100644 --- a/util/udt_util_test.cc +++ b/util/udt_util_test.cc @@ -214,13 +214,10 @@ TEST_F(HandleTimestampSizeDifferenceTest, &batch, running_ts_sz, record_ts_sz, TimestampSizeConsistencyMode::kVerifyConsistency)); std::unique_ptr new_batch(nullptr); - bool batch_updated = false; ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch, - &batch_updated)); + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); - ASSERT_FALSE(batch_updated); } TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) { @@ -235,13 +232,10 @@ TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) { &batch, running_ts_sz, record_ts_sz, TimestampSizeConsistencyMode::kVerifyConsistency)); std::unique_ptr new_batch(nullptr); - bool batch_updated = false; ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch, - &batch_updated)); + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); - ASSERT_FALSE(batch_updated); } TEST_F(HandleTimestampSizeDifferenceTest, @@ -259,13 +253,10 @@ TEST_F(HandleTimestampSizeDifferenceTest, .IsInvalidArgument()); std::unique_ptr new_batch(nullptr); - bool batch_updated = false; ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch, - &batch_updated)); + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); - ASSERT_TRUE(batch_updated); CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), std::nullopt /* dropped_cf */); } @@ -287,13 +278,10 @@ TEST_F(HandleTimestampSizeDifferenceTest, .IsInvalidArgument()); std::unique_ptr new_batch(nullptr); - bool batch_updated = false; ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch, - &batch_updated)); + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); - ASSERT_TRUE(batch_updated); CheckContentsWithTimestampPadding(batch, *new_batch, sizeof(uint64_t)); } @@ -305,17 +293,14 @@ TEST_F(HandleTimestampSizeDifferenceTest, WriteBatch batch; CreateWriteBatch(record_ts_sz, &batch); std::unique_ptr new_batch(nullptr); - bool batch_updated = false; // kReconcileInconsistency tolerate inconsistency for dropped column family // and all related entries copied over to the new WriteBatch. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch, - &batch_updated)); + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); - ASSERT_TRUE(batch_updated); CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), std::optional(2)); } @@ -458,4 +443,4 @@ int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} +} \ No newline at end of file