Skip to content

Commit

Permalink
Enable commit_bypass_memtable in no_batched_ops_stress tests (#13203)
Browse files Browse the repository at this point in the history
Summary:
expand the test coverage to the more comprehensive no_batched_ops_stress. Small refactoring in db_crashtest.py.

Pull Request resolved: #13203

Test Plan: ran a couple stress test jobs internally: https://fburl.com/sandcastle/nohosh7i

Reviewed By: jowlyzhang

Differential Revision: D67057497

Pulled By: cbi42

fbshipit-source-id: eccc033f3ae3dbd20729cd8f1f8f8d8b7c2cd057
  • Loading branch information
cbi42 authored and facebook-github-bot committed Dec 13, 2024
1 parent 85d8ee7 commit 7de9c0f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 33 deletions.
1 change: 0 additions & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,6 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) {
if (!status.ok()) {
mutex_.Lock();
assert(!error_handler_.IsBGWorkStopped());
// Maybe change the return status to void?
error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
mutex_.Unlock();
}
Expand Down
15 changes: 10 additions & 5 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,8 @@ void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
}

Status StressTest::NewTxn(WriteOptions& write_opts, ThreadState* thread,
std::unique_ptr<Transaction>* out_txn) {
std::unique_ptr<Transaction>* out_txn,
bool* commit_bypass_memtable) {
if (!FLAGS_use_txn) {
return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
}
Expand All @@ -826,6 +827,9 @@ Status StressTest::NewTxn(WriteOptions& write_opts, ThreadState* thread,
assert(FLAGS_user_timestamp_size == 0);
txn_options.commit_bypass_memtable =
thread->rand.OneIn(FLAGS_commit_bypass_memtable_one_in);
if (commit_bypass_memtable) {
*commit_bypass_memtable = txn_options.commit_bypass_memtable;
}
}
out_txn->reset(txn_db_->BeginTransaction(write_opts, txn_options));
auto istr = std::to_string(txn_id.fetch_add(1));
Expand Down Expand Up @@ -882,11 +886,12 @@ Status StressTest::CommitTxn(Transaction& txn, ThreadState* thread) {
return s;
}

Status StressTest::ExecuteTransaction(
WriteOptions& write_opts, ThreadState* thread,
std::function<Status(Transaction&)>&& ops) {
Status StressTest::ExecuteTransaction(WriteOptions& write_opts,
ThreadState* thread,
std::function<Status(Transaction&)>&& ops,
bool* commit_bypass_memtable) {
std::unique_ptr<Transaction> txn;
Status s = NewTxn(write_opts, thread, &txn);
Status s = NewTxn(write_opts, thread, &txn, commit_bypass_memtable);
std::string try_again_messages;
if (s.ok()) {
for (int tries = 1;; ++tries) {
Expand Down
17 changes: 14 additions & 3 deletions db_stress_tool/db_stress_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,19 @@ class StressTest {
void UpdateIfInitialWriteFails(Env* db_stress_env, const Status& write_s,
Status* initial_write_s,
bool* initial_wal_write_may_succeed,
uint64_t* wait_for_recover_start_time) {
uint64_t* wait_for_recover_start_time,
bool commit_bypass_memtable = false) {
assert(db_stress_env && initial_write_s && initial_wal_write_may_succeed &&
wait_for_recover_start_time);
// Only update `initial_write_s`, `initial_wal_write_may_succeed` when the
// first write fails
if (!write_s.ok() && (*initial_write_s).ok()) {
*initial_write_s = write_s;
// With commit_bypass_memtable, we create a new WAL after WAL write
// succeeds, that wal creation may fail due to injected error. So the
// initial wal write may succeed even if status is failed to write to wal
*initial_wal_write_may_succeed =
commit_bypass_memtable ||
!FaultInjectionTestFS::IsFailedToWriteToWALError(*initial_write_s);
*wait_for_recover_start_time = db_stress_env->NowMicros();
}
Expand Down Expand Up @@ -138,13 +143,19 @@ class StressTest {
SharedState* shared);

// ExecuteTransaction is recommended instead
// @param commit_bypass_memtable Whether commit_bypass_memtable is set to
// true in transaction options.
Status NewTxn(WriteOptions& write_opts, ThreadState* thread,
std::unique_ptr<Transaction>* out_txn);
std::unique_ptr<Transaction>* out_txn,
bool* commit_bypass_memtable = nullptr);
Status CommitTxn(Transaction& txn, ThreadState* thread = nullptr);

// Creates a transaction, executes `ops`, and tries to commit
// @param commit_bypass_memtable Whether commit_bypass_memtable is set to
// true in transaction options.
Status ExecuteTransaction(WriteOptions& write_opts, ThreadState* thread,
std::function<Status(Transaction&)>&& ops);
std::function<Status(Transaction&)>&& ops,
bool* commit_bypass_memtable = nullptr);

virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {}

Expand Down
37 changes: 21 additions & 16 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,7 @@ class NonBatchedOpsStressTest : public StressTest {
// To track whether WAL write may have succeeded during the initial failed
// write
bool initial_wal_write_may_succeed = true;
bool commit_bypass_memtable = false;

PendingExpectedValue pending_expected_value =
shared->PreparePut(rand_column_family, rand_key);
Expand Down Expand Up @@ -1713,9 +1714,10 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->Put(write_opts, cfh, k, write_ts, v);
}
} else {
s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
return txn.Put(cfh, k, v);
});
s = ExecuteTransaction(
write_opts, thread,
[&](Transaction& txn) { return txn.Put(cfh, k, v); },
&commit_bypass_memtable);
}
}
UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
Expand Down Expand Up @@ -1779,6 +1781,7 @@ class NonBatchedOpsStressTest : public StressTest {
// To track whether WAL write may have succeeded during the initial failed
// write
bool initial_wal_write_may_succeed = true;
bool commit_bypass_memtable = false;

// Use delete if the key may be overwritten and a single deletion
// otherwise.
Expand All @@ -1803,13 +1806,14 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->Delete(write_opts, cfh, key, write_ts);
}
} else {
s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
return txn.Delete(cfh, key);
});
s = ExecuteTransaction(
write_opts, thread,
[&](Transaction& txn) { return txn.Delete(cfh, key); },
&commit_bypass_memtable);
}
UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
&initial_wal_write_may_succeed,
&wait_for_recover_start_time);
UpdateIfInitialWriteFails(
db_stress_env, s, &initial_write_s, &initial_wal_write_may_succeed,
&wait_for_recover_start_time, commit_bypass_memtable);
} while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
initial_wal_write_may_succeed);

