Skip to content

Commit

Permalink
verify
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 19, 2024
1 parent d957e1a commit 9f98a86
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 29 deletions.
7 changes: 5 additions & 2 deletions db_stress_tool/db_stress_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ class DbStressListener : public EventListener {
DbStressListener(const std::string& db_name,
const std::vector<DbPath>& db_paths,
const std::vector<ColumnFamilyDescriptor>& column_families,
Env* env)
Env* env, SharedState* shared)
: db_name_(db_name),
db_paths_(db_paths),
column_families_(column_families),
num_pending_file_creations_(0),
unique_ids_(db_name, env) {}
unique_ids_(db_name, env),
shared_(shared) {}

const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "DBStressListener"; }
Expand All @@ -74,6 +75,7 @@ class DbStressListener : public EventListener {
if (fault_fs_guard) {
fault_fs_guard->DisableAllThreadLocalErrorInjection();
}
shared_->SetPersistedSeqno(info.largest_seqno);
}

void OnFlushBegin(DB* /*db*/,
Expand Down Expand Up @@ -358,6 +360,7 @@ class DbStressListener : public EventListener {
std::vector<ColumnFamilyDescriptor> column_families_;
std::atomic<int> num_pending_file_creations_;
UniqueIdVerifier unique_ids_;
SharedState* shared_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS
13 changes: 13 additions & 0 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ class SharedState {
"Cannot use --expected_values_dir on platforms without lock-free "
"std::atomic<uint32_t>");
}
if (!std::atomic<uint64_t>{}.is_lock_free()) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on platforms without lock-free "
"std::atomic<uint64_t>");
}
if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on when "
Expand Down Expand Up @@ -260,6 +265,14 @@ class SharedState {
return expected_state_manager_->ClearColumnFamily(cf);
}

void SetPersistedSeqno(SequenceNumber seqno) {
return expected_state_manager_->SetPersistedSeqno(seqno);
}

SequenceNumber GetPersistedSeqno() {
return expected_state_manager_->GetPersistedSeqno();
}

// Prepare a Put that will be started but not finish yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down
14 changes: 12 additions & 2 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3472,8 +3472,9 @@ void StressTest::Open(SharedState* shared, bool reopen) {
}

options_.listeners.clear();
options_.listeners.emplace_back(new DbStressListener(
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
options_.listeners.emplace_back(
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors,
db_stress_listener_env, shared));
RegisterAdditionalListeners();

// If this is for DB reopen, error injection may have been enabled.
Expand Down Expand Up @@ -3714,6 +3715,15 @@ void StressTest::Open(SharedState* shared, bool reopen) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1);
}

if (db_->GetLatestSequenceNumber() < shared->GetPersistedSeqno()) {
fprintf(stderr,
"DB of latest sequence number %" PRIu64
"did not recover to the persisted "
"sequence number %" PRIu64 "from last DB session\n",
db_->GetLatestSequenceNumber(), shared->GetPersistedSeqno());
exit(1);
}
}

void StressTest::Reopen(ThreadState* thread) {
Expand Down
91 changes: 66 additions & 25 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key,
}
}

FileExpectedState::FileExpectedState(std::string expected_state_file_path,
size_t max_key, size_t num_column_families)
FileExpectedState::FileExpectedState(
std::string expected_state_file_path,
std::string expected_persisted_seqno_file_path, size_t max_key,
size_t num_column_families)
: ExpectedState(max_key, num_column_families),
expected_state_file_path_(expected_state_file_path) {}
expected_state_file_path_(expected_state_file_path),
expected_persisted_seqno_file_path_(expected_persisted_seqno_file_path) {}

