Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize : tccfence log table deleted should be optimized #4490

Merged
merged 18 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions tcc/src/main/java/io/seata/rm/tcc/TCCFenceHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -62,6 +65,11 @@ private TCCFenceHandler() {

private static final int MAX_QUEUE_SIZE = 500;

/**
* limit of delete record by date (per sql)
*/
private static final int LIMIT_DELETE = 1000;

Bughue marked this conversation as resolved.
Show resolved Hide resolved
private static final LinkedBlockingQueue<FenceLogIdentity> LOG_QUEUE = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);

private static FenceLogCleanRunnable fenceLogCleanRunnable;
Expand All @@ -76,6 +84,10 @@ private TCCFenceHandler() {
}
}

public static DataSource getDataSource() {
return TCCFenceHandler.dataSource;
}

public static void setDataSource(DataSource dataSource) {
TCCFenceHandler.dataSource = dataSource;
}
Expand Down Expand Up @@ -275,22 +287,32 @@ public static boolean deleteFence(String xid, Long branchId) {
});
}

/**
* Delete TCC Fence By Datetime
*
* @param datetime datetime
* @return the deleted row count
*/


public static int deleteFenceByDate(Date datetime) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
return TCC_FENCE_DAO.deleteTCCFenceDOByDate(conn, datetime);
} catch (RuntimeException e) {
status.setRollbackOnly();
throw e;
DataSource dataSource = TCCFenceHandler.getDataSource();
Connection connection = null;
int total = 0;
try {
connection = DataSourceUtils.getConnection(dataSource);
while (true) {
List<String> xids = new ArrayList<>();
Set<String> xidSet = TCC_FENCE_DAO.queryEndStatusXidsByDate(connection, datetime, LIMIT_DELETE);
xids.addAll(xidSet);
if (xids.isEmpty()) {
Bughue marked this conversation as resolved.
Show resolved Hide resolved
break;
}
total += TCC_FENCE_DAO.deleteTCCFenceDO(connection, xids);
}
});
} catch (RuntimeException e) {
LOGGER.error("delete fence log failed ", e);
} finally {
if (connection != null) {
DataSourceUtils.releaseConnection(connection, dataSource);
}
}
return total;

}