Expand Down Expand Up @@ -1859,13 +1863,14 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->SingleDelete(write_opts, cfh, key, write_ts);
}
} else {
s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
return txn.SingleDelete(cfh, key);
});
s = ExecuteTransaction(
write_opts, thread,
[&](Transaction& txn) { return txn.SingleDelete(cfh, key); },
&commit_bypass_memtable);
}
UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
&initial_wal_write_may_succeed,
&wait_for_recover_start_time);
UpdateIfInitialWriteFails(
db_stress_env, s, &initial_write_s, &initial_wal_write_may_succeed,
&wait_for_recover_start_time, commit_bypass_memtable);
} while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
initial_wal_write_may_succeed);

Expand Down Expand Up @@ -2875,7 +2880,7 @@ class NonBatchedOpsStressTest : public StressTest {
const size_t sz = GenerateValue(value_base, value, sizeof(value));
const Slice v(value, sz);

if (op == Op::PutOrPutEntity) {
if (op == Op::PutOrPutEntity || !FLAGS_use_merge) {
if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
s = txn->PutEntity(cfh, k, GenerateWideColumns(value_base, v));
Expand Down
25 changes: 17 additions & 8 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ def is_direct_io_supported(dbname):
"inplace_update_support": 0,
# TimedPut is not supported in transaction
"use_timed_put_one_in": 0,
# txn commit with this option will create a new memtable, keep the
# frequency low to reduce stalls
"commit_bypass_memtable_one_in": random.choice([0] * 2 + [500, 1000]),
}

# For optimistic transaction db
Expand Down Expand Up @@ -786,12 +789,20 @@ def finalize_and_sanitize(src_params):
# files, which would be problematic when unsynced data can be lost in
# crash recoveries.
dest_params["enable_compaction_filter"] = 0
# Remove the following once write-prepared/write-unprepared with/without
# unordered write supports timestamped snapshots
if dest_params.get("create_timestamped_snapshot_one_in", 0) > 0:
dest_params["txn_write_policy"] = 0
dest_params["unordered_write"] = 0
# Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb
# unordered_write is only enabled with --txn, and txn_params disables inplace_update_support, so
# setting allow_concurrent_memtable_write=1 won't conflcit with inplace_update_support.
# don't overwrite txn_write_policy
if dest_params.get("unordered_write", 0) == 1:
dest_params["txn_write_policy"] = 1
dest_params["allow_concurrent_memtable_write"] = 1
if dest_params.get("txn_write_policy", 0) == 1:
dest_params["allow_concurrent_memtable_write"] = 1
else:
dest_params["unordered_write"] = 0
if dest_params.get("disable_wal", 0) == 1:
dest_params["atomic_flush"] = 1
dest_params["sync"] = 0
Expand Down Expand Up @@ -837,11 +848,6 @@ def finalize_and_sanitize(src_params):
dest_params["write_fault_one_in"] = 0
dest_params["skip_verifydb"] = 1
dest_params["verify_db_one_in"] = 0
# Remove the following once write-prepared/write-unprepared with/without
# unordered write supports timestamped snapshots
if dest_params.get("create_timestamped_snapshot_one_in", 0) > 0:
dest_params["txn_write_policy"] = 0
dest_params["unordered_write"] = 0
# For TransactionDB, correctness testing with unsync data loss is currently
# compatible with only write committed policy
if dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy", 0) != 0:
Expand All @@ -853,6 +859,8 @@ def finalize_and_sanitize(src_params):
dest_params["use_put_entity_one_in"] = 0
# MultiCfIterator is currently only compatible with write committed policy
dest_params["use_multi_cf_iterator"] = 0
# only works with write committed policy
dest_params["commit_bypass_memtable_one_in"] = 0
# TODO(hx235): enable test_multi_ops_txns with fault injection after stabilizing the CI
if dest_params.get("test_multi_ops_txns") == 1:
dest_params["write_fault_one_in"] = 0
Expand Down Expand Up @@ -989,7 +997,7 @@ def finalize_and_sanitize(src_params):
or dest_params.get("delrangepercent") == 0
):
dest_params["test_ingest_standalone_range_deletion_one_in"] = 0
if dest_params.get("commit_bypass_memtable_one_in", 0) > 0:
if dest_params.get("use_txn", 0) == 1 and dest_params.get("commit_bypass_memtable_one_in", 0) > 0:
dest_params["enable_blob_files"] = 0
dest_params["allow_setting_blob_options_dynamically"] = 0
dest_params["atomic_flush"] = 0
Expand All @@ -1000,6 +1008,7 @@ def finalize_and_sanitize(src_params):
dest_params["use_merge"] = 0
dest_params["use_full_merge_v1"] = 0
dest_params["enable_pipelined_write"] = 0
dest_params["use_attribute_group"] = 0

return dest_params

Expand Down

0 comments on commit 7de9c0f

Please sign in to comment.