Skip to content

Commit

Permalink
Support switching on / off UDT together with in-Memtable-only feature (
Browse files Browse the repository at this point in the history
…#11623)

Summary:
Add support to allow enabling / disabling user-defined timestamps feature for an existing column family in combination with the in-Memtable only feature.

To do this, this PR includes:
1) Log the `persist_user_defined_timestamps` option per column family in Manifest to facilitate detecting an attempt to enable / disable UDT. This entry is enforced to be logged in the same VersionEdit as the user comparator name entry.

2) User-defined timestamps related options are validated when re-opening a column family, including user comparator name and the `persist_user_defined_timestamps` flag. These type of settings and settings change are considered valid:
     a) no user comparator change and no effective `persist_user_defined_timestamp` flag change.
     b) switch user comparator to enable UDT provided the immediately effective `persist_user_defined_timestamps` flag
         is false.
     c) switch user comparator to disable UDT provided that the before-change `persist_user_defined_timestamps` is
         already false.
3) when an attempt to enable UDT is detected, we mark all its existing SST files as "having no UDT" by marking its `FileMetaData.user_defined_timestamps_persisted` flag to false and handle their file boundaries `FileMetaData.smallest`, `FileMetaData.largest` by padding a min timestamp.

4) while enabling / disabling UDT feature, timestamp size inconsistency in existing WAL logs are handled to make it compatible with the running user comparator.

Pull Request resolved: #11623

Test Plan:
```
make all check
./db_with_timestamp_basic_test --gtest-filter="*EnableDisableUDT*"
./db_wal_test --gtest_filter="*EnableDisableUDT*"
```

Reviewed By: ltamasi

Differential Revision: D47636862

Pulled By: jowlyzhang

fbshipit-source-id: dcd19f67292da3c3cc9584c09ad00331c9ab9322
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Jul 27, 2023
1 parent 4ea7b79 commit c24ef26
Show file tree
Hide file tree
Showing 19 changed files with 458 additions and 88 deletions.
2 changes: 2 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3191,6 +3191,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
edit.SetColumnFamily(new_id);
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(cf_options.comparator->Name());
edit.SetPersistUserDefinedTimestamps(
cf_options.persist_user_defined_timestamps);

// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
Expand Down
21 changes: 12 additions & 9 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1203,10 +1203,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
Status::Corruption("log record too small"));
continue;
}

// We create a new batch and initialize with a valid prot_info_ to store
// the data checksums
WriteBatch batch;
std::unique_ptr<WriteBatch> new_batch;

status = WriteBatchInternal::SetContents(&batch, record);
if (!status.ok()) {
Expand All @@ -1215,26 +1215,29 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,

const UnorderedMap<uint32_t, size_t>& record_ts_sz =
reader.GetRecordedTimestampSize();
// TODO(yuzhangyu): update mode to kReconcileInconsistency when user
// comparator can be changed.
status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency);
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch);
if (!status.ok()) {
return status;
}

bool batch_updated = new_batch != nullptr;
WriteBatch* batch_to_use = batch_updated ? new_batch.get() : &batch;
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch);
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
batch_to_use);
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
&record_checksum);
status = WriteBatchInternal::UpdateProtectionInfo(
&batch, 8 /* bytes_per_key */, &record_checksum);
batch_to_use, 8 /* bytes_per_key */,
batch_updated ? nullptr : &record_checksum);
if (!status.ok()) {
return status;
}

SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);

if (immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
Expand All @@ -1255,7 +1258,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// and returns true.
if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
status, stop_replay_by_wal_filter,
batch)) {
*batch_to_use)) {
continue;
}

Expand All @@ -1266,7 +1269,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_,
batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, wal_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
Expand Down
137 changes: 70 additions & 67 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,30 +318,25 @@ class DBWALTestWithTimestamp
DBWALTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}

void SetUp() override {
persist_udt_ = test::ShouldPersistUDT(GetParam());
DBBasicTestWithTimestampBase::SetUp();
}

Status CreateAndReopenWithCFWithTs(const std::vector<std::string>& cfs,
Options& ts_options,
bool avoid_flush_during_recovery = false) {
Status CreateAndReopenWithTs(const std::vector<std::string>& cfs,
const Options& ts_options, bool persist_udt,
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.allow_concurrent_memtable_write =
persist_udt_ ? true : false;
persist_udt ? true : false;
DestroyAndReopen(default_options);
CreateColumnFamilies(cfs, ts_options);
return ReopenColumnFamiliesWithTs(cfs, ts_options,
return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt,
avoid_flush_during_recovery);
}

Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
Options ts_options,
Options ts_options, bool persist_udt,
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.create_if_missing = false;
default_options.allow_concurrent_memtable_write =
persist_udt_ ? true : false;
persist_udt ? true : false;
default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
ts_options.create_if_missing = false;

Expand Down Expand Up @@ -369,9 +364,6 @@ class DBWALTestWithTimestamp
ASSERT_EQ(expected_value, actual_value);
ASSERT_EQ(expected_ts, actual_ts);
}

protected:
bool persist_udt_;
};

TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
Expand All @@ -388,20 +380,21 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
// stripped when the `persist_user_defined_timestamps` flag is false, so that
// all written timestamps are available for testing user-defined time travel
// read.
ts_options.persist_user_defined_timestamps = persist_udt_;
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;
bool avoid_flush_during_recovery = true;

ReadOptions read_opts;
do {
Slice ts_slice = ts1;
read_opts.timestamp = &ts_slice;
ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, "foo", ts1, "v1"));
ASSERT_OK(Put(1, "baz", ts1, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
// Do a timestamped read with ts1 after second reopen.
Expand All @@ -415,75 +408,40 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
ASSERT_OK(Put(1, "bar", ts2, "v2"));
ASSERT_OK(Put(1, "foo", ts2, "v3"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
std::string ts3;
PutFixed64(&ts3, 3);
ASSERT_OK(Put(1, "foo", ts3, "v4"));

// All the key value pairs available for read:
// "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")]
// "bar" -> [(ts2, "v2")]
// "baz" -> [(ts1, "v5")]
// Do a timestamped read with ts1 after third reopen.
// read_opts.timestamp is set to ts1 for below reads
CheckGet(read_opts, 1, "foo", "v1", ts1);
std::string value;
ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts2 after third reopen.
ts_slice = ts2;
// read_opts.timestamp is set to ts2 for below reads.
CheckGet(read_opts, 1, "foo", "v3", ts2);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts3 after third reopen.
ts_slice = ts3;
// read_opts.timestamp is set to ts3 for below reads.
CheckGet(read_opts, 1, "foo", "v4", ts3);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);
} while (ChangeWalOptions());
}

class TestTsSzComparator : public Comparator {
public:
explicit TestTsSzComparator(size_t ts_sz) : Comparator(ts_sz) {}

int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
return 0;
}
const char* Name() const override { return "TestTsSzComparator.u64ts"; }
void FindShortestSeparator(
std::string* /*start*/,
const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
void FindShortSuccessor(std::string* /*key*/) const override {}
};

TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
// Set up the option that enables user defined timestmp size.
std::string ts;
PutFixed16(&ts, 1);
TestTsSzComparator test_cmp(2);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
ts_options.persist_user_defined_timestamps = persist_udt_;

ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(Put(1, "foo", ts, "v1"));
ASSERT_OK(Put(1, "baz", ts, "v5"));

// In real use cases, switching to a different user comparator is prohibited
// by a sanity check during DB open that does a user comparator name
// comparison. This test mocked and bypassed that sanity check because the
// before and after user comparator are both named "TestTsSzComparator.u64ts".
// This is to test the user-defined timestamp recovery logic for WAL files
// have the intended consistency check.
// `HandleWriteBatchTimestampSizeDifference` in udt_util.h has more details.
TestTsSzComparator diff_test_cmp(3);
ts_options.comparator = &diff_test_cmp;
ASSERT_TRUE(
ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument());
}

TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
// Set up the option that enables user defined timestamp size.
std::string min_ts;
Expand All @@ -493,18 +451,19 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
ts_options.persist_user_defined_timestamps = persist_udt_;
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;

std::string smallest_ukey_without_ts = "baz";
std::string largest_ukey_without_ts = "foo";

ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt));
// No flush, no sst files, because of no data.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt));
// Memtable recovered from WAL flushed because `avoid_flush_during_recovery`
// defaults to false, created one L0 file.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U);
Expand All @@ -515,7 +474,7 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
// L0 only has one SST file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto meta = level_to_files[0][0];
if (persist_udt_) {
if (persist_udt) {
ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
} else {
Expand All @@ -526,11 +485,55 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {

// Param 0: test mode for the user-defined timestamp feature
INSTANTIATE_TEST_CASE_P(
DBWALTestWithTimestamp, DBWALTestWithTimestamp,
P, DBWALTestWithTimestamp,
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));

TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) {
Options options;
options.create_if_missing = true;
options.comparator = BytewiseComparator();
bool avoid_flush_during_recovery = true;
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */,
avoid_flush_during_recovery));

ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5"));

options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = false;
// Test handle timestamp size inconsistency in WAL when enabling user-defined
// timestamps.
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
false /* persist_udt */,
avoid_flush_during_recovery));

std::string ts;
PutFixed64(&ts, 0);
Slice ts_slice = ts;
ReadOptions read_opts;
read_opts.timestamp = &ts_slice;
// Pre-existing entries are treated as if they have the min timestamp.
CheckGet(read_opts, 1, "foo", "v1", ts);
CheckGet(read_opts, 1, "baz", "v5", ts);
ts.clear();
PutFixed64(&ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2"));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6"));
CheckGet(read_opts, 1, "foo", "v2", ts);
CheckGet(read_opts, 1, "baz", "v6", ts);

options.comparator = BytewiseComparator();
// Open the column family again with the UDT feature disabled. Test handle
// timestamp size inconsistency in WAL when disabling user-defined timestamps
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
true /* persist_udt */,
avoid_flush_during_recovery));
ASSERT_EQ("v2", Get(1, "foo"));
ASSERT_EQ("v6", Get(1, "baz"));
}

TEST_F(DBWALTest, RecoverWithTableHandle) {
do {
Options options = CurrentOptions();
Expand Down
63 changes: 63 additions & 0 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3346,6 +3346,69 @@ INSTANTIATE_TEST_CASE_P(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));

TEST_F(DBBasicTestWithTimestamp, EnableDisableUDT) {
Options options = CurrentOptions();
options.env = env_;
// Create a column family without user-defined timestamps.
options.comparator = BytewiseComparator();
options.persist_user_defined_timestamps = true;
DestroyAndReopen(options);

// Create one SST file, its user keys have no user-defined timestamps.
ASSERT_OK(db_->Put(WriteOptions(), "foo", "val1"));
ASSERT_OK(Flush(0));
Close();

// Reopen the existing column family and enable user-defined timestamps
// feature for it.
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = false;
options.allow_concurrent_memtable_write = false;
Reopen(options);

std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "foo", &value).IsInvalidArgument());
std::string read_ts;
PutFixed64(&read_ts, 0);
ReadOptions ropts;
Slice read_ts_slice = read_ts;
ropts.timestamp = &read_ts_slice;
std::string key_ts;
// Entries in pre-existing SST files are treated as if they have minimum
// user-defined timestamps.
ASSERT_OK(db_->Get(ropts, "foo", &value, &key_ts));
ASSERT_EQ("val1", value);
ASSERT_EQ(read_ts, key_ts);

// Do timestamped read / write.
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val2"));
read_ts.clear();
PutFixed64(&read_ts, 1);
ASSERT_OK(db_->Get(ropts, "foo", &value, &key_ts));
ASSERT_EQ("val2", value);
ASSERT_EQ(write_ts, key_ts);
// The user keys in this SST file don't have user-defined timestamps either,
// because `persist_user_defined_timestamps` flag is set to false.
ASSERT_OK(Flush(0));
Close();

// Reopen the existing column family while disabling user-defined timestamps.
options.comparator = BytewiseComparator();
Reopen(options);

ASSERT_TRUE(db_->Get(ropts, "foo", &value).IsInvalidArgument());
ASSERT_OK(db_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("val2", value);

// Continue to write / read the column family without user-defined timestamps.
ASSERT_OK(db_->Put(WriteOptions(), "foo", "val3"));
ASSERT_OK(db_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("val3", value);
Close();
}

TEST_F(DBBasicTestWithTimestamp,
GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) {
Options options = CurrentOptions();
Expand Down
1 change: 1 addition & 0 deletions db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class FlushJobTestBase : public testing::Test {
new_cf.AddColumnFamily(column_family_names_[i]);
new_cf.SetColumnFamily(cf_id++);
new_cf.SetComparatorName(ucmp_->Name());
new_cf.SetPersistUserDefinedTimestamps(persist_udt_);
new_cf.SetLogNumber(0);
new_cf.SetNextFile(2);
new_cf.SetLastSequence(last_seq++);
Expand Down
Loading

0 comments on commit c24ef26

Please sign in to comment.