Skip to content

Commit

Permalink
Merge branch 'main' into verify_flushed_data_recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 authored Jun 26, 2024
2 parents 4fabe2b + 0d93c8a commit 800b609
Show file tree
Hide file tree
Showing 16 changed files with 304 additions and 266 deletions.
45 changes: 45 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,42 @@ struct rocksdb_universal_compaction_options_t {
ROCKSDB_NAMESPACE::CompactionOptionsUniversal* rep;
};

// Logger implementation that formats each log message into a character
// buffer and hands it to a user-supplied C callback.
//
// The callback receives: the opaque `priv` pointer given at construction,
// the log level converted to `unsigned`, a pointer to the formatted
// NUL-terminated text, and the text length (terminator excluded). The buffer
// is either a local array or a heap block released right after the callback
// returns, so the callback must not keep the pointer.
struct rocksdb_callback_logger_t : public Logger {
  // Size of the on-stack formatting buffer; longer messages are formatted
  // into a temporary heap allocation instead.
  static const ssize_t STACK_BUFSZ = 512;

  rocksdb_callback_logger_t(InfoLogLevel log_level,
                            void (*logv_cb)(void*, unsigned, char*, size_t),
                            void* priv)
      : Logger(log_level), logv_cb_(logv_cb), priv_(priv) {}

  using Logger::Logv;

  // Formats `fmt` with the arguments in `ap0` and forwards the result to the
  // callback. Does nothing when no callback was registered. Messages that
  // format to an empty string, fail to format, or cannot get a heap buffer
  // are silently dropped.
  void Logv(const InfoLogLevel level, const char* fmt, va_list ap0) override {
    if (logv_cb_ == nullptr) {
      return;
    }

    // The first formatting pass consumes `ap0`; keep a copy in case the
    // message does not fit and has to be formatted a second time.
    va_list retry_args;
    va_copy(retry_args, ap0);

    char inline_buf[STACK_BUFSZ];
    char* heap_buf = nullptr;
    char* text = inline_buf;

    int text_len = vsnprintf(inline_buf, STACK_BUFSZ, fmt, ap0);
    bool deliver = text_len > 0;

    if (deliver && text_len >= STACK_BUFSZ) {
      // Output was truncated: vsnprintf reported the full length, so size a
      // heap buffer exactly (plus terminator) and format again.
      heap_buf = static_cast<char*>(malloc(text_len + 1));
      text = heap_buf;
      if (heap_buf == nullptr) {
        deliver = false;
      } else {
        text_len = vsnprintf(heap_buf, text_len + 1, fmt, retry_args);
        deliver = text_len > 0;
      }
    }

    if (deliver) {
      logv_cb_(priv_, static_cast<unsigned>(level), text,
               static_cast<size_t>(text_len));
    }

    va_end(retry_args);
    free(heap_buf);  // free(nullptr) is a no-op when the stack buffer sufficed
  }

 private:
  void (*logv_cb_)(void*, unsigned, char*, size_t) = nullptr;
  void* priv_ = nullptr;
};

