Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify flushed data are recovered upon reopen in crash test #12787

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions db_stress_tool/db_stress_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ class DbStressListener : public EventListener {
DbStressListener(const std::string& db_name,
const std::vector<DbPath>& db_paths,
const std::vector<ColumnFamilyDescriptor>& column_families,
Env* env)
Env* env, SharedState* shared)
: db_name_(db_name),
db_paths_(db_paths),
column_families_(column_families),
num_pending_file_creations_(0),
unique_ids_(db_name, env) {}
unique_ids_(db_name, env),
shared_(shared) {}

const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "DBStressListener"; }
Expand All @@ -74,6 +75,7 @@ class DbStressListener : public EventListener {
if (fault_fs_guard) {
fault_fs_guard->DisableAllThreadLocalErrorInjection();
}
shared_->SetPersistedSeqno(info.largest_seqno);
archang19 marked this conversation as resolved.
Show resolved Hide resolved
}

void OnFlushBegin(DB* /*db*/,
Expand Down Expand Up @@ -358,6 +360,7 @@ class DbStressListener : public EventListener {
std::vector<ColumnFamilyDescriptor> column_families_;
std::atomic<int> num_pending_file_creations_;
UniqueIdVerifier unique_ids_;
SharedState* shared_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS
25 changes: 21 additions & 4 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,17 @@ class SharedState {
// expected state. Only then should we permit bypassing the below feature
// compatibility checks.
if (!FLAGS_expected_values_dir.empty()) {
if (!std::atomic<uint32_t>{}.is_lock_free()) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on platforms without lock-free "
"std::atomic<uint32_t>");
if (!std::atomic<uint32_t>{}.is_lock_free() ||
!std::atomic<uint64_t>{}.is_lock_free()) {
std::ostringstream status_s;
status_s << "Cannot use --expected_values_dir on platforms without "
"lock-free "
<< (!std::atomic<uint32_t>{}.is_lock_free()
? "std::atomic<uint32_t>"
: "std::atomic<uint64_t>");
status = Status::InvalidArgument(status_s.str());
}

if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on when "
Expand Down Expand Up @@ -260,6 +266,16 @@ class SharedState {
return expected_state_manager_->ClearColumnFamily(cf);
}

// Forwards `seqno` to the expected state manager as the newest persisted
// sequence number. The call is made while holding `persist_seqno_mu_`, which
// provides the external locking the underlying expected state requires.
// DbStressListener calls this with `info.largest_seqno`.
void SetPersistedSeqno(SequenceNumber seqno) {
  MutexLock l(&persist_seqno_mu_);
  expected_state_manager_->SetPersistedSeqno(seqno);
}

// Returns the persisted sequence number held by the expected state manager.
// The value is read while holding `persist_seqno_mu_`.
SequenceNumber GetPersistedSeqno() {
  MutexLock guard(&persist_seqno_mu_);
  const SequenceNumber persisted = expected_state_manager_->GetPersistedSeqno();
  return persisted;
}

archang19 marked this conversation as resolved.
Show resolved Hide resolved
// Prepare a Put that will be started but not finish yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down Expand Up @@ -396,6 +412,7 @@ class SharedState {

port::Mutex mu_;
port::CondVar cv_;
port::Mutex persist_seqno_mu_;
const uint32_t seed_;
const int64_t max_key_;
const uint32_t log2_keys_per_lock_;
Expand Down
14 changes: 12 additions & 2 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3472,8 +3472,9 @@ void StressTest::Open(SharedState* shared, bool reopen) {
}

options_.listeners.clear();
options_.listeners.emplace_back(new DbStressListener(
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
options_.listeners.emplace_back(
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors,
db_stress_listener_env, shared));
RegisterAdditionalListeners();

// If this is for DB reopen, error injection may have been enabled.
Expand Down Expand Up @@ -3714,6 +3715,15 @@ void StressTest::Open(SharedState* shared, bool reopen) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1);
}

if (db_->GetLatestSequenceNumber() < shared->GetPersistedSeqno()) {
archang19 marked this conversation as resolved.
Show resolved Hide resolved
fprintf(stderr,
"DB of latest sequence number %" PRIu64
"did not recover to the persisted "
"sequence number %" PRIu64 " from last DB session\n",
db_->GetLatestSequenceNumber(), shared->GetPersistedSeqno());
exit(1);
}
}

void StressTest::Reopen(ThreadState* thread) {
Expand Down
112 changes: 80 additions & 32 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key,
}
}

