diff --git a/src/main/core-impl/java/com/mysql/cj/NativeSession.java b/src/main/core-impl/java/com/mysql/cj/NativeSession.java index bf2c7b4d6..9ac3fef63 100644 --- a/src/main/core-impl/java/com/mysql/cj/NativeSession.java +++ b/src/main/core-impl/java/com/mysql/cj/NativeSession.java @@ -515,6 +515,13 @@ public void setSessionVariables() { } } + public void setSnapshot(String secondaryTs){ + StringBuilder query = new StringBuilder("SET @@tidb_snapshot = "); + query.append("\"").append(secondaryTs).append("\""); + //System.out.println("Snapshot-tidb_snapshot:"+query.toString()); + this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0); + } + public String getProcessHost() { try { long threadId = getThreadId(); diff --git a/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java b/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java index 04130011a..1ac08d0eb 100644 --- a/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java +++ b/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java @@ -613,4 +613,8 @@ default boolean isMasterConnection() { * if an error occurs */ String getDatabase() throws SQLException; + + default void refreshSnapshot(){ + + }; } diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java index 193de0024..ca47900f9 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java @@ -873,7 +873,7 @@ protected ResultSetInternalMethods executeInternal(int maxRo boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { try { - + this.connection.refreshSnapshot(); JdbcConnection locallyScopedConnection = this.connection; ((PreparedQuery) this.query).getQueryBindings() @@ -921,7 +921,7 @@ public java.sql.ResultSet executeQuery() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { JdbcConnection locallyScopedConn = this.connection; - + this.connection.refreshSnapshot(); if (!this.doPingInstead) { QueryReturnType queryReturnType = getQueryInfo().getQueryReturnType(); if (queryReturnType != QueryReturnType.PRODUCES_RESULT_SET && queryReturnType != QueryReturnType.MAY_PRODUCE_RESULT_SET) { diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java index 2ac7357fb..44855e0a7 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java @@ -32,19 +32,11 @@ import java.io.Serializable; import java.lang.ref.WeakReference; import java.lang.reflect.InvocationHandler; +import java.sql.*; import java.sql.Blob; import java.sql.Clob; import java.sql.DatabaseMetaData; -import java.sql.DriverManager; import java.sql.NClob; -import java.sql.ResultSet; -import java.sql.SQLClientInfoException; -import java.sql.SQLException; -import java.sql.SQLPermission; -import java.sql.SQLWarning; -import java.sql.SQLXML; -import java.sql.Savepoint; -import java.sql.Struct; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -54,6 +46,10 @@ import java.util.Stack; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import com.mysql.cj.CacheAdapter; @@ -69,6 +65,7 @@ import com.mysql.cj.conf.HostInfo; import com.mysql.cj.conf.PropertyDefinitions.DatabaseTerm; import com.mysql.cj.conf.PropertyKey; +import com.mysql.cj.conf.PropertySet; import com.mysql.cj.conf.RuntimeProperty; import com.mysql.cj.exceptions.CJCommunicationsException; import com.mysql.cj.exceptions.CJException; @@ -108,10 +105,24 @@ public class ConnectionImpl implements JdbcConnection, SessionEventListener, Ser private static final long serialVersionUID = 4009476458425101761L; + private static final String TIDB_USE_TICDC_ACID_KEY = "useTicdcACID"; + + private static final String TIDB_TICDC_CF_NAME_KEY = "ticdcCFname"; + + private static final String TIDB_TICDC_ACID_INTERVAL_KEY = "ticdcACIDInterval"; + + private static final String QUERY_TIDB_SNAPSHOT_SQL = + "select `secondary_ts` from `tidb_cdc`.`syncpoint_v1` where `cf` = \"{ticdcCFname}\" order by `primary_ts` desc limit 1"; + + private static final SQLPermission SET_NETWORK_TIMEOUT_PERM = new SQLPermission("setNetworkTimeout"); private static final SQLPermission ABORT_PERM = new SQLPermission("abort"); + private final AtomicLong ticdcACIDinitValue = new AtomicLong(0); + + private StatementImpl stmt; + @Override public String getHost() { return this.session.getHostInfo().getHost(); @@ -437,9 +448,10 @@ public ConnectionImpl(HostInfo hostInfo) throws SQLException { } this.dbmd = getMetaData(false, false); - + //ticdcACIDinitValue.set(System.currentTimeMillis()); initializeSafeQueryInterceptors(); + } catch (CJException e1) { throw SQLExceptionsMapping.translateException(e1, getExceptionInterceptor()); } @@ -757,6 +769,81 @@ private void closeStatement(java.sql.Statement stmt) { } } + private String getTidbSnapshotParameter(String key,String defaultValue){ + String value = this.props.getProperty(key); + if(value == null){ + value = defaultValue; + } + return value; + } + + public String buildTidbSnapshotSql(){ + String ticdcCFname = getTidbSnapshotParameter(TIDB_TICDC_CF_NAME_KEY,null); + if(ticdcCFname == null){ + return null; + } + String sql = null; + if(ticdcCFname != null){ + sql = QUERY_TIDB_SNAPSHOT_SQL.replace("{ticdcCFname}",ticdcCFname); + } + return sql; + } + + public void refreshSnapshot(){ + String useTicdcACID = getTidbSnapshotParameter(TIDB_USE_TICDC_ACID_KEY,null); + if(useTicdcACID == null){ + return; + } + if(!"true".equals(useTicdcACID)){ + return; + } + String sql = buildTidbSnapshotSql(); + if(sql == null){ + return; + } + String ticdcACIDInterval = getTidbSnapshotParameter(TIDB_TICDC_ACID_INTERVAL_KEY,"300000"); + long setSnapshotTime = System.currentTimeMillis(); + //System.out.println("Snapshot-tidb_snapshot-timeout:"+(setSnapshotTime - ticdcACIDinitValue.get() - Long.parseLong(ticdcACIDInterval))); + try { + /* + * init setSnapshot + * */ + if(ticdcACIDinitValue.get() == 0){ + setSnapshot(true,sql); + ticdcACIDinitValue.set(System.currentTimeMillis()); + }else if(setSnapshotTime - ticdcACIDinitValue.get() > Long.parseLong(ticdcACIDInterval)){ + /* + * long connection setSnapshot + * */ + setSnapshot(false, sql); + ticdcACIDinitValue.set(System.currentTimeMillis()); + } + }catch (SQLException e){ + + } + + } + + + public void setSnapshot(Boolean init,String sql) throws SQLException{ + if(!init){ + this.session.setSnapshot(""); + //String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot"); + //System.out.println("Snapshot-tidb_snapshot-set empty:"+tidb_snapshot); + } + try (final ResultSet resultSet = this.stmt.executeQuery(sql)) { + while (resultSet.next()) { + final String secondaryTs = resultSet.getString("secondary_ts"); + //System.out.println("Snapshot-tidb_snapshot-db:"+secondaryTs); + if(secondaryTs != null){ + this.session.setSnapshot(secondaryTs); + //String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot"); + //System.out.println("Snapshot-tidb_snapshot-queryServerVariable:"+tidb_snapshot); + } + } + } + } + @Override public void commit() throws SQLException { synchronized (getConnectionMutex()) { @@ -1085,11 +1172,11 @@ public java.sql.Statement createStatement() throws SQLException { @Override public java.sql.Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { - StatementImpl stmt = new StatementImpl(getMultiHostSafeProxy(), this.database); stmt.setResultSetType(resultSetType); stmt.setResultSetConcurrency(resultSetConcurrency); - + this.stmt = stmt; + //refreshSnapshot(); return stmt; } @@ -1642,7 +1729,8 @@ public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType } else { pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false); } - + this.stmt = pStmt; + //refreshSnapshot(); return pStmt; } } diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionWrapper.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionWrapper.java index 4b0202058..e297b14a6 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionWrapper.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionWrapper.java @@ -156,6 +156,7 @@ public String getDatabase() throws SQLException { return null; // we don't reach this code, compiler can't tell } + @Override public void setCatalog(String catalog) throws SQLException { diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java index d6d9aafa0..74b159907 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java @@ -652,6 +652,7 @@ private boolean executeInternal(String sql, boolean returnGeneratedKeys) throws JdbcConnection locallyScopedConn = checkClosed(); synchronized (locallyScopedConn.getConnectionMutex()) { + this.connection.refreshSnapshot(); checkClosed(); checkNullOrEmptyQuery(sql); @@ -799,6 +800,7 @@ protected long[] executeBatchInternal() throws SQLException { JdbcConnection locallyScopedConn = checkClosed(); synchronized (locallyScopedConn.getConnectionMutex()) { + this.connection.refreshSnapshot(); if (locallyScopedConn.isReadOnly()) { throw SQLError.createSQLException(Messages.getString("Statement.34") + Messages.getString("Statement.35"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor()); @@ -1105,7 +1107,9 @@ protected SQLException handleExceptionForBatch(int endOfBatchIndex, int numValue public java.sql.ResultSet executeQuery(String sql) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { JdbcConnection locallyScopedConn = this.connection; - + if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + this.connection.refreshSnapshot(); + } this.retrieveGeneratedKeys = false; checkNullOrEmptyQuery(sql); @@ -1249,7 +1253,7 @@ public int executeUpdate(String sql) throws SQLException { protected long executeUpdateInternal(String sql, boolean isBatch, boolean returnGeneratedKeys) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { JdbcConnection locallyScopedConn = this.connection; - + this.connection.refreshSnapshot(); checkNullOrEmptyQuery(sql); resetCancelledState(); diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java new file mode 100644 index 000000000..3f540e7e9 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java @@ -0,0 +1,232 @@ +package com.mysql.cj.jdbc; + +import java.sql.*; + +public class StatementProxy implements Statement { + + private Statement statement; + + public StatementProxy(Statement statement) { + this.statement = statement; + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + return null; + } + + @Override + public int executeUpdate(String sql) throws SQLException { + return 0; + } + + @Override + public void close() throws SQLException { + + } + + @Override + public int getMaxFieldSize() throws SQLException { + return 0; + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + + } + + @Override + public int getMaxRows() throws SQLException { + return 0; + } + + @Override + public void setMaxRows(int max) throws SQLException { + + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + + } + + @Override + public int getQueryTimeout() throws SQLException { + return 0; + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + + } + + @Override + public void cancel() throws SQLException { + + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public void setCursorName(String name) throws SQLException { + + } + + @Override + public boolean execute(String sql) throws SQLException { + return false; + } + + @Override + public ResultSet getResultSet() throws SQLException { + return null; + } + + @Override + public int getUpdateCount() throws SQLException { + return 0; + } + + @Override + public boolean getMoreResults() throws SQLException { + return false; + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + + } + + @Override + public int getFetchDirection() throws SQLException { + return 0; + } + + @Override + public void setFetchSize(int rows) throws SQLException { + + } + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public int getResultSetConcurrency() throws SQLException { + return 0; + } + + @Override + public int getResultSetType() throws SQLException { + return 0; + } + + @Override + public void addBatch(String sql) throws SQLException { + + } + + @Override + public void clearBatch() throws SQLException { + + } + + @Override + public int[] executeBatch() throws SQLException { + return new int[0]; + } + + @Override + public Connection getConnection() throws SQLException { + return null; + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + return false; + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + return null; + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return 0; + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + return 0; + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + return 0; + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + return false; + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + return false; + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + return false; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return 0; + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + + } + + @Override + public boolean isPoolable() throws SQLException { + return false; + } + + @Override + public void closeOnCompletion() throws SQLException { + + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + return false; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } +}