Skip to content

Commit

Permalink
Support switching on / off UDT together with in-Memtable-only feature
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Jul 27, 2023
1 parent 4ea7b79 commit 20da703
Show file tree
Hide file tree
Showing 19 changed files with 418 additions and 250 deletions.
2 changes: 2 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3191,6 +3191,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
edit.SetColumnFamily(new_id);
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(cf_options.comparator->Name());
edit.SetPersistUserDefinedTimestamps(
cf_options.persist_user_defined_timestamps);

// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
Expand Down
24 changes: 13 additions & 11 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1206,35 +1206,37 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,

// We create a new batch and initialize with a valid prot_info_ to store
// the data checksums
WriteBatch batch;
std::unique_ptr<WriteBatch> batch(new WriteBatch());

status = WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::SetContents(batch.get(), record);
if (!status.ok()) {
return status;
}

const UnorderedMap<uint32_t, size_t>& record_ts_sz =
reader.GetRecordedTimestampSize();
// TODO(yuzhangyu): update mode to kReconcileInconsistency when user
// comparator can be changed.
bool batch_updated = false;
status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency);
batch.get(), running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &batch,
&batch_updated);
if (!status.ok()) {
return status;
}
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch);
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
batch.get());
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
&record_checksum);
status = WriteBatchInternal::UpdateProtectionInfo(
&batch, 8 /* bytes_per_key */, &record_checksum);
batch.get(), 8 /* bytes_per_key */,
batch_updated ? nullptr : &record_checksum);
if (!status.ok()) {
return status;
}

SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
SequenceNumber sequence = WriteBatchInternal::Sequence(batch.get());

if (immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
Expand All @@ -1255,7 +1257,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// and returns true.
if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
status, stop_replay_by_wal_filter,
batch)) {
*batch)) {
continue;
}

Expand All @@ -1266,7 +1268,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_,
batch.get(), column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, wal_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
Expand Down
220 changes: 0 additions & 220 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,226 +311,6 @@ TEST_F(DBWALTest, Recover) {
} while (ChangeWalOptions());
}

class DBWALTestWithTimestamp
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
public:
DBWALTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}

void SetUp() override {
persist_udt_ = test::ShouldPersistUDT(GetParam());
DBBasicTestWithTimestampBase::SetUp();
}

Status CreateAndReopenWithCFWithTs(const std::vector<std::string>& cfs,
Options& ts_options,
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.allow_concurrent_memtable_write =
persist_udt_ ? true : false;
DestroyAndReopen(default_options);
CreateColumnFamilies(cfs, ts_options);
return ReopenColumnFamiliesWithTs(cfs, ts_options,
avoid_flush_during_recovery);
}

Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
Options ts_options,
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.create_if_missing = false;
default_options.allow_concurrent_memtable_write =
persist_udt_ ? true : false;
default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
ts_options.create_if_missing = false;

std::vector<Options> cf_options(cfs.size(), ts_options);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
cf_options.insert(cf_options.begin(), default_options);
Close();
return TryReopenWithColumnFamilies(cfs_plus_default, cf_options);
}

Status Put(uint32_t cf, const Slice& key, const Slice& ts,
const Slice& value) {
WriteOptions write_opts;
return db_->Put(write_opts, handles_[cf], key, ts, value);
}

void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
const std::string& expected_value,
const std::string& expected_ts) {
std::string actual_value;
std::string actual_ts;
ASSERT_OK(
db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts));
ASSERT_EQ(expected_value, actual_value);
ASSERT_EQ(expected_ts, actual_ts);
}

protected:
bool persist_udt_;
};

TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
// Set up the option that enables user defined timestmp size.
std::string ts1;
PutFixed64(&ts1, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
// Test that user-defined timestamps are recovered from WAL regardless of
// the value of this flag because UDTs are saved in WAL nonetheless.
// We however need to explicitly disable flush during recovery by setting
// `avoid_flush_during_recovery=true` so that we can avoid timestamps getting
// stripped when the `persist_user_defined_timestamps` flag is false, so that
// all written timestamps are available for testing user-defined time travel
// read.
ts_options.persist_user_defined_timestamps = persist_udt_;
bool avoid_flush_during_recovery = true;

ReadOptions read_opts;
do {
Slice ts_slice = ts1;
read_opts.timestamp = &ts_slice;
ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, "foo", ts1, "v1"));
ASSERT_OK(Put(1, "baz", ts1, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
// Do a timestamped read with ts1 after second reopen.
CheckGet(read_opts, 1, "foo", "v1", ts1);
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Write more value versions for key "foo" and "bar" before and after second
// reopen.
std::string ts2;
PutFixed64(&ts2, 2);
ASSERT_OK(Put(1, "bar", ts2, "v2"));
ASSERT_OK(Put(1, "foo", ts2, "v3"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
std::string ts3;
PutFixed64(&ts3, 3);
ASSERT_OK(Put(1, "foo", ts3, "v4"));

// Do a timestamped read with ts1 after third reopen.
CheckGet(read_opts, 1, "foo", "v1", ts1);
std::string value;
ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts2 after third reopen.
ts_slice = ts2;
CheckGet(read_opts, 1, "foo", "v3", ts2);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts3 after third reopen.
ts_slice = ts3;
CheckGet(read_opts, 1, "foo", "v4", ts3);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);
} while (ChangeWalOptions());
}

class TestTsSzComparator : public Comparator {
public:
explicit TestTsSzComparator(size_t ts_sz) : Comparator(ts_sz) {}

int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
return 0;
}
const char* Name() const override { return "TestTsSzComparator.u64ts"; }
void FindShortestSeparator(
std::string* /*start*/,
const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
void FindShortSuccessor(std::string* /*key*/) const override {}
};

TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
// Set up the option that enables user defined timestmp size.
std::string ts;
PutFixed16(&ts, 1);
TestTsSzComparator test_cmp(2);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
ts_options.persist_user_defined_timestamps = persist_udt_;

ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(Put(1, "foo", ts, "v1"));
ASSERT_OK(Put(1, "baz", ts, "v5"));

// In real use cases, switching to a different user comparator is prohibited
// by a sanity check during DB open that does a user comparator name
// comparison. This test mocked and bypassed that sanity check because the
// before and after user comparator are both named "TestTsSzComparator.u64ts".
// This is to test the user-defined timestamp recovery logic for WAL files
// have the intended consistency check.
// `HandleWriteBatchTimestampSizeDifference` in udt_util.h has more details.
TestTsSzComparator diff_test_cmp(3);
ts_options.comparator = &diff_test_cmp;
ASSERT_TRUE(
ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument());
}

TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
// Set up the option that enables user defined timestamp size.
std::string min_ts;
std::string write_ts;
PutFixed64(&min_ts, 0);
PutFixed64(&write_ts, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
ts_options.persist_user_defined_timestamps = persist_udt_;

std::string smallest_ukey_without_ts = "baz";
std::string largest_ukey_without_ts = "foo";

ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
// No flush, no sst files, because of no data.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
// Memtable recovered from WAL flushed because `avoid_flush_during_recovery`
// defaults to false, created one L0 file.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U);

std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
ASSERT_GT(level_to_files.size(), 1);
// L0 only has one SST file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto meta = level_to_files[0][0];
if (persist_udt_) {
ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
} else {
ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key());
}
}

// Param 0: test mode for the user-defined timestamp feature
INSTANTIATE_TEST_CASE_P(
DBWALTestWithTimestamp, DBWALTestWithTimestamp,
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));

TEST_F(DBWALTest, RecoverWithTableHandle) {
do {
Options options = CurrentOptions();
Expand Down
62 changes: 62 additions & 0 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3346,6 +3346,68 @@ INSTANTIATE_TEST_CASE_P(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));

TEST_F(DBBasicTestWithTimestamp, EnableDisableUDT) {
Options options = CurrentOptions();
options.env = env_;
// Create a column family without user-defined timestamps.
options.comparator = BytewiseComparator();
options.persist_user_defined_timestamps = true;
DestroyAndReopen(options);

// Create one SST file, its user keys have no user-defined timestamps.
ASSERT_OK(db_->Put(WriteOptions(), "foo", "val1"));
ASSERT_OK(Flush(0));
Close();

// Reopen the existing column family and enable user-defined timestamps
// feature for it.
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = false;
Reopen(options);

std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "foo", &value).IsInvalidArgument());
std::string read_ts;
PutFixed64(&read_ts, 0);
ReadOptions ropts;
Slice read_ts_slice = read_ts;
ropts.timestamp = &read_ts_slice;
std::string key_ts;
// Entries in pre-existing SST files are treated as if they have minimum
// user-defined timestamps.
ASSERT_OK(db_->Get(ropts, "foo", &value, &key_ts));
ASSERT_EQ("val1", value);
ASSERT_EQ(read_ts, key_ts);

// Do timestamped read / write.
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), "foo", write_ts, "val2"));
read_ts.clear();
PutFixed64(&read_ts, 1);
ASSERT_OK(db_->Get(ropts, "foo", &value, &key_ts));
ASSERT_EQ("val2", value);
ASSERT_EQ(write_ts, key_ts);
// The user keys in this SST file don't have user-defined timestamps either,
// because `persist_user_defined_timestamps` flag is set to false.
ASSERT_OK(Flush(0));
Close();

// Reopen the existing column family while disabling user-defined timestamps.
options.comparator = BytewiseComparator();
Reopen(options);

ASSERT_TRUE(db_->Get(ropts, "foo", &value).IsInvalidArgument());
ASSERT_OK(db_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("val2", value);

// Continue to write / read the column family without user-defined timestamps.
ASSERT_OK(db_->Put(WriteOptions(), "foo", "val3"));
ASSERT_OK(db_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("val3", value);
Close();
}

TEST_F(DBBasicTestWithTimestamp,
GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) {
Options options = CurrentOptions();
Expand Down
1 change: 1 addition & 0 deletions db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class FlushJobTestBase : public testing::Test {
new_cf.AddColumnFamily(column_family_names_[i]);
new_cf.SetColumnFamily(cf_id++);
new_cf.SetComparatorName(ucmp_->Name());
new_cf.SetPersistUserDefinedTimestamps(persist_udt_);
new_cf.SetLogNumber(0);
new_cf.SetNextFile(2);
new_cf.SetLastSequence(last_seq++);
Expand Down
3 changes: 3 additions & 0 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class Repairer {

VersionEdit edit;
edit.SetComparatorName(opts.comparator->Name());
edit.SetPersistUserDefinedTimestamps(opts.persist_user_defined_timestamps);
edit.SetLogNumber(0);
edit.SetColumnFamily(cf_id);
ColumnFamilyData* cfd;
Expand Down Expand Up @@ -720,6 +721,8 @@ class Repairer {
// recovered epoch numbers
VersionEdit edit;
edit.SetComparatorName(cfd->user_comparator()->Name());
edit.SetPersistUserDefinedTimestamps(
cfd->ioptions()->persist_user_defined_timestamps);
edit.SetLogNumber(0);
edit.SetNextFile(next_file_number_);
edit.SetColumnFamily(cfd->GetID());
Expand Down
Loading

0 comments on commit 20da703

Please sign in to comment.