FileExpectedState::FileExpectedState(std::string expected_state_file_path,
size_t max_key, size_t num_column_families)
FileExpectedState::FileExpectedState(
const std::string& expected_state_file_path,
const std::string& expected_persisted_seqno_file_path, size_t max_key,
size_t num_column_families)
: ExpectedState(max_key, num_column_families),
expected_state_file_path_(expected_state_file_path) {}
expected_state_file_path_(expected_state_file_path),
expected_persisted_seqno_file_path_(expected_persisted_seqno_file_path) {}

Status FileExpectedState::Open(bool create) {
size_t expected_values_size = GetValuesLen();
Expand All @@ -144,30 +147,53 @@ Status FileExpectedState::Open(bool create) {

Status status;
if (create) {
std::unique_ptr<WritableFile> wfile;
const EnvOptions soptions;
status = default_env->NewWritableFile(expected_state_file_path_, &wfile,
soptions);
if (status.ok()) {
std::string buf(expected_values_size, '\0');
status = wfile->Append(buf);
}
}
if (status.ok()) {
status = default_env->NewMemoryMappedFileBuffer(
expected_state_file_path_, &expected_state_mmap_buffer_);
}
if (status.ok()) {
assert(expected_state_mmap_buffer_->GetLen() == expected_values_size);
values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
assert(values_ != nullptr);
if (create) {
Reset();
}
} else {
status = CreateFile(default_env, EnvOptions(), expected_state_file_path_,
archang19 marked this conversation as resolved.
Show resolved Hide resolved
std::string(expected_values_size, '\0'));
if (!status.ok()) {
return status;
}

status = CreateFile(default_env, EnvOptions(),
expected_persisted_seqno_file_path_,
std::string(sizeof(std::atomic<SequenceNumber>), '\0'));

if (!status.ok()) {
return status;
}
}

status = MemoryMappedFile(default_env, expected_state_file_path_,
expected_state_mmap_buffer_, expected_values_size);
if (!status.ok()) {
assert(values_ == nullptr);
return status;
}

values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
assert(values_ != nullptr);
if (create) {
Reset();
}

// TODO(hx235): Find a way to mmap persisted seqno and expected state into the
// same LATEST file so we can obsolete the logic to handle this extra file for
// persisted seqno
status = MemoryMappedFile(default_env, expected_persisted_seqno_file_path_,
expected_persisted_seqno_mmap_buffer_,
sizeof(std::atomic<SequenceNumber>));
if (!status.ok()) {
assert(persisted_seqno_ == nullptr);
return status;
}

persisted_seqno_ = static_cast<std::atomic<SequenceNumber>*>(
expected_persisted_seqno_mmap_buffer_->GetBase());
assert(persisted_seqno_ != nullptr);
if (create) {
persisted_seqno_->store(0, std::memory_order_relaxed);
}

return status;
}

Expand Down Expand Up @@ -200,6 +226,9 @@ ExpectedStateManager::~ExpectedStateManager() = default;
// Filename pieces used by FileExpectedStateManager.
// `kLatestBasename` + `kStateFilenameSuffix` names the latest expected state
// file ("LATEST.state").
const std::string FileExpectedStateManager::kLatestBasename = "LATEST";
const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state";
const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace";
// `kPersistedSeqnoBasename` + `kPersistedSeqnoFilenameSuffix` names the file
// that stores the persisted sequence number ("PERSIST.seqno").
const std::string FileExpectedStateManager::kPersistedSeqnoBasename = "PERSIST";
const std::string FileExpectedStateManager::kPersistedSeqnoFilenameSuffix =
".seqno";
const std::string FileExpectedStateManager::kTempFilenamePrefix = ".";
const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp";

Expand Down Expand Up @@ -264,13 +293,17 @@ Status FileExpectedStateManager::Open() {

std::string expected_state_file_path =
GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
std::string expected_persisted_seqno_file_path = GetPathForFilename(
kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);
bool found = false;
if (s.ok()) {
Status exists_status = Env::Default()->FileExists(expected_state_file_path);
if (exists_status.ok()) {
found = true;
} else if (exists_status.IsNotFound()) {
found = false;
assert(Env::Default()
archang19 marked this conversation as resolved.
Show resolved Hide resolved
->FileExists(expected_persisted_seqno_file_path)
.IsNotFound());
} else {
s = exists_status;
}
Expand All @@ -282,20 +315,30 @@ Status FileExpectedStateManager::Open() {
// the incomplete expected values file.
std::string temp_expected_state_file_path =
GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
FileExpectedState temp_expected_state(temp_expected_state_file_path,
max_key_, num_column_families_);
std::string temp_expected_persisted_seqno_file_path =
GetTempPathForFilename(kPersistedSeqnoBasename +
kPersistedSeqnoFilenameSuffix);
FileExpectedState temp_expected_state(
temp_expected_state_file_path, temp_expected_persisted_seqno_file_path,
max_key_, num_column_families_);
if (s.ok()) {
s = temp_expected_state.Open(true /* create */);
}
if (s.ok()) {
s = Env::Default()->RenameFile(temp_expected_state_file_path,
expected_state_file_path);
}
if (s.ok()) {
s = Env::Default()->RenameFile(temp_expected_persisted_seqno_file_path,
expected_persisted_seqno_file_path);
}
}

if (s.ok()) {
latest_.reset(new FileExpectedState(std::move(expected_state_file_path),
max_key_, num_column_families_));
latest_.reset(
new FileExpectedState(std::move(expected_state_file_path),
std::move(expected_persisted_seqno_file_path),
max_key_, num_column_families_));
s = latest_->Open(false /* create */);
}
return s;
Expand Down Expand Up @@ -660,6 +703,9 @@ Status FileExpectedStateManager::Restore(DB* db) {
Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
&trace_reader);

std::string persisted_seqno_file_path = GetPathForFilename(
kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);

if (s.ok()) {
// We are going to replay on top of "`seqno`.state" to create a new
// "LATEST.state". Start off by creating a tempfile so we can later make the
Expand All @@ -674,7 +720,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
std::unique_ptr<ExpectedState> state;
std::unique_ptr<ExpectedStateTraceRecordHandler> handler;
if (s.ok()) {
state.reset(new FileExpectedState(latest_file_temp_path, max_key_,
state.reset(new FileExpectedState(latest_file_temp_path,
persisted_seqno_file_path, max_key_,
num_column_families_));
s = state->Open(false /* create */);
}
Expand Down Expand Up @@ -720,7 +767,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
nullptr /* dbg */);
}
if (s.ok()) {
latest_.reset(new FileExpectedState(latest_file_path, max_key_,
latest_.reset(new FileExpectedState(latest_file_path,
persisted_seqno_file_path, max_key_,
num_column_families_));
s = latest_->Open(false /* create */);
}
Expand Down
53 changes: 51 additions & 2 deletions db_stress_tool/expected_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ class ExpectedState {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf);

// Raises the stored persisted sequence number to `seqno` when `seqno` is
// larger; otherwise the stored value is written back unchanged. The load and
// the store are two separate relaxed operations.
// Requires external locking
void SetPersistedSeqno(SequenceNumber seqno) {
  const SequenceNumber current =
      persisted_seqno_->load(std::memory_order_relaxed);
  const SequenceNumber updated = std::max(current, seqno);
  persisted_seqno_->store(updated, std::memory_order_relaxed);
}

// Returns the stored persisted sequence number using a relaxed atomic load.
// Requires external locking
SequenceNumber GetPersistedSeqno() {
  const SequenceNumber persisted =
      persisted_seqno_->load(std::memory_order_relaxed);
  return persisted;
}

// Prepare a Put that will be started but not finished yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down Expand Up @@ -123,21 +135,50 @@ class ExpectedState {
void Reset();

std::atomic<uint32_t>* values_;
std::atomic<SequenceNumber>* persisted_seqno_;
};

// A `FileExpectedState` implements `ExpectedState` backed by a file.
class FileExpectedState : public ExpectedState {
public:
explicit FileExpectedState(std::string expected_state_file_path,
size_t max_key, size_t num_column_families);
explicit FileExpectedState(
const std::string& expected_state_file_path,
const std::string& expected_persisted_seqno_file_path, size_t max_key,
size_t num_column_families);

// Requires external locking preventing concurrent execution with any other
// member function.
Status Open(bool create) override;

private:
// Opens `file_path` as a new writable file through `env` (using `options`)
// and appends `content` to it. Returns the first failing status, or the
// status of the append when the file was opened successfully.
static Status CreateFile(Env* env, const EnvOptions& options,
                         const std::string& file_path,
                         const std::string& content) {
  std::unique_ptr<WritableFile> writable;
  Status open_status = env->NewWritableFile(file_path, &writable, options);
  if (!open_status.ok()) {
    return open_status;
  }
  return writable->Append(content);
}

// Memory-maps `file_path` through `env` into `memory_mapped_file_buffer` and
// returns the resulting status. On success, asserts that the mapped length
// equals `size`.
static Status MemoryMappedFile(
    Env* env, const std::string& file_path,
    std::unique_ptr<MemoryMappedFileBuffer>& memory_mapped_file_buffer,
    std::size_t size) {
  // `size` is only read by the assert below; keep it "used" when asserts are
  // compiled out.
  (void)size;
  Status map_status =
      env->NewMemoryMappedFileBuffer(file_path, &memory_mapped_file_buffer);
  if (!map_status.ok()) {
    return map_status;
  }
  assert(memory_mapped_file_buffer->GetLen() == size);
  return map_status;
}

const std::string expected_state_file_path_;
const std::string expected_persisted_seqno_file_path_;
std::unique_ptr<MemoryMappedFileBuffer> expected_state_mmap_buffer_;
std::unique_ptr<MemoryMappedFileBuffer> expected_persisted_seqno_mmap_buffer_;
};

// An `AnonExpectedState` implements `ExpectedState` backed by a memory
Expand Down Expand Up @@ -195,6 +236,12 @@ class ExpectedStateManager {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }

// See ExpectedState::SetPersistedSeqno()
void SetPersistedSeqno(SequenceNumber seqno) {
  latest_->SetPersistedSeqno(seqno);
}

// See ExpectedState::GetPersistedSeqno()
SequenceNumber GetPersistedSeqno() {
  const SequenceNumber persisted = latest_->GetPersistedSeqno();
  return persisted;
}

// See ExpectedState::PreparePut()
PendingExpectedValue PreparePut(int cf, int64_t key) {
return latest_->PreparePut(cf, key);
Expand Down Expand Up @@ -289,6 +336,8 @@ class FileExpectedStateManager : public ExpectedStateManager {
static const std::string kLatestBasename;
static const std::string kStateFilenameSuffix;
static const std::string kTraceFilenameSuffix;
static const std::string kPersistedSeqnoBasename;
static const std::string kPersistedSeqnoFilenameSuffix;
static const std::string kTempFilenamePrefix;
static const std::string kTempFilenameSuffix;

Expand Down
Loading