Skip to content

Commit

Permalink
Release/8.0 tidb snapshot (mysql#4)
Browse files Browse the repository at this point in the history
* prepare bug fix

* set tidb Snapshot

* set tidb Snapshot

* set tidb Snapshot

Co-authored-by: song <[email protected]>
  • Loading branch information
lastincisor and lastincisor authored Jun 21, 2022
1 parent 6304705 commit e47ef17
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 17 deletions.
7 changes: 7 additions & 0 deletions src/main/core-impl/java/com/mysql/cj/NativeSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,13 @@ public void setSessionVariables() {
}
}

public void setSnapshot(String secondaryTs){
StringBuilder query = new StringBuilder("SET @@tidb_snapshot = ");
query.append("\"").append(secondaryTs).append("\"");
//System.out.println("Snapshot-tidb_snapshot:"+query.toString());
this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0);
}

public String getProcessHost() {
try {
long threadId = getThreadId();
Expand Down
4 changes: 4 additions & 0 deletions src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -613,4 +613,8 @@ default boolean isMasterConnection() {
* if an error occurs
*/
String getDatabase() throws SQLException;

default void refreshSnapshot(){

};
}
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRo
boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
try {

this.connection.refreshSnapshot();
JdbcConnection locallyScopedConnection = this.connection;

((PreparedQuery) this.query).getQueryBindings()
Expand Down Expand Up @@ -921,7 +921,7 @@ public java.sql.ResultSet executeQuery() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {

JdbcConnection locallyScopedConn = this.connection;

this.connection.refreshSnapshot();
if (!this.doPingInstead) {
QueryReturnType queryReturnType = getQueryInfo().getQueryReturnType();
if (queryReturnType != QueryReturnType.PRODUCES_RESULT_SET && queryReturnType != QueryReturnType.MAY_PRODUCE_RESULT_SET) {
Expand Down
114 changes: 101 additions & 13 deletions src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,11 @@
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.sql.*;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.NClob;
import java.sql.ResultSet;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLPermission;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Struct;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -54,6 +46,10 @@
import java.util.Stack;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.mysql.cj.CacheAdapter;
Expand All @@ -69,6 +65,7 @@
import com.mysql.cj.conf.HostInfo;
import com.mysql.cj.conf.PropertyDefinitions.DatabaseTerm;
import com.mysql.cj.conf.PropertyKey;
import com.mysql.cj.conf.PropertySet;
import com.mysql.cj.conf.RuntimeProperty;
import com.mysql.cj.exceptions.CJCommunicationsException;
import com.mysql.cj.exceptions.CJException;
Expand Down Expand Up @@ -108,10 +105,24 @@ public class ConnectionImpl implements JdbcConnection, SessionEventListener, Ser

private static final long serialVersionUID = 4009476458425101761L;

private static final String TIDB_USE_TICDC_ACID_KEY = "useTicdcACID";

private static final String TIDB_TICDC_CF_NAME_KEY = "ticdcCFname";

private static final String TIDB_TICDC_ACID_INTERVAL_KEY = "ticdcACIDInterval";

private static final String QUERY_TIDB_SNAPSHOT_SQL =
"select `secondary_ts` from `tidb_cdc`.`syncpoint_v1` where `cf` = \"{ticdcCFname}\" order by `primary_ts` desc limit 1";


private static final SQLPermission SET_NETWORK_TIMEOUT_PERM = new SQLPermission("setNetworkTimeout");

private static final SQLPermission ABORT_PERM = new SQLPermission("abort");

private final AtomicLong ticdcACIDinitValue = new AtomicLong(0);

private StatementImpl stmt;

@Override
public String getHost() {
return this.session.getHostInfo().getHost();
Expand Down Expand Up @@ -437,9 +448,10 @@ public ConnectionImpl(HostInfo hostInfo) throws SQLException {
}

this.dbmd = getMetaData(false, false);

//ticdcACIDinitValue.set(System.currentTimeMillis());
initializeSafeQueryInterceptors();


} catch (CJException e1) {
throw SQLExceptionsMapping.translateException(e1, getExceptionInterceptor());
}
Expand Down Expand Up @@ -757,6 +769,81 @@ private void closeStatement(java.sql.Statement stmt) {
}
}

private String getTidbSnapshotParameter(String key,String defaultValue){
String value = this.props.getProperty(key);
if(value == null){
value = defaultValue;
}
return value;
}

public String buildTidbSnapshotSql(){
String ticdcCFname = getTidbSnapshotParameter(TIDB_TICDC_CF_NAME_KEY,null);
if(ticdcCFname == null){
return null;
}
String sql = null;
if(ticdcCFname != null){
sql = QUERY_TIDB_SNAPSHOT_SQL.replace("{ticdcCFname}",ticdcCFname);
}
return sql;
}

public void refreshSnapshot(){
String useTicdcACID = getTidbSnapshotParameter(TIDB_USE_TICDC_ACID_KEY,null);
if(useTicdcACID == null){
return;
}
if(!"true".equals(useTicdcACID)){
return;
}
String sql = buildTidbSnapshotSql();
if(sql == null){
return;
}
String ticdcACIDInterval = getTidbSnapshotParameter(TIDB_TICDC_ACID_INTERVAL_KEY,"300000");
long setSnapshotTime = System.currentTimeMillis();
//System.out.println("Snapshot-tidb_snapshot-timeout:"+(setSnapshotTime - ticdcACIDinitValue.get() - Long.parseLong(ticdcACIDInterval)));
try {
/*
* init setSnapshot
* */
if(ticdcACIDinitValue.get() == 0){
setSnapshot(true,sql);
ticdcACIDinitValue.set(System.currentTimeMillis());
}else if(setSnapshotTime - ticdcACIDinitValue.get() > Long.parseLong(ticdcACIDInterval)){
/*
* long connection setSnapshot
* */
setSnapshot(false, sql);
ticdcACIDinitValue.set(System.currentTimeMillis());
}
}catch (SQLException e){

}

}


public void setSnapshot(Boolean init,String sql) throws SQLException{
if(!init){
this.session.setSnapshot("");
//String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot");
//System.out.println("Snapshot-tidb_snapshot-set empty:"+tidb_snapshot);
}
try (final ResultSet resultSet = this.stmt.executeQuery(sql)) {
while (resultSet.next()) {
final String secondaryTs = resultSet.getString("secondary_ts");
//System.out.println("Snapshot-tidb_snapshot-db:"+secondaryTs);
if(secondaryTs != null){
this.session.setSnapshot(secondaryTs);
//String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot");
//System.out.println("Snapshot-tidb_snapshot-queryServerVariable:"+tidb_snapshot);
}
}
}
}

@Override
public void commit() throws SQLException {
synchronized (getConnectionMutex()) {
Expand Down Expand Up @@ -1085,11 +1172,11 @@ public java.sql.Statement createStatement() throws SQLException {

@Override
public java.sql.Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {

StatementImpl stmt = new StatementImpl(getMultiHostSafeProxy(), this.database);
stmt.setResultSetType(resultSetType);
stmt.setResultSetConcurrency(resultSetConcurrency);

this.stmt = stmt;
//refreshSnapshot();
return stmt;
}

Expand Down Expand Up @@ -1642,7 +1729,8 @@ public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType
} else {
pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
}

this.stmt = pStmt;
//refreshSnapshot();
return pStmt;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public String getDatabase() throws SQLException {
return null; // we don't reach this code, compiler can't tell
}


@Override
public void setCatalog(String catalog) throws SQLException {

Expand Down
8 changes: 6 additions & 2 deletions src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ private boolean executeInternal(String sql, boolean returnGeneratedKeys) throws
JdbcConnection locallyScopedConn = checkClosed();

synchronized (locallyScopedConn.getConnectionMutex()) {
this.connection.refreshSnapshot();
checkClosed();

checkNullOrEmptyQuery(sql);
Expand Down Expand Up @@ -799,6 +800,7 @@ protected long[] executeBatchInternal() throws SQLException {
JdbcConnection locallyScopedConn = checkClosed();

synchronized (locallyScopedConn.getConnectionMutex()) {
this.connection.refreshSnapshot();
if (locallyScopedConn.isReadOnly()) {
throw SQLError.createSQLException(Messages.getString("Statement.34") + Messages.getString("Statement.35"),
MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor());
Expand Down Expand Up @@ -1105,7 +1107,9 @@ protected SQLException handleExceptionForBatch(int endOfBatchIndex, int numValue
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
JdbcConnection locallyScopedConn = this.connection;

if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
this.connection.refreshSnapshot();
}
this.retrieveGeneratedKeys = false;

checkNullOrEmptyQuery(sql);
Expand Down Expand Up @@ -1249,7 +1253,7 @@ public int executeUpdate(String sql) throws SQLException {
protected long executeUpdateInternal(String sql, boolean isBatch, boolean returnGeneratedKeys) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
JdbcConnection locallyScopedConn = this.connection;

this.connection.refreshSnapshot();
checkNullOrEmptyQuery(sql);

resetCancelledState();
Expand Down
Loading

0 comments on commit e47ef17

Please sign in to comment.