Skip to content

Commit

Permalink
[Improve](txn) Add some fuzzy test stub in txn (#26712)
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng authored Nov 16, 2023
1 parent 624d372 commit 7e82e76
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
13 changes: 13 additions & 0 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/threadpool.h"

namespace doris {
Expand Down Expand Up @@ -91,6 +92,18 @@ Status EnginePublishVersionTask::finish() {
int64_t transaction_id = _publish_version_req.transaction_id;
OlapStopWatch watch;
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("EnginePublishVersionTask.finish.random random failed");
return Status::InternalError("debug engine publish version task random failed");
}
});
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
Expand Down
61 changes: 56 additions & 5 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);

DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
return Status::InternalError("debug prepare txn random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});

/// Step 1: check if the transaction is already exist
do {
auto iter = txn_tablet_map.find(key);
Expand Down Expand Up @@ -296,11 +309,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
key.first, key.second, tablet_info.to_string());
}

DBUG_EXECUTE_IF(
"TxnManager.commit_txn_random_failed",
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
return Status::InternalError("debug commit txn random failed");
});
DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.commit_txn.random_failed");
return Status::InternalError("debug commit txn random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});

std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
// this while loop just run only once, just for if break
Expand Down Expand Up @@ -356,6 +376,12 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
if (!is_recovery) {
Status save_status = RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
rowset_ptr->rowset_meta()->get_rowset_pb());
DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
if (!save_status.ok()) {
return Status::Error<ROWSET_SAVE_FAILED>(
"save committed rowset failed. when commit txn rowset_id: {}, tablet id: {}, "
Expand Down Expand Up @@ -430,13 +456,38 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
"tablet={}",
partition_id, transaction_id, tablet_info.to_string());
}
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
return Status::InternalError("debug publish txn before save rs meta random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});

/// Step 2: make rowset visible
// save meta need access disk, it maybe very slow, so that it is not in global txn lock
// it is under a single txn lock
// TODO(ygl): rowset is already set version here, memory is changed, if save failed
// it maybe a fatal error
rowset->make_visible(version);

DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
return Status::InternalError("debug publish txn after save rs meta random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
// update delete_bitmap
if (tablet_txn_info->unique_key_merge_on_write) {
std::unique_ptr<RowsetWriter> rowset_writer;
Expand Down
23 changes: 23 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -164,6 +165,19 @@ public void setFinishedTimeMs(long finishedTimeMs) {
this.finishedTimeMs = finishedTimeMs;
}

// /api/debug_point/add/{name}?value=100
private void stateWait(final String name) {
long waitTimeMs = DebugPointUtil.getDebugParamOrDefault(name, 0);
if (waitTimeMs > 0) {
try {
LOG.info("debug point {} wait {} ms", name, waitTimeMs);
Thread.sleep(waitTimeMs);
} catch (InterruptedException e) {
LOG.warn(name, e);
}
}
}

/**
* The keyword 'synchronized' only protects 2 methods:
* run() and cancel()
Expand All @@ -180,15 +194,24 @@ public synchronized void run() {
return;
}

// /api/debug_point/add/FE.STOP_ALTER_JOB_RUN
if (DebugPointUtil.isEnable("FE.STOP_ALTER_JOB_RUN")) {
LOG.info("debug point FE.STOP_ALTER_JOB_RUN, schema change schedule stopped");
return;
}

try {
switch (jobState) {
case PENDING:
stateWait("FE.ALTER_JOB_V2_PENDING");
runPendingJob();
break;
case WAITING_TXN:
stateWait("FE.ALTER_JOB_V2_WAITING_TXN");
runWaitingTxnJob();
break;
case RUNNING:
stateWait("FE.ALTER_JOB_V2_RUNNING");
runRunningJob();
break;
default:
Expand Down

0 comments on commit 7e82e76

Please sign in to comment.