diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 1e47e6dd2c6..4433c310ead 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3191,6 +3191,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, edit.SetColumnFamily(new_id); edit.SetLogNumber(logfile_number_); edit.SetComparatorName(cf_options.comparator->Name()); + edit.SetPersistUserDefinedTimestamps( + cf_options.persist_user_defined_timestamps); // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 988283381f4..3d41520c869 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1203,10 +1203,10 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, Status::Corruption("log record too small")); continue; } - // We create a new batch and initialize with a valid prot_info_ to store // the data checksums WriteBatch batch; + std::unique_ptr new_batch; status = WriteBatchInternal::SetContents(&batch, record); if (!status.ok()) { @@ -1215,26 +1215,29 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, const UnorderedMap& record_ts_sz = reader.GetRecordedTimestampSize(); - // TODO(yuzhangyu): update mode to kReconcileInconsistency when user - // comparator can be changed. status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency); + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch); if (!status.ok()) { return status; } + + bool batch_updated = new_batch != nullptr; + WriteBatch* batch_to_use = batch_updated ? new_batch.get() : &batch; TEST_SYNC_POINT_CALLBACK( - "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch); + "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", + batch_to_use); TEST_SYNC_POINT_CALLBACK( "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum", &record_checksum); status = WriteBatchInternal::UpdateProtectionInfo( - &batch, 8 /* bytes_per_key */, &record_checksum); + batch_to_use, 8 /* bytes_per_key */, + batch_updated ? nullptr : &record_checksum); if (!status.ok()) { return status; } - SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); + SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use); if (immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { @@ -1255,7 +1258,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // and returns true. if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter, status, stop_replay_by_wal_filter, - batch)) { + *batch_to_use)) { continue; } @@ -1266,7 +1269,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // That's why we set ignore missing column families to true bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), &flush_scheduler_, + batch_to_use, column_family_memtables_.get(), &flush_scheduler_, &trim_history_scheduler_, true, wal_number, this, false /* concurrent_memtable_writes */, next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 915ebe0b0a6..72b6f7c7b82 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -318,30 +318,25 @@ class DBWALTestWithTimestamp DBWALTestWithTimestamp() : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {} - void SetUp() override { - persist_udt_ = test::ShouldPersistUDT(GetParam()); - DBBasicTestWithTimestampBase::SetUp(); - } - - Status CreateAndReopenWithCFWithTs(const std::vector& cfs, - Options& ts_options, - bool avoid_flush_during_recovery = false) { + Status CreateAndReopenWithTs(const std::vector& cfs, + const Options& ts_options, bool persist_udt, + bool avoid_flush_during_recovery = false) { Options default_options = CurrentOptions(); default_options.allow_concurrent_memtable_write = - persist_udt_ ? true : false; + persist_udt ? true : false; DestroyAndReopen(default_options); CreateColumnFamilies(cfs, ts_options); - return ReopenColumnFamiliesWithTs(cfs, ts_options, + return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt, avoid_flush_during_recovery); } Status ReopenColumnFamiliesWithTs(const std::vector& cfs, - Options ts_options, + Options ts_options, bool persist_udt, bool avoid_flush_during_recovery = false) { Options default_options = CurrentOptions(); default_options.create_if_missing = false; default_options.allow_concurrent_memtable_write = - persist_udt_ ? true : false; + persist_udt ? true : false; default_options.avoid_flush_during_recovery = avoid_flush_during_recovery; ts_options.create_if_missing = false; @@ -369,9 +364,6 @@ class DBWALTestWithTimestamp ASSERT_EQ(expected_value, actual_value); ASSERT_EQ(expected_ts, actual_ts); } - - protected: - bool persist_udt_; }; TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { @@ -388,20 +380,21 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { // stripped when the `persist_user_defined_timestamps` flag is false, so that // all written timestamps are available for testing user-defined time travel // read. - ts_options.persist_user_defined_timestamps = persist_udt_; + bool persist_udt = test::ShouldPersistUDT(GetParam()); + ts_options.persist_user_defined_timestamps = persist_udt; bool avoid_flush_during_recovery = true; ReadOptions read_opts; do { Slice ts_slice = ts1; read_opts.timestamp = &ts_slice; - ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options, - avoid_flush_during_recovery)); + ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt, + avoid_flush_during_recovery)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); ASSERT_OK(Put(1, "foo", ts1, "v1")); ASSERT_OK(Put(1, "baz", ts1, "v5")); - ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt, avoid_flush_during_recovery)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); // Do a timestamped read with ts1 after second reopen. @@ -415,14 +408,19 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { ASSERT_OK(Put(1, "bar", ts2, "v2")); ASSERT_OK(Put(1, "foo", ts2, "v3")); - ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt, avoid_flush_during_recovery)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); std::string ts3; PutFixed64(&ts3, 3); ASSERT_OK(Put(1, "foo", ts3, "v4")); + // All the key value pairs available for read: + // "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")] + // "bar" -> [(ts2, "v2")] + // "baz" -> [(ts1, "v5")] // Do a timestamped read with ts1 after third reopen. + // read_opts.timestamp is set to ts1 for below reads CheckGet(read_opts, 1, "foo", "v1", ts1); std::string value; ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound()); @@ -430,60 +428,20 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { // Do a timestamped read with ts2 after third reopen. ts_slice = ts2; + // read_opts.timestamp is set to ts2 for below reads. CheckGet(read_opts, 1, "foo", "v3", ts2); CheckGet(read_opts, 1, "bar", "v2", ts2); CheckGet(read_opts, 1, "baz", "v5", ts1); // Do a timestamped read with ts3 after third reopen. ts_slice = ts3; + // read_opts.timestamp is set to ts3 for below reads. CheckGet(read_opts, 1, "foo", "v4", ts3); CheckGet(read_opts, 1, "bar", "v2", ts2); CheckGet(read_opts, 1, "baz", "v5", ts1); } while (ChangeWalOptions()); } -class TestTsSzComparator : public Comparator { - public: - explicit TestTsSzComparator(size_t ts_sz) : Comparator(ts_sz) {} - - int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/, - const ROCKSDB_NAMESPACE::Slice& /*b*/) const override { - return 0; - } - const char* Name() const override { return "TestTsSzComparator.u64ts"; } - void FindShortestSeparator( - std::string* /*start*/, - const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {} - void FindShortSuccessor(std::string* /*key*/) const override {} -}; - -TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) { - // Set up the option that enables user defined timestmp size. - std::string ts; - PutFixed16(&ts, 1); - TestTsSzComparator test_cmp(2); - Options ts_options; - ts_options.create_if_missing = true; - ts_options.comparator = &test_cmp; - ts_options.persist_user_defined_timestamps = persist_udt_; - - ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options)); - ASSERT_OK(Put(1, "foo", ts, "v1")); - ASSERT_OK(Put(1, "baz", ts, "v5")); - - // In real use cases, switching to a different user comparator is prohibited - // by a sanity check during DB open that does a user comparator name - // comparison. This test mocked and bypassed that sanity check because the - // before and after user comparator are both named "TestTsSzComparator.u64ts". - // This is to test the user-defined timestamp recovery logic for WAL files - // have the intended consistency check. - // `HandleWriteBatchTimestampSizeDifference` in udt_util.h has more details. - TestTsSzComparator diff_test_cmp(3); - ts_options.comparator = &diff_test_cmp; - ASSERT_TRUE( - ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument()); -} - TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { // Set up the option that enables user defined timestamp size. std::string min_ts; @@ -493,18 +451,19 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { Options ts_options; ts_options.create_if_missing = true; ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); - ts_options.persist_user_defined_timestamps = persist_udt_; + bool persist_udt = test::ShouldPersistUDT(GetParam()); + ts_options.persist_user_defined_timestamps = persist_udt; std::string smallest_ukey_without_ts = "baz"; std::string largest_ukey_without_ts = "foo"; - ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options)); + ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt)); // No flush, no sst files, because of no data. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1")); ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5")); - ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options)); + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt)); // Memtable recovered from WAL flushed because `avoid_flush_during_recovery` // defaults to false, created one L0 file. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U); @@ -515,7 +474,7 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { // L0 only has one SST file. ASSERT_EQ(level_to_files[0].size(), 1); auto meta = level_to_files[0][0]; - if (persist_udt_) { + if (persist_udt) { ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key()); ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key()); } else { @@ -526,11 +485,55 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { // Param 0: test mode for the user-defined timestamp feature INSTANTIATE_TEST_CASE_P( - DBWALTestWithTimestamp, DBWALTestWithTimestamp, + P, DBWALTestWithTimestamp, ::testing::Values( test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp, test::UserDefinedTimestampTestMode::kNormal)); +TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) { + Options options; + options.create_if_missing = true; + options.comparator = BytewiseComparator(); + bool avoid_flush_during_recovery = true; + ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */, + avoid_flush_during_recovery)); + + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1")); + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5")); + + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + options.persist_user_defined_timestamps = false; + // Test handle timestamp size inconsistency in WAL when enabling user-defined + // timestamps. + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options, + false /* persist_udt */, + avoid_flush_during_recovery)); + + std::string ts; + PutFixed64(&ts, 0); + Slice ts_slice = ts; + ReadOptions read_opts; + read_opts.timestamp = &ts_slice; + // Pre-existing entries are treated as if they have the min timestamp. + CheckGet(read_opts, 1, "foo", "v1", ts); + CheckGet(read_opts, 1, "baz", "v5", ts); + ts.clear(); + PutFixed64(&ts, 1); + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2")); + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6")); + CheckGet(read_opts, 1, "foo", "v2", ts); + CheckGet(read_opts, 1, "baz", "v6", ts); + + options.comparator = BytewiseComparator(); + // Open the column family again with the UDT feature disabled. Test handle + // timestamp size inconsistency in WAL when disabling user-defined timestamps + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options, + true /* persist_udt */, + avoid_flush_during_recovery)); + ASSERT_EQ("v2", Get(1, "foo")); + ASSERT_EQ("v6", Get(1, "baz")); +} + TEST_F(DBWALTest, RecoverWithTableHandle) { do { Options options = CurrentOptions(); diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 9b8ca31d3d1..202c4c345cf 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -3346,6 +3346,69 @@ INSTANTIATE_TEST_CASE_P( test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp, test::UserDefinedTimestampTestMode::kNormal)); +TEST_F(DBBasicTestWithTimestamp, EnableDisableUDT) { + Options options = CurrentOptions(); + options.env = env_; + // Create a column family without user-defined timestamps. + options.comparator = BytewiseComparator(); + options.persist_user_defined_timestamps = true; + DestroyAndReopen(options); + + // Create one SST file, its user keys have no user-defined timestamps. + ASSERT_OK(db_->Put(WriteOptions(), "foo", "val1")); + ASSERT_OK(Flush(0)); + Close(); + + // Reopen the existing column family and enable user-defined timestamps + // feature for it. + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + options.persist_user_defined_timestamps = false; + options.allow_concurrent_memtable_write = false; + Reopen(options); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "foo", &value).IsInvalidArgument()); + std::string read_ts; + PutFixed64(&read_ts, 0); + ReadOptions ropts; + Slice read_ts_slice = read_ts; + ropts.timestamp = &read_ts_slice; + std::string key_ts; + // Entries in pre-existing SST files are treated as if they have minimum + // user-defined timestamps. + ASSERT_OK(db_->Get(ropts, "foo", &value, &key_ts)); + ASSERT_EQ("val1", value); + ASSERT_EQ(read_ts, key_ts); + + // Do timestamped read / write. + std::string write_ts; + PutFixed64(&write_ts, 1); + ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val2")); + read_ts.clear(); + PutFixed64(&read_ts, 1); + ASSERT_OK(db_->Get(ropts, "foo", &value, &key_ts)); + ASSERT_EQ("val2", value); + ASSERT_EQ(write_ts, key_ts); + // The user keys in this SST file don't have user-defined timestamps either, + // because `persist_user_defined_timestamps` flag is set to false. + ASSERT_OK(Flush(0)); + Close(); + + // Reopen the existing column family while disabling user-defined timestamps. + options.comparator = BytewiseComparator(); + Reopen(options); + + ASSERT_TRUE(db_->Get(ropts, "foo", &value).IsInvalidArgument()); + ASSERT_OK(db_->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ("val2", value); + + // Continue to write / read the column family without user-defined timestamps. + ASSERT_OK(db_->Put(WriteOptions(), "foo", "val3")); + ASSERT_OK(db_->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ("val3", value); + Close(); +} + TEST_F(DBBasicTestWithTimestamp, GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) { Options options = CurrentOptions(); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 2e6c4d426a8..9fd9c13faf8 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -70,6 +70,7 @@ class FlushJobTestBase : public testing::Test { new_cf.AddColumnFamily(column_family_names_[i]); new_cf.SetColumnFamily(cf_id++); new_cf.SetComparatorName(ucmp_->Name()); + new_cf.SetPersistUserDefinedTimestamps(persist_udt_); new_cf.SetLogNumber(0); new_cf.SetNextFile(2); new_cf.SetLastSequence(last_seq++); diff --git a/db/repair.cc b/db/repair.cc index 58ada3aeb56..4b28ec2cd64 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -157,6 +157,7 @@ class Repairer { VersionEdit edit; edit.SetComparatorName(opts.comparator->Name()); + edit.SetPersistUserDefinedTimestamps(opts.persist_user_defined_timestamps); edit.SetLogNumber(0); edit.SetColumnFamily(cf_id); ColumnFamilyData* cfd; @@ -720,6 +721,8 @@ class Repairer { // recovered epoch numbers VersionEdit edit; edit.SetComparatorName(cfd->user_comparator()->Name()); + edit.SetPersistUserDefinedTimestamps( + cfd->ioptions()->persist_user_defined_timestamps); edit.SetLogNumber(0); edit.SetNextFile(next_file_number_); edit.SetColumnFamily(cfd->GetID()); diff --git a/db/version_edit.cc b/db/version_edit.cc index f5783eacd21..6459c2ff863 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -100,6 +100,7 @@ bool VersionEdit::EncodeTo(std::string* dst, PutLengthPrefixedSlice(dst, db_id_); } if (has_comparator_) { + assert(has_persist_user_defined_timestamps_); PutVarint32(dst, kComparator); PutLengthPrefixedSlice(dst, comparator_); } @@ -308,6 +309,15 @@ bool VersionEdit::EncodeTo(std::string* dst, PutVarint32(dst, kFullHistoryTsLow); PutLengthPrefixedSlice(dst, full_history_ts_low_); } + + if (HasPersistUserDefinedTimestamps()) { + // persist_user_defined_timestamps flag should be logged in the same + // VersionEdit as the user comparator name. + assert(has_comparator_); + PutVarint32(dst, kPersistUserDefinedTimestamps); + char p = static_cast(persist_user_defined_timestamps_); + PutLengthPrefixedSlice(dst, Slice(&p, 1)); + } return true; } @@ -777,6 +787,17 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kPersistUserDefinedTimestamps: + if (!GetLengthPrefixedSlice(&input, &str)) { + msg = "persist_user_defined_timestamps"; + } else if (str.size() != 1) { + msg = "persist_user_defined_timestamps field wrong size"; + } else { + persist_user_defined_timestamps_ = (str[0] == 1); + has_persist_user_defined_timestamps_ = true; + } + break; + default: if (tag & kTagSafeIgnoreMask) { // Tag from future which can be safely ignored. @@ -819,6 +840,10 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append("\n Comparator: "); r.append(comparator_); } + if (has_persist_user_defined_timestamps_) { + r.append("\n PersistUserDefinedTimestamps: "); + r.append(persist_user_defined_timestamps_ ? "true" : "false"); + } if (has_log_number_) { r.append("\n LogNumber: "); AppendNumberTo(&r, log_number_); diff --git a/db/version_edit.h b/db/version_edit.h index cedccb3a230..a13d8e65f4b 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -71,6 +71,7 @@ enum Tag : uint32_t { kFullHistoryTsLow, kWalAddition2, kWalDeletion2, + kPersistUserDefinedTimestamps, }; enum NewFileCustomTag : uint32_t { @@ -397,6 +398,17 @@ class VersionEdit { bool HasComparatorName() const { return has_comparator_; } const std::string& GetComparatorName() const { return comparator_; } + void SetPersistUserDefinedTimestamps(bool persist_user_defined_timestamps) { + has_persist_user_defined_timestamps_ = true; + persist_user_defined_timestamps_ = persist_user_defined_timestamps; + } + bool HasPersistUserDefinedTimestamps() const { + return has_persist_user_defined_timestamps_; + } + bool GetPersistUserDefinedTimestamps() const { + return persist_user_defined_timestamps_; + } + void SetLogNumber(uint64_t num) { has_log_number_ = true; log_number_ = num; @@ -697,6 +709,7 @@ class VersionEdit { bool has_max_column_family_ = false; bool has_min_log_number_to_keep_ = false; bool has_last_sequence_ = false; + bool has_persist_user_defined_timestamps_ = false; // Compaction cursors for round-robin compaction policy CompactCursors compact_cursors_; @@ -724,6 +737,7 @@ class VersionEdit { uint32_t remaining_entries_ = 0; std::string full_history_ts_low_; + bool persist_user_defined_timestamps_ = true; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 732723996a6..7f8e303908b 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -17,6 +17,7 @@ #include "db/version_edit.h" #include "logging/logging.h" #include "monitoring/persistent_stats_history.h" +#include "util/udt_util.h" namespace ROCKSDB_NAMESPACE { @@ -613,15 +614,21 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, version_edit_params_.SetLogNumber(edit.log_number_); } } - if (edit.has_comparator_ && - edit.comparator_ != cfd->user_comparator()->Name()) { - if (!cf_to_cmp_names_) { - s = Status::InvalidArgument( - cfd->user_comparator()->Name(), - "does not match existing comparator " + edit.comparator_); - } else { + if (edit.has_comparator_) { + bool mark_sst_files_has_no_udt = false; + // If `persist_user_defined_timestamps` flag is recorded in manifest, it + // is guaranteed to be in the same VersionEdit as comparator. Otherwise, + // it's not recorded and it should have default value true. + s = ValidateUserDefinedTimestampsOptions( + cfd->user_comparator(), edit.comparator_, + cfd->ioptions()->persist_user_defined_timestamps, + edit.persist_user_defined_timestamps_, &mark_sst_files_has_no_udt); + if (!s.ok() && cf_to_cmp_names_) { cf_to_cmp_names_->emplace(cfd->GetID(), edit.comparator_); } + if (mark_sst_files_has_no_udt) { + cfds_to_mark_no_udt_.insert(cfd->GetID()); + } } if (edit.HasFullHistoryTsLow()) { const std::string& new_ts = edit.GetFullHistoryTsLow(); @@ -673,10 +680,17 @@ Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles( VersionEdit::NewFiles& new_files = edit.GetMutableNewFiles(); assert(!new_files.empty()); + // If true, enabling user-defined timestamp is detected for this column + // family. All its existing SST files need to have the file boundaries handled + // and their `persist_user_defined_timestamps` flag set to false regardless of + // its existing value. + bool mark_existing_ssts_with_no_udt = + cfds_to_mark_no_udt_.find(cfd->GetID()) != cfds_to_mark_no_udt_.end(); bool file_boundaries_need_handling = false; for (auto& new_file : new_files) { FileMetaData& meta = new_file.second; - if (meta.user_defined_timestamps_persisted) { + if (meta.user_defined_timestamps_persisted && + !mark_existing_ssts_with_no_udt) { // `FileMetaData.user_defined_timestamps_persisted` field is the value of // the flag `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` // at the time when the SST file was created. As a result, all added SST @@ -689,6 +703,11 @@ Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles( break; } file_boundaries_need_handling = true; + assert(!meta.user_defined_timestamps_persisted || + mark_existing_ssts_with_no_udt); + if (mark_existing_ssts_with_no_udt) { + meta.user_defined_timestamps_persisted = false; + } std::string smallest_buf; std::string largest_buf; PadInternalKeyWithMinTimestamp(&smallest_buf, meta.smallest.Encode(), diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 54454cf70fa..dd55a4de91d 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -202,6 +202,7 @@ class VersionEditHandler : public VersionEditHandlerBase { bool initialized_; std::unique_ptr> cf_to_cmp_names_; EpochNumberRequirement epoch_number_requirement_; + std::unordered_set cfds_to_mark_no_udt_; private: Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index f3473b47653..c4738990173 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -58,6 +58,7 @@ TEST_F(VersionEditTest, EncodeDecode) { } edit.SetComparatorName("foo"); + edit.SetPersistUserDefinedTimestamps(true); edit.SetLogNumber(kBig + 100); edit.SetNextFile(kBig + 200); edit.SetLastSequence(kBig + 1000); @@ -95,6 +96,7 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) { edit.DeleteFile(4, 700); edit.SetComparatorName("foo"); + edit.SetPersistUserDefinedTimestamps(false); edit.SetLogNumber(kBig + 100); edit.SetNextFile(kBig + 200); edit.SetLastSequence(kBig + 1000); @@ -125,6 +127,7 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) { ASSERT_FALSE(new_files[1].second.user_defined_timestamps_persisted); ASSERT_TRUE(new_files[2].second.user_defined_timestamps_persisted); ASSERT_TRUE(new_files[3].second.user_defined_timestamps_persisted); + ASSERT_FALSE(parsed.GetPersistUserDefinedTimestamps()); } TEST_F(VersionEditTest, EncodeDecodeNewFile4HandleFileBoundary) { @@ -195,6 +198,7 @@ TEST_F(VersionEditTest, ForwardCompatibleNewFile4) { edit.DeleteFile(4, 700); edit.SetComparatorName("foo"); + edit.SetPersistUserDefinedTimestamps(true); edit.SetLogNumber(kBig + 100); edit.SetNextFile(kBig + 200); edit.SetLastSequence(kBig + 1000); @@ -230,6 +234,7 @@ TEST_F(VersionEditTest, ForwardCompatibleNewFile4) { ASSERT_EQ(3u, new_files[0].second.fd.GetPathId()); ASSERT_EQ(3u, new_files[1].second.fd.GetPathId()); ASSERT_EQ(1u, parsed.GetDeletedFiles().size()); + ASSERT_TRUE(parsed.GetPersistUserDefinedTimestamps()); } TEST_F(VersionEditTest, NewFile4NotSupportedField) { @@ -240,9 +245,10 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) { kBig + 600, true, Temperature::kUnknown, kInvalidBlobFileNumber, kUnknownOldestAncesterTime, kUnknownFileCreationTime, 300 /* epoch_number */, kUnknownFileChecksum, - kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0, true); + kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0, false); edit.SetComparatorName("foo"); + edit.SetPersistUserDefinedTimestamps(false); edit.SetLogNumber(kBig + 100); edit.SetNextFile(kBig + 200); edit.SetLastSequence(kBig + 1000); diff --git a/db/version_set.cc b/db/version_set.cc index e95e98f793f..32dd4b8d99f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -6448,6 +6448,8 @@ Status VersionSet::WriteCurrentStateToManifest( } edit.SetComparatorName( cfd->internal_comparator().user_comparator()->Name()); + edit.SetPersistUserDefinedTimestamps( + cfd->ioptions()->persist_user_defined_timestamps); std::string record; if (!edit.EncodeTo(&record)) { return Status::Corruption("Unable to Encode VersionEdit:" + diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 1d5971c5900..135b2d64f88 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1349,6 +1349,8 @@ class VersionSetTestBase { new_cf.SetColumnFamily(new_id); new_cf.SetLogNumber(0); new_cf.SetComparatorName(cf_options.comparator->Name()); + new_cf.SetPersistUserDefinedTimestamps( + cf_options.persist_user_defined_timestamps); Status s; mutex_.Lock(); s = versions_->LogAndApply(/*column_family_data=*/nullptr, diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 5be134a9d51..ab394977d11 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -1176,8 +1176,7 @@ struct AdvancedColumnFamilyOptions { // persisted to WAL even if this flag is set to `false`. The benefit of this // is that user-defined timestamps can be recovered with the caveat that users // should flush all memtables so there is no active WAL files before doing a - // downgrade or toggling on / off the user-defined timestamp feature on a - // column family. + // downgrade. // // Note that setting this flag to false is not supported in combination with // atomic flush, or concurrent memtable write enabled by diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 3da65a8487d..3f6ebc71eb0 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -1434,7 +1434,7 @@ public void getLiveFiles() throws RocksDBException { try (final RocksDB db = RocksDB.open(options, dbPath)) { final RocksDB.LiveFiles livefiles = db.getLiveFiles(true); assertThat(livefiles).isNotNull(); - assertThat(livefiles.manifestFileSize).isEqualTo(66); + assertThat(livefiles.manifestFileSize).isEqualTo(70); assertThat(livefiles.files.size()).isEqualTo(3); assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT"); assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000005"); diff --git a/unreleased_history/new_features/enable_disable_udt.md b/unreleased_history/new_features/enable_disable_udt.md new file mode 100644 index 00000000000..d4a7ce850be --- /dev/null +++ b/unreleased_history/new_features/enable_disable_udt.md @@ -0,0 +1 @@ +Add support to allow enabling / disabling user-defined timestamps feature for an existing column family in combination with the in-Memtable only feature. \ No newline at end of file diff --git a/util/udt_util.cc b/util/udt_util.cc index 39422fa96bd..9380f456042 100644 --- a/util/udt_util.cc +++ b/util/udt_util.cc @@ -100,6 +100,40 @@ Status CheckWriteBatchTimestampSizeConsistency( } return Status::OK(); } + +enum class ToggleUDT { + kUnchanged, + kEnableUDT, + kDisableUDT, + kInvalidChange, +}; + +ToggleUDT CompareComparator(const Comparator* new_comparator, + const std::string& old_comparator_name) { + static const char* kUDTSuffix = ".u64ts"; + static const Slice kSuffixSlice = kUDTSuffix; + static const size_t kSuffixSize = 6; + size_t ts_sz = new_comparator->timestamp_size(); + (void)ts_sz; + Slice new_ucmp_name(new_comparator->Name()); + Slice old_ucmp_name(old_comparator_name); + if (new_ucmp_name.compare(old_ucmp_name) == 0) { + return ToggleUDT::kUnchanged; + } + if (new_ucmp_name.size() == old_ucmp_name.size() + kSuffixSize && + new_ucmp_name.starts_with(old_ucmp_name) && + new_ucmp_name.ends_with(kSuffixSlice)) { + assert(ts_sz == 8); + return ToggleUDT::kEnableUDT; + } + if (old_ucmp_name.size() == new_ucmp_name.size() + kSuffixSize && + old_ucmp_name.starts_with(new_ucmp_name) && + old_ucmp_name.ends_with(kSuffixSlice)) { + assert(ts_sz == 0); + return ToggleUDT::kDisableUDT; + } + return ToggleUDT::kInvalidChange; +} } // namespace TimestampRecoveryHandler::TimestampRecoveryHandler( @@ -261,4 +295,49 @@ Status HandleWriteBatchTimestampSizeDifference( } return Status::OK(); } + +Status ValidateUserDefinedTimestampsOptions( + const Comparator* new_comparator, const std::string& old_comparator_name, + bool new_persist_udt, bool old_persist_udt, + bool* mark_sst_files_has_no_udt) { + size_t ts_sz = new_comparator->timestamp_size(); + ToggleUDT res = CompareComparator(new_comparator, old_comparator_name); + switch (res) { + case ToggleUDT::kUnchanged: + if (old_persist_udt == new_persist_udt) { + return Status::OK(); + } + if (ts_sz == 0) { + return Status::OK(); + } + return Status::InvalidArgument( + "Cannot toggle the persist_user_defined_timestamps flag for a column " + "family with user-defined timestamps feature enabled."); + case ToggleUDT::kEnableUDT: + if (!new_persist_udt) { + *mark_sst_files_has_no_udt = true; + return Status::OK(); + } + return Status::InvalidArgument( + "Cannot open a column family and enable user-defined timestamps " + "feature without setting persist_user_defined_timestamps flag to " + "false."); + case ToggleUDT::kDisableUDT: + if (!old_persist_udt) { + return Status::OK(); + } + return Status::InvalidArgument( + "Cannot open a column family and disable user-defined timestamps " + "feature if its existing persist_user_defined_timestamps flag is not " + "false."); + case ToggleUDT::kInvalidChange: + return Status::InvalidArgument( + new_comparator->Name(), + "does not match existing comparator " + old_comparator_name); + default: + break; + } + return Status::InvalidArgument( + "Unsupported user defined timestamps settings change."); +} } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util.h b/util/udt_util.h index fc980a04b41..4bc83773983 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -215,4 +215,35 @@ Status HandleWriteBatchTimestampSizeDifference( const UnorderedMap& record_ts_sz, TimestampSizeConsistencyMode check_mode, std::unique_ptr* new_batch = nullptr); + +// This util function is used when opening an existing column family and +// processing its VersionEdit. It does a sanity check for the column family's +// old user comparator and the persist_user_defined_timestamps flag as recorded +// in the VersionEdit, against its new settings from the column family's +// ImmutableCFOptions. +// +// Valid settings change include: +// 1) no user comparator change and no effective persist_user_defined_timestamp +// flag change. +// 2) switch user comparator to enable user-defined timestamps feature provided +// the immediately effective persist_user_defined_timestamps flag is false. +// 3) switch user comparator to disable user-defined timestamps feature provided +// that the before-change persist_user_defined_timestamps is already false. +// +// Switch user comparator to disable/enable UDT is only sanity checked by a user +// comparator name comparison. The full check includes enforcing the new user +// comparator ranks user keys exactly the same as the old user comparator and +// only add / remove the user-defined timestamp comparison. We don't have ways +// to strictly enforce this so currently only the RocksDB builtin comparator +// wrapper `ComparatorWithU64TsImpl` is supported to enable / disable +// user-defined timestamps. It formats user-defined timestamps as uint64_t. +// +// When the settings indicate a legit change to enable user-defined timestamps +// feature on a column family, `mark_sst_files_has_no_udt` will be set to true +// to indicate marking all existing SST files has no user-defined timestamps +// when re-writing the manifest. +Status ValidateUserDefinedTimestampsOptions( + const Comparator* new_comparator, const std::string& old_comparator_name, + bool new_persist_udt, bool old_persist_udt, + bool* mark_sst_files_has_no_udt); } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc index 63f35992a39..9fcbc9bc7d8 100644 --- a/util/udt_util_test.cc +++ b/util/udt_util_test.cc @@ -321,6 +321,122 @@ TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) { TimestampSizeConsistencyMode::kReconcileInconsistency) .IsInvalidArgument()); } + +TEST(ValidateUserDefinedTimestampsOptionsTest, EnableUserDefinedTimestamps) { + bool mark_sst_files = false; + const Comparator* new_comparator = test::BytewiseComparatorWithU64TsWrapper(); + const Comparator* old_comparator = BytewiseComparator(); + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + false /*new_persist_udt*/, true /*old_persist_udt*/, &mark_sst_files)); + ASSERT_TRUE(mark_sst_files); + + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + false /*new_persist_udt*/, false /*old_persist_udt*/, &mark_sst_files)); + ASSERT_TRUE(mark_sst_files); +} + +TEST(ValidateUserDefinedTimestampsOptionsTest, + EnableUserDefinedTimestampsNewPersistUDTFlagIncorrect) { + bool mark_sst_files = false; + const Comparator* new_comparator = test::BytewiseComparatorWithU64TsWrapper(); + const Comparator* old_comparator = BytewiseComparator(); + ASSERT_TRUE(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + true /*new_persist_udt*/, true /*old_persist_udt*/, + &mark_sst_files) + .IsInvalidArgument()); + ASSERT_TRUE(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + true /*new_persist_udt*/, false /*old_persist_udt*/, + &mark_sst_files) + .IsInvalidArgument()); +} + +TEST(ValidateUserDefinedTimestampsOptionsTest, DisableUserDefinedTimestamps) { + bool mark_sst_files = false; + const Comparator* new_comparator = BytewiseComparator(); + const Comparator* old_comparator = test::BytewiseComparatorWithU64TsWrapper(); + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + false /*new_persist_udt*/, false /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); + + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + true /*new_persist_udt*/, false /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); +} + +TEST(ValidateUserDefinedTimestampsOptionsTest, + DisableUserDefinedTimestampsOldPersistUDTFlagIncorrect) { + bool mark_sst_files = false; + const Comparator* new_comparator = BytewiseComparator(); + const Comparator* old_comparator = test::BytewiseComparatorWithU64TsWrapper(); + ASSERT_TRUE(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + false /*new_persist_udt*/, true /*old_persist_udt*/, + &mark_sst_files) + .IsInvalidArgument()); + ASSERT_TRUE(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + true /*new_persist_udt*/, true /*old_persist_udt*/, + &mark_sst_files) + .IsInvalidArgument()); +} + +TEST(ValidateUserDefinedTimestampsOptionsTest, UserComparatorUnchanged) { + bool mark_sst_files = false; + const Comparator* ucmp_without_ts = BytewiseComparator(); + const Comparator* ucmp_with_ts = test::BytewiseComparatorWithU64TsWrapper(); + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + ucmp_without_ts, std::string(ucmp_without_ts->Name()), + false /*new_persist_udt*/, false /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + ucmp_without_ts, std::string(ucmp_without_ts->Name()), + true /*new_persist_udt*/, true /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + ucmp_without_ts, std::string(ucmp_without_ts->Name()), + true /*new_persist_udt*/, false /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + ucmp_without_ts, std::string(ucmp_without_ts->Name()), + false /*new_persist_udt*/, true /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); + + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + ucmp_with_ts, std::string(ucmp_with_ts->Name()), true /*new_persist_udt*/, + true /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); + ASSERT_OK(ValidateUserDefinedTimestampsOptions( + ucmp_with_ts, std::string(ucmp_with_ts->Name()), + false /*new_persist_udt*/, false /*old_persist_udt*/, &mark_sst_files)); + ASSERT_FALSE(mark_sst_files); + ASSERT_TRUE(ValidateUserDefinedTimestampsOptions( + ucmp_with_ts, std::string(ucmp_with_ts->Name()), + true /*new_persist_udt*/, false /*old_persist_udt*/, + &mark_sst_files) + .IsInvalidArgument()); + ASSERT_TRUE(ValidateUserDefinedTimestampsOptions( + ucmp_with_ts, std::string(ucmp_with_ts->Name()), + false /*new_persist_udt*/, true /*old_persist_udt*/, + &mark_sst_files) + .IsInvalidArgument()); +} + +TEST(ValidateUserDefinedTimestampsOptionsTest, InvalidUserComparatorChange) { + bool mark_sst_files = false; + const Comparator* new_comparator = BytewiseComparator(); + const Comparator* old_comparator = ReverseBytewiseComparator(); + ASSERT_TRUE(ValidateUserDefinedTimestampsOptions( + new_comparator, std::string(old_comparator->Name()), + false /*new_persist_udt*/, true /*old_persist_udt*/, + &mark_sst_files) + .IsInvalidArgument()); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {