Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Jul 27, 2023
1 parent 20da703 commit f767683
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 57 deletions.
27 changes: 14 additions & 13 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1203,40 +1203,41 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
Status::Corruption("log record too small"));
continue;
}

// We create a new batch and initialize with a valid prot_info_ to store
// the data checksums
std::unique_ptr<WriteBatch> batch(new WriteBatch());
WriteBatch batch;
std::unique_ptr<WriteBatch> new_batch;

status = WriteBatchInternal::SetContents(batch.get(), record);
status = WriteBatchInternal::SetContents(&batch, record);
if (!status.ok()) {
return status;
}

const UnorderedMap<uint32_t, size_t>& record_ts_sz =
reader.GetRecordedTimestampSize();
bool batch_updated = false;
status = HandleWriteBatchTimestampSizeDifference(
batch.get(), running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &batch,
&batch_updated);
&batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch);
if (!status.ok()) {
return status;
}

bool batch_updated = new_batch != nullptr;
WriteBatch* batch_to_use = batch_updated ? new_batch.get() : &batch;
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
batch.get());
batch_to_use);
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
&record_checksum);
status = WriteBatchInternal::UpdateProtectionInfo(
batch.get(), 8 /* bytes_per_key */,
batch_to_use, 8 /* bytes_per_key */,
batch_updated ? nullptr : &record_checksum);
if (!status.ok()) {
return status;
}

SequenceNumber sequence = WriteBatchInternal::Sequence(batch.get());
SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);

if (immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
Expand All @@ -1257,7 +1258,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// and returns true.
if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
status, stop_replay_by_wal_filter,
*batch)) {
*batch_to_use)) {
continue;
}

Expand All @@ -1268,7 +1269,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
batch.get(), column_family_memtables_.get(), &flush_scheduler_,
batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, wal_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
Expand Down Expand Up @@ -2232,4 +2233,4 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
return s;
}
} // namespace ROCKSDB_NAMESPACE
} // namespace ROCKSDB_NAMESPACE
225 changes: 224 additions & 1 deletion db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,229 @@ TEST_F(DBWALTest, Recover) {
} while (ChangeWalOptions());
}

class DBWALTestWithTimestamp
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
public:
DBWALTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}

Status CreateAndReopenWithTs(const std::vector<std::string>& cfs,
const Options& ts_options, bool persist_udt,
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.allow_concurrent_memtable_write =
persist_udt ? true : false;
DestroyAndReopen(default_options);
CreateColumnFamilies(cfs, ts_options);
return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt,
avoid_flush_during_recovery);
}

Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
Options ts_options, bool persist_udt,
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.create_if_missing = false;
default_options.allow_concurrent_memtable_write =
persist_udt ? true : false;
default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
ts_options.create_if_missing = false;

std::vector<Options> cf_options(cfs.size(), ts_options);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
cf_options.insert(cf_options.begin(), default_options);
Close();
return TryReopenWithColumnFamilies(cfs_plus_default, cf_options);
}

Status Put(uint32_t cf, const Slice& key, const Slice& ts,
const Slice& value) {
WriteOptions write_opts;
return db_->Put(write_opts, handles_[cf], key, ts, value);
}

void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
const std::string& expected_value,
const std::string& expected_ts) {
std::string actual_value;
std::string actual_ts;
ASSERT_OK(
db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts));
ASSERT_EQ(expected_value, actual_value);
ASSERT_EQ(expected_ts, actual_ts);
}
};

TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
// Set up the option that enables user defined timestmp size.
std::string ts1;
PutFixed64(&ts1, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
// Test that user-defined timestamps are recovered from WAL regardless of
// the value of this flag because UDTs are saved in WAL nonetheless.
// We however need to explicitly disable flush during recovery by setting
// `avoid_flush_during_recovery=true` so that we can avoid timestamps getting
// stripped when the `persist_user_defined_timestamps` flag is false, so that
// all written timestamps are available for testing user-defined time travel
// read.
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;
bool avoid_flush_during_recovery = true;

ReadOptions read_opts;
do {
Slice ts_slice = ts1;
read_opts.timestamp = &ts_slice;
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, "foo", ts1, "v1"));
ASSERT_OK(Put(1, "baz", ts1, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
// Do a timestamped read with ts1 after second reopen.
CheckGet(read_opts, 1, "foo", "v1", ts1);
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Write more value versions for key "foo" and "bar" before and after second
// reopen.
std::string ts2;
PutFixed64(&ts2, 2);
ASSERT_OK(Put(1, "bar", ts2, "v2"));
ASSERT_OK(Put(1, "foo", ts2, "v3"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
std::string ts3;
PutFixed64(&ts3, 3);
ASSERT_OK(Put(1, "foo", ts3, "v4"));

// All the key value pairs available for read:
// "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")]
// "bar" -> [(ts2, "v2")]
// "baz" -> [(ts1, "v5")]
// Do a timestamped read with ts1 after third reopen.
// read_opts.timestamp is set to ts1 for below reads
CheckGet(read_opts, 1, "foo", "v1", ts1);
std::string value;
ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts2 after third reopen.
ts_slice = ts2;
// read_opts.timestamp is set to ts2 for below reads.
CheckGet(read_opts, 1, "foo", "v3", ts2);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts3 after third reopen.
ts_slice = ts3;
// read_opts.timestamp is set to ts3 for below reads.
CheckGet(read_opts, 1, "foo", "v4", ts3);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);
} while (ChangeWalOptions());
}

TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
// Set up the option that enables user defined timestamp size.
std::string min_ts;
std::string write_ts;
PutFixed64(&min_ts, 0);
PutFixed64(&write_ts, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;

std::string smallest_ukey_without_ts = "baz";
std::string largest_ukey_without_ts = "foo";

ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt));
// No flush, no sst files, because of no data.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt));
// Memtable recovered from WAL flushed because `avoid_flush_during_recovery`
// defaults to false, created one L0 file.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U);

std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
ASSERT_GT(level_to_files.size(), 1);
// L0 only has one SST file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto meta = level_to_files[0][0];
if (persist_udt) {
ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
} else {
ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key());
}
}

// Param 0: test mode for the user-defined timestamp feature
INSTANTIATE_TEST_CASE_P(
P, DBWALTestWithTimestamp,
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));

TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) {
Options options;
options.create_if_missing = true;
options.comparator = BytewiseComparator();
bool avoid_flush_during_recovery = true;
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */,
avoid_flush_during_recovery));

ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5"));

options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = false;
// Test handle timestamp size inconsistency in WAL when enabling user-defined
// timestamps.
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
false /* persist_udt */,
avoid_flush_during_recovery));

std::string ts;
PutFixed64(&ts, 0);
Slice ts_slice = ts;
ReadOptions read_opts;
read_opts.timestamp = &ts_slice;
// Pre-existing entries are treated as if they have the min timestamp.
CheckGet(read_opts, 1, "foo", "v1", ts);
CheckGet(read_opts, 1, "baz", "v5", ts);
ts.clear();
PutFixed64(&ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2"));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6"));
CheckGet(read_opts, 1, "foo", "v2", ts);
CheckGet(read_opts, 1, "baz", "v6", ts);

options.comparator = BytewiseComparator();
// Open the column family again with the UDT feature disabled. Test handle
// timestamp size inconsistency in WAL when disabling user-defined timestamps
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
true /* persist_udt */,
avoid_flush_during_recovery));
ASSERT_EQ("v2", Get(1, "foo"));
ASSERT_EQ("v6", Get(1, "baz"));
}

TEST_F(DBWALTest, RecoverWithTableHandle) {
do {
Options options = CurrentOptions();
Expand Down Expand Up @@ -2440,4 +2663,4 @@ int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}
31 changes: 12 additions & 19 deletions util/udt_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,27 @@ enum class ToggleUDT {
kInvalidChange,
};

bool IsPrefix(const char* str, const char* prefix) {
return strncmp(str, prefix, strlen(prefix)) == 0;
}

ToggleUDT CompareComparator(const Comparator* new_comparator,
const std::string& old_comparator_name) {
static const char* kUDTSuffix = ".u64ts";
static const Slice kSuffixSlice = kUDTSuffix;
static const size_t kSuffixSize = 6;
size_t ts_sz = new_comparator->timestamp_size();
(void)ts_sz;
const char* new_comparator_name = new_comparator->Name();
size_t new_name_size = strlen(new_comparator_name);
size_t old_name_size = old_comparator_name.size();
if (new_name_size == old_name_size &&
IsPrefix(new_comparator_name, old_comparator_name.data())) {
Slice new_ucmp_name(new_comparator->Name());
Slice old_ucmp_name(old_comparator_name);
if (new_ucmp_name.compare(old_ucmp_name) == 0) {
return ToggleUDT::kUnchanged;
}
if (new_name_size == old_name_size + kSuffixSize &&
IsPrefix(new_comparator_name, old_comparator_name.data()) &&
IsPrefix(new_comparator_name + old_name_size, kUDTSuffix)) {
if (new_ucmp_name.size() == old_ucmp_name.size() + kSuffixSize &&
new_ucmp_name.starts_with(old_ucmp_name) &&
new_ucmp_name.ends_with(kSuffixSlice)) {
assert(ts_sz == 8);
return ToggleUDT::kEnableUDT;
}
if (new_name_size + kSuffixSize == old_name_size &&
IsPrefix(old_comparator_name.data(), new_comparator_name) &&
IsPrefix(old_comparator_name.data() + new_name_size, kUDTSuffix)) {
if (old_ucmp_name.size() == new_ucmp_name.size() + kSuffixSize &&
old_ucmp_name.starts_with(new_ucmp_name) &&
old_ucmp_name.ends_with(kSuffixSlice)) {
assert(ts_sz == 0);
return ToggleUDT::kDisableUDT;
}
Expand Down Expand Up @@ -276,7 +271,7 @@ Status HandleWriteBatchTimestampSizeDifference(
const UnorderedMap<uint32_t, size_t>& running_ts_sz,
const UnorderedMap<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode,
std::unique_ptr<WriteBatch>* new_batch, bool* batch_updated) {
std::unique_ptr<WriteBatch>* new_batch) {
// Quick path to bypass checking the WriteBatch.
if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) {
return Status::OK();
Expand All @@ -288,7 +283,6 @@ Status HandleWriteBatchTimestampSizeDifference(
return status;
} else if (need_recovery) {
assert(new_batch);
assert(batch_updated);
SequenceNumber sequence = WriteBatchInternal::Sequence(batch);
TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz);
status = batch->Iterate(&recovery_handler);
Expand All @@ -297,7 +291,6 @@ Status HandleWriteBatchTimestampSizeDifference(
} else {
*new_batch = recovery_handler.TransferNewBatch();
WriteBatchInternal::SetSequence(new_batch->get(), sequence);
*batch_updated = true;
}
}
return Status::OK();
Expand Down Expand Up @@ -347,4 +340,4 @@ Status ValidateUserDefinedTimestampsOptions(
return Status::InvalidArgument(
"Unsupported user defined timestamps settings change.");
}
} // namespace ROCKSDB_NAMESPACE
} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit f767683

Please sign in to comment.