static bool SaveError(char** errptr, const Status& s) {
assert(errptr != nullptr);
if (s.ok()) {
Expand Down Expand Up @@ -3037,6 +3073,15 @@ rocksdb_logger_t* rocksdb_logger_create_stderr_logger(int log_level,
return logger;
}

// Allocates a logger handle backed by a rocksdb_callback_logger_t.
//
// `log_level` is converted to InfoLogLevel with a plain static_cast (no range
// check). `callback` and `priv` are passed through unchanged to the
// rocksdb_callback_logger_t constructor. The returned handle is allocated
// with `new`; the caller owns it.
rocksdb_logger_t* rocksdb_logger_create_callback_logger(
    int log_level, void (*callback)(void*, unsigned, char*, size_t),
    void* priv) {
  const auto level = static_cast<InfoLogLevel>(log_level);
  auto* handle = new rocksdb_logger_t;
  handle->rep =
      std::make_shared<rocksdb_callback_logger_t>(level, callback, priv);
  return handle;
}

// Releases a logger handle with `delete`; passing a null pointer is harmless.
void rocksdb_logger_destroy(rocksdb_logger_t* logger) {
  delete logger;
}

void rocksdb_options_set_env(rocksdb_options_t* opt, rocksdb_env_t* env) {
Expand Down
60 changes: 18 additions & 42 deletions db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,13 @@ class CfConsistencyStressTest : public StressTest {
status = db_->Write(write_opts, &batch);
}

if (IsRetryableInjectedError(status)) {
fprintf(stdout, "multi put or merge error: %s\n",
status.ToString().c_str());
} else if (!status.ok()) {
if (status.ok()) {
auto num = static_cast<long>(rand_column_families.size());
thread->stats.AddBytesForWrites(num, (sz + 1) * num);
} else if (!IsErrorInjectedAndRetryable(status)) {
fprintf(stderr, "multi put or merge error: %s\n",
status.ToString().c_str());
thread->stats.AddErrors(1);
} else {
auto num = static_cast<long>(rand_column_families.size());
thread->stats.AddBytesForWrites(num, (sz + 1) * num);
}

return status;
Expand All @@ -99,13 +96,11 @@ class CfConsistencyStressTest : public StressTest {
batch.Delete(cfh, key);
}
Status s = db_->Write(write_opts, &batch);
if (IsRetryableInjectedError(s)) {
fprintf(stdout, "multidel error: %s\n", s.ToString().c_str());
} else if (!s.ok()) {
if (s.ok()) {
thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
} else if (!IsErrorInjectedAndRetryable(s)) {
fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
}
return s;
}
Expand All @@ -130,14 +125,12 @@ class CfConsistencyStressTest : public StressTest {
batch.DeleteRange(cfh, key, end_key);
}
Status s = db_->Write(write_opts, &batch);
if (IsRetryableInjectedError(s)) {
fprintf(stdout, "multi del range error: %s\n", s.ToString().c_str());
} else if (!s.ok()) {
fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
if (s.ok()) {
thread->stats.AddRangeDeletions(
static_cast<long>(rand_column_families.size()));
} else if (!IsErrorInjectedAndRetryable(s)) {
fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
}
return s;
}
Expand Down Expand Up @@ -248,9 +241,7 @@ class CfConsistencyStressTest : public StressTest {
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
} else if (IsRetryableInjectedError(s)) {
fprintf(stdout, "TestGet error: %s\n", s.ToString().c_str());
} else {
} else if (!IsErrorInjectedAndRetryable(s)) {
fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
}
Expand Down Expand Up @@ -286,9 +277,7 @@ class CfConsistencyStressTest : public StressTest {
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else if (IsRetryableInjectedError(s)) {
fprintf(stdout, "MultiGet error: %s\n", s.ToString().c_str());
} else {
} else if (!IsErrorInjectedAndRetryable(s)) {
// errors case
fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
Expand Down Expand Up @@ -324,12 +313,7 @@ class CfConsistencyStressTest : public StressTest {
PinnableWideColumns result;
s = db_->GetEntity(read_opts, cfh, key, &result);

if (IsRetryableInjectedError(s)) {
fprintf(stdout,
"GetEntity error: inconsistent columns for key %s, entity %s\n",
StringToHex(key).c_str(),
WideColumnsToHex(result.columns()).c_str());
} else if (s.ok()) {
if (s.ok()) {
if (!VerifyWideColumns(result.columns())) {
fprintf(
stderr,
Expand Down Expand Up @@ -517,9 +501,7 @@ class CfConsistencyStressTest : public StressTest {
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
} else if (IsRetryableInjectedError(s)) {
fprintf(stdout, "TestGetEntity error: %s\n", s.ToString().c_str());
} else {
} else if (!IsErrorInjectedAndRetryable(s)) {
fprintf(stderr, "TestGetEntity error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
}
Expand Down Expand Up @@ -590,9 +572,7 @@ class CfConsistencyStressTest : public StressTest {
for (size_t j = 0; j < num_cfs; ++j) {
const Status& s = result[j].status();
const WideColumns& columns = result[j].columns();
if (IsRetryableInjectedError(s)) {
fprintf(stdout, "TestMultiGetEntity (AttributeGroup) error: %s\n",
s.ToString().c_str());
if (!s.ok() && IsErrorInjectedAndRetryable(s)) {
break;
} else if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "TestMultiGetEntity (AttributeGroup) error: %s\n",
Expand Down Expand Up @@ -699,9 +679,7 @@ class CfConsistencyStressTest : public StressTest {
const Status& s = statuses[j];
const WideColumns& columns = results[j].columns();

if (IsRetryableInjectedError(s)) {
fprintf(stdout, "TestMultiGetEntity error: %s\n",
s.ToString().c_str());
if (!s.ok() && IsErrorInjectedAndRetryable(s)) {
break;
} else if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "TestMultiGetEntity error: %s\n",
Expand Down Expand Up @@ -838,9 +816,7 @@ class CfConsistencyStressTest : public StressTest {
s = iter->status();
}

if (IsRetryableInjectedError(s)) {
fprintf(stdout, "TestPrefixScan error: %s\n", s.ToString().c_str());
} else if (!s.ok()) {
if (!s.ok() && !IsErrorInjectedAndRetryable(s)) {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);

Expand Down
10 changes: 5 additions & 5 deletions db_stress_tool/db_stress_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ bool RunStressTestImpl(SharedState* shared) {
stress->TrackExpectedState(shared);
}

// Since write fault and sync fault implementations are coupled with each
// other in `TestFSWritableFile()`, we can not enable or disable only one
// of the two.
// TODO(hx235): decouple implementations of write fault injection and sync
// fault injection.
if (FLAGS_sync_fault_injection || FLAGS_write_fault_one_in > 0) {
fault_fs_guard->SetFilesystemDirectWritable(false);
fault_fs_guard->SetInjectUnsyncedDataLoss(FLAGS_sync_fault_injection);
if (FLAGS_exclude_wal_from_write_fault_injection) {
fault_fs_guard->SetFileTypesExcludedFromWriteFaultInjection(
{FileType::kWalFile});
}
}
now = clock->NowMicros();
fprintf(stdout, "%s Starting database operations\n",
Expand Down
3 changes: 3 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,9 @@ DEFINE_int32(write_fault_one_in, 0,
"On non-zero, enables fault injection on write. Currently only"
"injects write error when writing to SST files.");

// Boolean flag, default false. Per its help text, setting it to true stops
// write faults from being injected on writes to WAL files.
DEFINE_bool(exclude_wal_from_write_fault_injection, false,
"If true, we won't inject write fault when writing to WAL file");

// Integer flag, default 1000. Per its help text, any non-zero value enables
// fault injection on metadata writes (directory and file metadata).
// NOTE(review): the "_one_in" suffix suggests a 1-in-N injection rate, but
// that is not shown here - confirm against the fault injection code.
DEFINE_int32(metadata_write_fault_one_in, 1000,
"On non-zero, enables fault injection on metadata write (i.e, "
"directory and file metadata write)");
Expand Down
4 changes: 2 additions & 2 deletions db_stress_tool/db_stress_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class DbStressListener : public EventListener {
FaultInjectionIOType::kMetadataWrite);
// TODO(hx235): only exempt the flush thread during error recovery instead
// of all the flush threads from error injection
fault_fs_guard->SetIOActivtiesExemptedFromFaultInjection(
fault_fs_guard->SetIOActivtiesExcludedFromFaultInjection(
{Env::IOActivity::kFlush});
}
}
Expand All @@ -302,7 +302,7 @@ class DbStressListener : public EventListener {
FaultInjectionIOType::kMetadataRead);
fault_fs_guard->EnableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataWrite);
fault_fs_guard->SetIOActivtiesExemptedFromFaultInjection({});
fault_fs_guard->SetIOActivtiesExcludedFromFaultInjection({});
}
}

Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ DECLARE_int32(metadata_read_fault_one_in);
DECLARE_int32(metadata_write_fault_one_in);
DECLARE_int32(read_fault_one_in);
DECLARE_int32(write_fault_one_in);
DECLARE_bool(exclude_wal_from_write_fault_injection);
DECLARE_int32(open_metadata_read_fault_one_in);
DECLARE_int32(open_metadata_write_fault_one_in);
DECLARE_int32(open_write_fault_one_in);
Expand Down
Loading

0 comments on commit 800b609

Please sign in to comment.