Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Jun 21, 2024
1 parent efba8f5 commit 6e5f737
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 27 deletions.
7 changes: 5 additions & 2 deletions db_stress_tool/db_stress_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ class DbStressListener : public EventListener {
DbStressListener(const std::string& db_name,
const std::vector<DbPath>& db_paths,
const std::vector<ColumnFamilyDescriptor>& column_families,
Env* env)
Env* env, SharedState* shared)
: db_name_(db_name),
db_paths_(db_paths),
column_families_(column_families),
num_pending_file_creations_(0),
unique_ids_(db_name, env) {}
unique_ids_(db_name, env),
shared_(shared) {}

const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "DBStressListener"; }
Expand All @@ -81,6 +82,7 @@ class DbStressListener : public EventListener {
fault_fs_guard->DisableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataWrite);
}
shared_->SetPersistSeqno(info.largest_seqno);
}

void OnFlushBegin(DB* /*db*/,
Expand Down Expand Up @@ -385,6 +387,7 @@ class DbStressListener : public EventListener {
std::vector<ColumnFamilyDescriptor> column_families_;
std::atomic<int> num_pending_file_creations_;
UniqueIdVerifier unique_ids_;
SharedState* shared_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS
13 changes: 13 additions & 0 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ class SharedState {
"Cannot use --expected_values_dir on platforms without lock-free "
"std::atomic<uint32_t>");
}
if (!std::atomic<uint64_t>{}.is_lock_free()) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on platforms without lock-free "
"std::atomic<uint64_t>");
}
if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on when "
Expand Down Expand Up @@ -255,6 +260,14 @@ class SharedState {
return expected_state_manager_->ClearColumnFamily(cf);
}

void SetPersistSeqno(SequenceNumber seqno) {
return expected_state_manager_->SetPersistSeqno(seqno);
}

SequenceNumber GetPersistSeqno() {
return expected_state_manager_->GetPersistSeqno();
}

// Prepare a Put that will be started but not finish yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down
13 changes: 11 additions & 2 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3408,8 +3408,9 @@ void StressTest::Open(SharedState* shared, bool reopen) {
}

options_.listeners.clear();
options_.listeners.emplace_back(new DbStressListener(
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
options_.listeners.emplace_back(
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors,
db_stress_listener_env, shared));
RegisterAdditionalListeners();

// If this is for DB reopen, error injection may have been enabled.
Expand Down Expand Up @@ -3658,6 +3659,14 @@ void StressTest::Open(SharedState* shared, bool reopen) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1);
}

if (shared->GetPersistSeqno() > db_->GetLatestSequenceNumber()) {
fprintf(stderr,
"DB of latest sequence number %ld did not recover to the persisted "
"sequence number %ld from last DB session\n",
db_->GetLatestSequenceNumber(), shared->GetPersistSeqno());
exit(1);
}
}

void StressTest::Reopen(ThreadState* thread) {
Expand Down
82 changes: 59 additions & 23 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,13 @@ void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key,
}
}

FileExpectedState::FileExpectedState(std::string expected_state_file_path,
size_t max_key, size_t num_column_families)
FileExpectedState::FileExpectedState(
std::string expected_state_file_path,
std::string expected_persist_seqno_file_path, size_t max_key,
size_t num_column_families)
: ExpectedState(max_key, num_column_families),
expected_state_file_path_(expected_state_file_path) {}
expected_state_file_path_(expected_state_file_path),
expected_persist_seqno_file_path_(expected_persist_seqno_file_path) {}

