Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 29, 2024
1 parent 0be9160 commit 127cd9e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2617,7 +2617,7 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll
while (it->has_next()) {
total_iteration_cnt++;
auto [k, v] = it->next();
LOG(INFO) << "check txn info txn_info_key=" << hex(k);
VLOG_DEBUG << "check txn info txn_info_key=" << hex(k);
TxnInfoPB info_pb;
if (!info_pb.ParseFromArray(v.data(), v.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,32 +446,35 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) {

public static boolean checkFailedTxnsByCoordinator(TransactionState txn) {
TxnCoordinator coordinator = txn.getCoordinator();
boolean offline = true;
if (coordinator.sourceType == TransactionState.TxnSourceType.FE) {
List<Frontend> frontends = Env.getCurrentEnv().getFrontends(null);
for (Frontend fe : frontends) {
if (fe.getHost().equals(coordinator.ip) && fe.getLastStartupTime() > coordinator.startTime) {
return true;
if (fe.getHost().equals(coordinator.ip)) {
offline = false;
if (fe.getLastStartupTime() > coordinator.startTime) {
return true;
}
}
}
} else if (coordinator.sourceType == TransactionState.TxnSourceType.BE) {
Backend be = Env.getCurrentSystemInfo().getBackend(coordinator.id);
if (be.getHost().equals(coordinator.ip) && (be.getLastStartTime() > coordinator.startTime
|| (!be.isAlive() && System.currentTimeMillis() - be.getLastUpdateMs()
>= Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) {
return true;
if (be != null) {
offline = false;
if (be.getHost().equals(coordinator.ip) && (be.getLastStartTime() > coordinator.startTime
|| (!be.isAlive() && System.currentTimeMillis() - be.getLastUpdateMs()
>= Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) {
return true;
}
}
}
return false;
return offline;
}

public static List<TransactionState> checkFailedTxns(List<TransactionState> conflictTxns) {
List<TransactionState> failedTxns = new ArrayList<>();
for (TransactionState txn : conflictTxns) {
boolean failed = false;
if (!failed) {
failed = checkFailedTxnsByCoordinator(txn);
}
if (failed) {
if (checkFailedTxnsByCoordinator(txn)) {
failedTxns.add(txn);
}
}
Expand Down

0 comments on commit 127cd9e

Please sign in to comment.