private static void initLogCleanExecutor() {
Expand Down
20 changes: 19 additions & 1 deletion tcc/src/main/java/io/seata/rm/tcc/store/TCCFenceStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.sql.Connection;
import java.util.Date;
import java.util.List;
import java.util.Set;

/**
* The TCC Fence Store
Expand All @@ -33,6 +35,14 @@ public interface TCCFenceStore {
*/
TCCFenceDO queryTCCFenceDO(Connection conn, String xid, Long branchId);

/**
* Query xid.
* @param datetime the datetime
* @param limit the limit size
* @return the tcc fence do
*/
Set<String> queryEndStatusXidsByDate(Connection conn, Date datetime, int limit);

/**
* Insert tcc fence do boolean.
* @param tccFenceDO the tcc fence do
Expand All @@ -57,12 +67,20 @@ public interface TCCFenceStore {
*/
boolean deleteTCCFenceDO(Connection conn, String xid, Long branchId);

/**
* Delete tcc fence do boolean.
* @param xids the global transaction ids
* @return the boolean
*/
int deleteTCCFenceDO(Connection conn, List<String> xids);

/**
* Delete tcc fence by datetime.
* @param datetime datetime
* @param limit limit
* @return the deleted row count
*/
int deleteTCCFenceDOByDate(Connection conn, Date datetime);
int deleteTCCFenceDOByDate(Connection conn, Date datetime, int limit);

/**
* Set LogTable Name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import java.sql.Timestamp;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* The type TCC Fence store data base dao
Expand Down Expand Up @@ -86,6 +89,28 @@ public TCCFenceDO queryTCCFenceDO(Connection conn, String xid, Long branchId) {
}
}

@Override
public Set<String> queryEndStatusXidsByDate(Connection conn, Date datetime, int limit) {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = TCCFenceStoreSqls.getQueryEndStatusSQLByDate(logTableName);
ps = conn.prepareStatement(sql);
ps.setTimestamp(1, new Timestamp(datetime.getTime()));
ps.setInt(2, limit);
rs = ps.executeQuery();
Set<String> xids = new HashSet<>(limit);
while (rs.next()) {
xids.add(rs.getString("xid"));
}
return xids;
} catch (SQLException e) {
throw new DataAccessException(e);
} finally {
IOUtil.close(rs, ps);
}
}

@Override
public boolean insertTCCFenceDO(Connection conn, TCCFenceDO tccFenceDO) {
PreparedStatement ps = null;
Expand Down Expand Up @@ -149,12 +174,31 @@ public boolean deleteTCCFenceDO(Connection conn, String xid, Long branchId) {
}

@Override
public int deleteTCCFenceDOByDate(Connection conn, Date datetime) {
public int deleteTCCFenceDO(Connection conn, List<String> xids) {
PreparedStatement ps = null;
try {
String paramsPlaceHolder = org.apache.commons.lang.StringUtils.repeat("?", ",", xids.size());
String sql = TCCFenceStoreSqls.getDeleteSQLByXids(logTableName, paramsPlaceHolder);
ps = conn.prepareStatement(sql);
for (int i = 0; i < xids.size(); i++) {
ps.setString(i + 1, xids.get(i));
}
return ps.executeUpdate();
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps);
}
}

@Override
public int deleteTCCFenceDOByDate(Connection conn, Date datetime, int limit) {
PreparedStatement ps = null;
try {
String sql = TCCFenceStoreSqls.getDeleteSQLByDateAndStatus(logTableName);
ps = conn.prepareStatement(sql);
ps.setTimestamp(1, new Timestamp(datetime.getTime()));
ps.setInt(2, limit);
return ps.executeUpdate();
} catch (SQLException e) {
throw new StoreException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ private TCCFenceStoreSqls() {
*/
public static final String LOCAL_TCC_LOG_PLACEHOLD = " #local_tcc_log# ";

/**
* The constant PRAMETER_PLACEHOLD.
* format: ?, ?, ?
*/
public static final String PRAMETER_PLACEHOLD = " #PRAMETER_PLACEHOLD# ";

/**
* The constant INSERT_LOCAL_TCC_LOG.
*/
Expand All @@ -47,6 +53,15 @@ private TCCFenceStoreSqls() {
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where xid = ? and branch_id = ? for update";

/**
* The constant QUERY_END_STATUS_BY_DATE.
*/
protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified "
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Later Please submit a PR for Oracle and other databases

+ " where gmt_modified < ? "
+ " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")"
+ " limit ?";

/**
* The constant UPDATE_STATUS_BY_BRANCH_ID_AND_XID.
*/
Expand All @@ -58,12 +73,19 @@ private TCCFenceStoreSqls() {
*/
protected static final String DELETE_BY_BRANCH_ID_AND_XID = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid = ? and branch_id = ? ";

/**
* The constant DELETE_BY_BRANCH_ID_AND_XID.
*/
protected static final String DELETE_BY_BRANCH_XIDS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid in (" + PRAMETER_PLACEHOLD + ")";


/**
* The constant DELETE_BY_DATE_AND_STATUS.
*/
protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where gmt_modified < ? "
+ " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")";
+ " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")"
+ " limit ?";

public static String getInsertLocalTCCLogSQL(String localTccTable) {
return INSERT_LOCAL_TCC_LOG.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
Expand All @@ -73,6 +95,10 @@ public static String getQuerySQLByBranchIdAndXid(String localTccTable) {
return QUERY_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}

public static String getQueryEndStatusSQLByDate(String localTccTable) {
return QUERY_END_STATUS_BY_DATE.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}

public static String getUpdateStatusSQLByBranchIdAndXid(String localTccTable) {
return UPDATE_STATUS_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}
Expand All @@ -81,6 +107,11 @@ public static String getDeleteSQLByBranchIdAndXid(String localTccTable) {
return DELETE_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}

public static String getDeleteSQLByXids(String localTccTable, String paramsPlaceHolder) {
return DELETE_BY_BRANCH_XIDS.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable)
.replace(PRAMETER_PLACEHOLD, paramsPlaceHolder);
}

public static String getDeleteSQLByDateAndStatus(String localTccTable) {
return DELETE_BY_DATE_AND_STATUS.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}
Expand Down