Status FileExpectedState::Open(bool create) {
size_t expected_values_size = GetValuesLen();
Expand All @@ -136,29 +139,42 @@ Status FileExpectedState::Open(bool create) {

Status status;
if (create) {
std::unique_ptr<WritableFile> wfile;
const EnvOptions soptions;
status = default_env->NewWritableFile(expected_state_file_path_, &wfile,
soptions);
status = CreateFile(default_env, EnvOptions(), expected_state_file_path_,
std::string(expected_values_size, '\0'));
if (status.ok()) {
std::string buf(expected_values_size, '\0');
status = wfile->Append(buf);
status = CreateFile(
default_env, EnvOptions(), expected_persist_seqno_file_path_,
std::string(sizeof(std::atomic<SequenceNumber>), '\0'));
}
}
if (status.ok()) {
status = default_env->NewMemoryMappedFileBuffer(
expected_state_file_path_, &expected_state_mmap_buffer_);
status =
MemoryMappedFile(default_env, expected_state_file_path_,
expected_state_mmap_buffer_, expected_values_size);
if (status.ok()) {
values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
if (create) {
Reset();
}
} else {
assert(values_ == nullptr);
}
}

if (status.ok()) {
assert(expected_state_mmap_buffer_->GetLen() == expected_values_size);
values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
assert(values_ != nullptr);
if (create) {
Reset();
status = MemoryMappedFile(default_env, expected_persist_seqno_file_path_,
expected_persist_seqno_mmap_buffer_,
sizeof(std::atomic<SequenceNumber>));
if (status.ok()) {
persist_seqno_ = static_cast<std::atomic<SequenceNumber>*>(
expected_persist_seqno_mmap_buffer_->GetBase());
if (create) {
persist_seqno_->store(0, std::memory_order_relaxed);
}
} else {
assert(persist_seqno_ == nullptr);
}
} else {
assert(values_ == nullptr);
}
return status;
}
Expand Down Expand Up @@ -192,6 +208,9 @@ ExpectedStateManager::~ExpectedStateManager() = default;
const std::string FileExpectedStateManager::kLatestBasename = "LATEST";
const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state";
const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace";
const std::string FileExpectedStateManager::kPersistSeqnoBasename = "PERSIST";
const std::string FileExpectedStateManager::kPersistSeqnoFilenameSuffix =
".seqno";
const std::string FileExpectedStateManager::kTempFilenamePrefix = ".";
const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp";

Expand Down Expand Up @@ -256,12 +275,15 @@ Status FileExpectedStateManager::Open() {

std::string expected_state_file_path =
GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
std::string expected_persist_seqno_file_path =
GetPathForFilename(kPersistSeqnoBasename + kPersistSeqnoFilenameSuffix);
bool found = false;
if (s.ok()) {
Status exists_status = Env::Default()->FileExists(expected_state_file_path);
if (exists_status.ok()) {
found = true;
} else if (exists_status.IsNotFound()) {
assert(Env::Default()->FileExists(expected_state_file_path).IsNotFound());
found = false;
} else {
s = exists_status;
Expand All @@ -274,7 +296,10 @@ Status FileExpectedStateManager::Open() {
// the incomplete expected values file.
std::string temp_expected_state_file_path =
GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
std::string temp_expected_persist_seqno_file_path = GetTempPathForFilename(
kPersistSeqnoBasename + kPersistSeqnoFilenameSuffix);
FileExpectedState temp_expected_state(temp_expected_state_file_path,
temp_expected_persist_seqno_file_path,
max_key_, num_column_families_);
if (s.ok()) {
s = temp_expected_state.Open(true /* create */);
Expand All @@ -283,11 +308,17 @@ Status FileExpectedStateManager::Open() {
s = Env::Default()->RenameFile(temp_expected_state_file_path,
expected_state_file_path);
}
if (s.ok()) {
s = Env::Default()->RenameFile(temp_expected_persist_seqno_file_path,
expected_persist_seqno_file_path);
}
}

if (s.ok()) {
latest_.reset(new FileExpectedState(std::move(expected_state_file_path),
max_key_, num_column_families_));
latest_.reset(
new FileExpectedState(std::move(expected_state_file_path),
std::move(expected_persist_seqno_file_path),
max_key_, num_column_families_));
s = latest_->Open(false /* create */);
}
return s;
Expand Down Expand Up @@ -652,6 +683,9 @@ Status FileExpectedStateManager::Restore(DB* db) {
Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
&trace_reader);

std::string persist_seqno_file_path =
GetPathForFilename(kPersistSeqnoBasename + kPersistSeqnoFilenameSuffix);

if (s.ok()) {
// We are going to replay on top of "`seqno`.state" to create a new
// "LATEST.state". Start off by creating a tempfile so we can later make the
Expand All @@ -666,7 +700,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
std::unique_ptr<ExpectedState> state;
std::unique_ptr<ExpectedStateTraceRecordHandler> handler;
if (s.ok()) {
state.reset(new FileExpectedState(latest_file_temp_path, max_key_,
state.reset(new FileExpectedState(latest_file_temp_path,
persist_seqno_file_path, max_key_,
num_column_families_));
s = state->Open(false /* create */);
}
Expand Down Expand Up @@ -712,7 +747,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
nullptr /* dbg */);
}
if (s.ok()) {
latest_.reset(new FileExpectedState(latest_file_path, max_key_,
latest_.reset(new FileExpectedState(latest_file_path,
persist_seqno_file_path, max_key_,
num_column_families_));
s = latest_->Open(false /* create */);
}
Expand Down
45 changes: 45 additions & 0 deletions db_stress_tool/expected_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class ExpectedState {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf);

// Requires external locking
void SetPersistSeqno(SequenceNumber seqno) {
persist_seqno_->store(seqno, std::memory_order_relaxed);
}

// Requires external locking
SequenceNumber GetPersistSeqno() {
return persist_seqno_->load(std::memory_order_relaxed);
}

// Prepare a Put that will be started but not finished yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down Expand Up @@ -124,21 +134,48 @@ class ExpectedState {
void Reset();

std::atomic<uint32_t>* values_;
std::atomic<SequenceNumber>* persist_seqno_;
};

// A `FileExpectedState` implements `ExpectedState` backed by a file.
class FileExpectedState : public ExpectedState {
public:
explicit FileExpectedState(std::string expected_state_file_path,
std::string expected_persist_seqno_file_path,
size_t max_key, size_t num_column_families);

// Requires external locking preventing concurrent execution with any other
// member function.
Status Open(bool create) override;

private:
static Status CreateFile(Env* env, const EnvOptions& options,
const std::string& file_path,
const std::string& content) {
std::unique_ptr<WritableFile> wfile;
Status status = env->NewWritableFile(file_path, &wfile, options);
if (status.ok()) {
status = wfile->Append(content);
}
return status;
}

static Status MemoryMappedFile(
Env* env, const std::string& file_path,
std::unique_ptr<MemoryMappedFileBuffer>& memory_mapped_file_buffer,
std::size_t size) {
Status status =
env->NewMemoryMappedFileBuffer(file_path, &memory_mapped_file_buffer);
if (status.ok()) {
assert(memory_mapped_file_buffer->GetLen() == size);
}
return status;
}

const std::string expected_state_file_path_;
const std::string expected_persist_seqno_file_path_;
std::unique_ptr<MemoryMappedFileBuffer> expected_state_mmap_buffer_;
std::unique_ptr<MemoryMappedFileBuffer> expected_persist_seqno_mmap_buffer_;
};

// An `AnonExpectedState` implements `ExpectedState` backed by a memory
Expand Down Expand Up @@ -196,6 +233,12 @@ class ExpectedStateManager {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }

void SetPersistSeqno(SequenceNumber seqno) {
return latest_->SetPersistSeqno(seqno);
}

SequenceNumber GetPersistSeqno() { return latest_->GetPersistSeqno(); }

// See ExpectedState::PreparePut()
PendingExpectedValue PreparePut(int cf, int64_t key) {
return latest_->PreparePut(cf, key);
Expand Down Expand Up @@ -290,6 +333,8 @@ class FileExpectedStateManager : public ExpectedStateManager {
static const std::string kLatestBasename;
static const std::string kStateFilenameSuffix;
static const std::string kTraceFilenameSuffix;
static const std::string kPersistSeqnoBasename;
static const std::string kPersistSeqnoFilenameSuffix;
static const std::string kTempFilenamePrefix;
static const std::string kTempFilenameSuffix;

Expand Down

0 comments on commit 6e5f737

Please sign in to comment.