Status FileExpectedState::Open(bool create) {
size_t expected_values_size = GetValuesLen();
Expand All @@ -144,29 +147,44 @@ Status FileExpectedState::Open(bool create) {

Status status;
if (create) {
std::unique_ptr<WritableFile> wfile;
const EnvOptions soptions;
status = default_env->NewWritableFile(expected_state_file_path_, &wfile,
soptions);
status = CreateFile(default_env, EnvOptions(), expected_state_file_path_,
std::string(expected_values_size, '\0'));
if (status.ok()) {
std::string buf(expected_values_size, '\0');
status = wfile->Append(buf);
status = CreateFile(
default_env, EnvOptions(), expected_persisted_seqno_file_path_,
std::string(sizeof(std::atomic<SequenceNumber>), '\0'));
}
}
if (status.ok()) {
status = default_env->NewMemoryMappedFileBuffer(
expected_state_file_path_, &expected_state_mmap_buffer_);
status =
MemoryMappedFile(default_env, expected_state_file_path_,
expected_state_mmap_buffer_, expected_values_size);
if (status.ok()) {
values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
assert(values_ != nullptr);
if (create) {
Reset();
}
} else {
assert(values_ == nullptr);
}
}

if (status.ok()) {
assert(expected_state_mmap_buffer_->GetLen() == expected_values_size);
values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
assert(values_ != nullptr);
if (create) {
Reset();
status = MemoryMappedFile(default_env, expected_persisted_seqno_file_path_,
expected_persisted_seqno_mmap_buffer_,
sizeof(std::atomic<SequenceNumber>));
if (status.ok()) {
persisted_seqno_ = static_cast<std::atomic<SequenceNumber>*>(
expected_persisted_seqno_mmap_buffer_->GetBase());
assert(persisted_seqno_ != nullptr);
if (create) {
persisted_seqno_->store(0, std::memory_order_relaxed);
}
} else {
assert(persisted_seqno_ == nullptr);
}
} else {
assert(values_ == nullptr);
}
return status;
}
Expand Down Expand Up @@ -200,6 +218,9 @@ ExpectedStateManager::~ExpectedStateManager() = default;
const std::string FileExpectedStateManager::kLatestBasename = "LATEST";
const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state";
const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace";
const std::string FileExpectedStateManager::kPersistedSeqnoBasename = "PERSIST";
const std::string FileExpectedStateManager::kPersistedSeqnoFilenameSuffix =
".seqno";
const std::string FileExpectedStateManager::kTempFilenamePrefix = ".";
const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp";

Expand Down Expand Up @@ -264,12 +285,17 @@ Status FileExpectedStateManager::Open() {

std::string expected_state_file_path =
GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
std::string expected_persisted_seqno_file_path = GetPathForFilename(
kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);
bool found = false;
if (s.ok()) {
Status exists_status = Env::Default()->FileExists(expected_state_file_path);
if (exists_status.ok()) {
found = true;
} else if (exists_status.IsNotFound()) {
assert(Env::Default()
->FileExists(expected_persisted_seqno_file_path)
.IsNotFound());
found = false;
} else {
s = exists_status;
Expand All @@ -282,20 +308,30 @@ Status FileExpectedStateManager::Open() {
// the incomplete expected values file.
std::string temp_expected_state_file_path =
GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
FileExpectedState temp_expected_state(temp_expected_state_file_path,
max_key_, num_column_families_);
std::string temp_expected_persisted_seqno_file_path =
GetTempPathForFilename(kPersistedSeqnoBasename +
kPersistedSeqnoFilenameSuffix);
FileExpectedState temp_expected_state(
temp_expected_state_file_path, temp_expected_persisted_seqno_file_path,
max_key_, num_column_families_);
if (s.ok()) {
s = temp_expected_state.Open(true /* create */);
}
if (s.ok()) {
s = Env::Default()->RenameFile(temp_expected_state_file_path,
expected_state_file_path);
}
if (s.ok()) {
s = Env::Default()->RenameFile(temp_expected_persisted_seqno_file_path,
expected_persisted_seqno_file_path);
}
}

if (s.ok()) {
latest_.reset(new FileExpectedState(std::move(expected_state_file_path),
max_key_, num_column_families_));
latest_.reset(
new FileExpectedState(std::move(expected_state_file_path),
std::move(expected_persisted_seqno_file_path),
max_key_, num_column_families_));
s = latest_->Open(false /* create */);
}
return s;
Expand Down Expand Up @@ -660,6 +696,9 @@ Status FileExpectedStateManager::Restore(DB* db) {
Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
&trace_reader);

std::string persisted_seqno_file_path = GetPathForFilename(
kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);

if (s.ok()) {
// We are going to replay on top of "`seqno`.state" to create a new
// "LATEST.state". Start off by creating a tempfile so we can later make the
Expand All @@ -674,7 +713,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
std::unique_ptr<ExpectedState> state;
std::unique_ptr<ExpectedStateTraceRecordHandler> handler;
if (s.ok()) {
state.reset(new FileExpectedState(latest_file_temp_path, max_key_,
state.reset(new FileExpectedState(latest_file_temp_path,
persisted_seqno_file_path, max_key_,
num_column_families_));
s = state->Open(false /* create */);
}
Expand Down Expand Up @@ -720,7 +760,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
nullptr /* dbg */);
}
if (s.ok()) {
latest_.reset(new FileExpectedState(latest_file_path, max_key_,
latest_.reset(new FileExpectedState(latest_file_path,
persisted_seqno_file_path, max_key_,
num_column_families_));
s = latest_->Open(false /* create */);
}
Expand Down
46 changes: 46 additions & 0 deletions db_stress_tool/expected_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class ExpectedState {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf);

// Requires external locking
void SetPersistedSeqno(SequenceNumber seqno) {
persisted_seqno_->store(seqno, std::memory_order_relaxed);
}

// Requires external locking
SequenceNumber GetPersistedSeqno() {
return persisted_seqno_->load(std::memory_order_relaxed);
}

// Prepare a Put that will be started but not finished yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down Expand Up @@ -123,21 +133,49 @@ class ExpectedState {
void Reset();

std::atomic<uint32_t>* values_;
std::atomic<SequenceNumber>* persisted_seqno_;
};

// A `FileExpectedState` implements `ExpectedState` backed by a file.
class FileExpectedState : public ExpectedState {
public:
explicit FileExpectedState(std::string expected_state_file_path,
std::string expected_persisted_seqno_file_path,
size_t max_key, size_t num_column_families);

// Requires external locking preventing concurrent execution with any other
// member function.
Status Open(bool create) override;

private:
static Status CreateFile(Env* env, const EnvOptions& options,
const std::string& file_path,
const std::string& content) {
std::unique_ptr<WritableFile> wfile;
Status status = env->NewWritableFile(file_path, &wfile, options);
if (status.ok()) {
status = wfile->Append(content);
}
return status;
}

static Status MemoryMappedFile(
Env* env, const std::string& file_path,
std::unique_ptr<MemoryMappedFileBuffer>& memory_mapped_file_buffer,
std::size_t size) {
Status status =
env->NewMemoryMappedFileBuffer(file_path, &memory_mapped_file_buffer);
if (status.ok()) {
assert(memory_mapped_file_buffer->GetLen() == size);
}
(void)size;
return status;
}

const std::string expected_state_file_path_;
const std::string expected_persisted_seqno_file_path_;
std::unique_ptr<MemoryMappedFileBuffer> expected_state_mmap_buffer_;
std::unique_ptr<MemoryMappedFileBuffer> expected_persisted_seqno_mmap_buffer_;
};

// An `AnonExpectedState` implements `ExpectedState` backed by a memory
Expand Down Expand Up @@ -195,6 +233,12 @@ class ExpectedStateManager {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }

void SetPersistedSeqno(SequenceNumber seqno) {
return latest_->SetPersistedSeqno(seqno);
}

SequenceNumber GetPersistedSeqno() { return latest_->GetPersistedSeqno(); }

// See ExpectedState::PreparePut()
PendingExpectedValue PreparePut(int cf, int64_t key) {
return latest_->PreparePut(cf, key);
Expand Down Expand Up @@ -289,6 +333,8 @@ class FileExpectedStateManager : public ExpectedStateManager {
static const std::string kLatestBasename;
static const std::string kStateFilenameSuffix;
static const std::string kTraceFilenameSuffix;
static const std::string kPersistedSeqnoBasename;
static const std::string kPersistedSeqnoFilenameSuffix;
static const std::string kTempFilenamePrefix;
static const std::string kTempFilenameSuffix;

Expand Down

0 comments on commit 9f98a86

Please sign in to comment.