From 046988220f3d1009628695b2d5389c5b9149cc33 Mon Sep 17 00:00:00 2001 From: lastincisor <64511824+lastincisor@users.noreply.github.com> Date: Mon, 11 Jul 2022 15:44:06 +0800 Subject: [PATCH] Release/8.0 tidb snapshot (#21) * prepare bug fix * set tidb Snapshot * set tidb Snapshot * set tidb Snapshot * asyn load snapshot * asyn load snapshot * monitor load snapshot * monitor load snapshot PreparedStatementProxy snapshot StatementProxy snapshot * monitor load snapshot PreparedStatementProxy snapshot StatementProxy snapshot Co-authored-by: song --- build.xml | 3 +- .../java/com/mysql/cj/conf/HostInfo.java | 14 + .../java/com/mysql/cj/NativeSession.java | 12 +- .../com/mysql/cj/jdbc/JdbcConnection.java | 3 - .../cj/jdbc/ClientPreparedStatement.java | 5 +- .../com/mysql/cj/jdbc/ConnectionImpl.java | 103 +--- .../mysql/cj/jdbc/NonRegisteringDriver.java | 13 +- .../mysql/cj/jdbc/PreparedStatementProxy.java | 529 ++++++++++++++++++ .../java/com/mysql/cj/jdbc/StatementImpl.java | 10 +- .../com/mysql/cj/jdbc/StatementProxy.java | 123 ++-- .../java/com/tidb/jdbc/TidbCdcOperate.java | 110 ++++ .../java/com/tidb/jdbc/TidbCdcWrapper.java | 15 + .../java/com/tidb/snapshot/Monitor.java | 116 ++++ .../java/com/tidb/snapshot/Ticdc.java | 38 ++ 14 files changed, 946 insertions(+), 148 deletions(-) create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/PreparedStatementProxy.java create mode 100644 src/main/user-impl/java/com/tidb/jdbc/TidbCdcOperate.java create mode 100644 src/main/user-impl/java/com/tidb/jdbc/TidbCdcWrapper.java create mode 100644 src/main/user-impl/java/com/tidb/snapshot/Monitor.java create mode 100644 src/main/user-impl/java/com/tidb/snapshot/Ticdc.java diff --git a/build.xml b/build.xml index f4971ec99..45c263314 100644 --- a/build.xml +++ b/build.xml @@ -210,7 +210,8 @@ See also com.mysql.cj.conf.PropertyDefinitions.SYSP_* variables for other test o value="${com.mysql.cj.build.driver.version.series}.${com.mysql.cj.build.driver.version.subminor}" /> - + + diff --git a/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java b/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java index addcb402c..ad1973c77 100644 --- a/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java +++ b/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java @@ -29,6 +29,9 @@ package com.mysql.cj.conf; + +import com.tidb.snapshot.Ticdc; + import static com.mysql.cj.util.StringUtils.isNullOrEmpty; import java.util.Collections; @@ -55,6 +58,9 @@ public class HostInfo implements DatabaseUrlContainer { private final int port; private final String user; private final String password; + + private Ticdc ticdc = new Ticdc(); + private final Map hostProperties = new HashMap<>(); /** @@ -238,4 +244,12 @@ public String toString() { asStr.append(String.format(" :: {host: \"%s\", port: %d, hostProperties: %s}", this.host, this.port, this.hostProperties)); return asStr.toString(); } + + public Ticdc getTicdc() { + return ticdc; + } + + public void setTicdc(Ticdc ticdc) { + this.ticdc = ticdc; + } } diff --git a/src/main/core-impl/java/com/mysql/cj/NativeSession.java b/src/main/core-impl/java/com/mysql/cj/NativeSession.java index 9ac3fef63..72bd24b06 100644 --- a/src/main/core-impl/java/com/mysql/cj/NativeSession.java +++ b/src/main/core-impl/java/com/mysql/cj/NativeSession.java @@ -516,10 +516,14 @@ public void setSessionVariables() { } public void setSnapshot(String secondaryTs){ - StringBuilder query = new StringBuilder("SET @@tidb_snapshot = "); - query.append("\"").append(secondaryTs).append("\""); - //System.out.println("Snapshot-tidb_snapshot:"+query.toString()); - this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0); + try { + StringBuilder query = new StringBuilder("SET @@tidb_snapshot = "); + query.append("\"").append(secondaryTs).append("\""); + System.out.println("Snapshot-setSnapshot:"+query.toString()); + this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0); + }catch (Exception e){ + System.out.println("setSnapshot error"+e); + } } public String getProcessHost() { diff --git a/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java b/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java index 1ac08d0eb..c83ad9807 100644 --- a/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java +++ b/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java @@ -614,7 +614,4 @@ default boolean isMasterConnection() { */ String getDatabase() throws SQLException; - default void refreshSnapshot(){ - - }; } diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java index ca47900f9..be551c5c5 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java @@ -873,7 +873,8 @@ protected ResultSetInternalMethods executeInternal(int maxRo boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { try { - this.connection.refreshSnapshot(); + + // TODO this.connection.refreshSnapshot(); JdbcConnection locallyScopedConnection = this.connection; ((PreparedQuery) this.query).getQueryBindings() @@ -921,7 +922,7 @@ public java.sql.ResultSet executeQuery() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { JdbcConnection locallyScopedConn = this.connection; - this.connection.refreshSnapshot(); + // TODO this.connection.refreshSnapshot(); if (!this.doPingInstead) { QueryReturnType queryReturnType = getQueryInfo().getQueryReturnType(); if (queryReturnType != QueryReturnType.PRODUCES_RESULT_SET && queryReturnType != QueryReturnType.MAY_PRODUCE_RESULT_SET) { diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java index 44855e0a7..20a469ee7 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java @@ -46,9 +46,6 @@ import java.util.Stack; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -92,6 +89,7 @@ import com.mysql.cj.util.LRUCache; import com.mysql.cj.util.StringUtils; import com.mysql.cj.util.Util; +import com.tidb.snapshot.Ticdc; /** * A Connection represents a session with a specific database. Within the context of a Connection, SQL statements are executed and results are returned. @@ -119,9 +117,9 @@ public class ConnectionImpl implements JdbcConnection, SessionEventListener, Ser private static final SQLPermission ABORT_PERM = new SQLPermission("abort"); - private final AtomicLong ticdcACIDinitValue = new AtomicLong(0); + private AtomicLong secondaryTs = new AtomicLong(0); - private StatementImpl stmt; + private Ticdc ticdc; @Override public String getHost() { @@ -146,6 +144,14 @@ public void setProxy(JdbcConnection proxy) { this.realProxy = this.topProxy instanceof MultiHostMySQLConnection ? ((MultiHostMySQLConnection) proxy).getThisAsProxy() : null; } + public void setSecondaryTs(Long secondaryTs){ + this.secondaryTs.set(secondaryTs); + } + + public Long getSecondaryTs(){ + return secondaryTs.get(); + } + // this connection has to be proxied when using multi-host settings so that statements get routed to the right physical connection // (works as "logical" connection) private JdbcConnection getProxy() { @@ -385,6 +391,7 @@ protected ConnectionImpl() { public ConnectionImpl(HostInfo hostInfo) throws SQLException { try { + this.ticdc = hostInfo.getTicdc(); // Stash away for later, used to clone this connection for Statement.cancel and Statement.setQueryTimeout(). this.origHostInfo = hostInfo; this.origHostToConnectTo = hostInfo.getHost(); @@ -769,81 +776,6 @@ private void closeStatement(java.sql.Statement stmt) { } } - private String getTidbSnapshotParameter(String key,String defaultValue){ - String value = this.props.getProperty(key); - if(value == null){ - value = defaultValue; - } - return value; - } - - public String buildTidbSnapshotSql(){ - String ticdcCFname = getTidbSnapshotParameter(TIDB_TICDC_CF_NAME_KEY,null); - if(ticdcCFname == null){ - return null; - } - String sql = null; - if(ticdcCFname != null){ - sql = QUERY_TIDB_SNAPSHOT_SQL.replace("{ticdcCFname}",ticdcCFname); - } - return sql; - } - - public void refreshSnapshot(){ - String useTicdcACID = getTidbSnapshotParameter(TIDB_USE_TICDC_ACID_KEY,null); - if(useTicdcACID == null){ - return; - } - if(!"true".equals(useTicdcACID)){ - return; - } - String sql = buildTidbSnapshotSql(); - if(sql == null){ - return; - } - String ticdcACIDInterval = getTidbSnapshotParameter(TIDB_TICDC_ACID_INTERVAL_KEY,"300000"); - long setSnapshotTime = System.currentTimeMillis(); - //System.out.println("Snapshot-tidb_snapshot-timeout:"+(setSnapshotTime - ticdcACIDinitValue.get() - Long.parseLong(ticdcACIDInterval))); - try { - /* - * init setSnapshot - * */ - if(ticdcACIDinitValue.get() == 0){ - setSnapshot(true,sql); - ticdcACIDinitValue.set(System.currentTimeMillis()); - }else if(setSnapshotTime - ticdcACIDinitValue.get() > Long.parseLong(ticdcACIDInterval)){ - /* - * long connection setSnapshot - * */ - setSnapshot(false, sql); - ticdcACIDinitValue.set(System.currentTimeMillis()); - } - }catch (SQLException e){ - - } - - } - - - public void setSnapshot(Boolean init,String sql) throws SQLException{ - if(!init){ - this.session.setSnapshot(""); - //String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot"); - //System.out.println("Snapshot-tidb_snapshot-set empty:"+tidb_snapshot); - } - try (final ResultSet resultSet = this.stmt.executeQuery(sql)) { - while (resultSet.next()) { - final String secondaryTs = resultSet.getString("secondary_ts"); - //System.out.println("Snapshot-tidb_snapshot-db:"+secondaryTs); - if(secondaryTs != null){ - this.session.setSnapshot(secondaryTs); - //String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot"); - //System.out.println("Snapshot-tidb_snapshot-queryServerVariable:"+tidb_snapshot); - } - } - } - } - @Override public void commit() throws SQLException { synchronized (getConnectionMutex()) { @@ -1175,9 +1107,8 @@ public java.sql.Statement createStatement(int resultSetType, int resultSetConcur StatementImpl stmt = new StatementImpl(getMultiHostSafeProxy(), this.database); stmt.setResultSetType(resultSetType); stmt.setResultSetConcurrency(resultSetConcurrency); - this.stmt = stmt; - //refreshSnapshot(); - return stmt; + StatementProxy proxy = new StatementProxy(this,stmt,ticdc); + return proxy; } @Override @@ -1729,9 +1660,9 @@ public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType } else { pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false); } - this.stmt = pStmt; - //refreshSnapshot(); - return pStmt; + + PreparedStatementProxy proxy = new PreparedStatementProxy(this,pStmt,ticdc); + return proxy; } } diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java b/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java index 9dd632523..15940179d 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java @@ -32,10 +32,14 @@ import static com.mysql.cj.util.StringUtils.isNullOrEmpty; import java.sql.DriverPropertyInfo; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import com.mysql.cj.Constants; @@ -52,6 +56,8 @@ import com.mysql.cj.jdbc.ha.LoadBalancedConnectionProxy; import com.mysql.cj.jdbc.ha.ReplicationConnectionProxy; import com.mysql.cj.util.StringUtils; +import com.tidb.snapshot.Monitor; +import com.tidb.snapshot.Ticdc; /** * The Java SQL framework allows for multiple database drivers. Each driver should supply a class that implements the Driver interface @@ -73,6 +79,9 @@ */ public class NonRegisteringDriver implements java.sql.Driver { + + private Monitor monitor; + /* * Standardizes OS name information to align with other drivers/clients * for MySQL connection attributes @@ -127,6 +136,7 @@ static int getMinorVersionInternal() { */ public NonRegisteringDriver() throws SQLException { // Required for Class.forName().newInstance() + monitor = Monitor.of(this); } /** @@ -191,8 +201,9 @@ public java.sql.Connection connect(String url, Properties info) throws SQLExcept */ return null; } - + Ticdc ticdc = monitor.setInfo(url,info).get(); ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info); + conStr.getMainHost().setTicdc(ticdc); switch (conStr.getType()) { case SINGLE_CONNECTION: return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost()); diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/PreparedStatementProxy.java b/src/main/user-impl/java/com/mysql/cj/jdbc/PreparedStatementProxy.java new file mode 100644 index 000000000..d747d0de9 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/PreparedStatementProxy.java @@ -0,0 +1,529 @@ +package com.mysql.cj.jdbc; + +import com.tidb.jdbc.TidbCdcOperate; +import com.tidb.jdbc.TidbCdcWrapper; +import com.tidb.snapshot.Ticdc; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.*; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.NClob; +import java.util.Calendar; + +public class PreparedStatementProxy extends TidbCdcWrapper implements PreparedStatement { + + private PreparedStatement preparedStatement; + + private ConnectionImpl connection; + + public PreparedStatementProxy(ConnectionImpl connection,PreparedStatement preparedStatement, Ticdc ticdc) { + this.connection = connection; + this.preparedStatement = preparedStatement; + super.connection = connection; + super.ticdc = ticdc; + } + + @Override + public ResultSet executeQuery() throws SQLException { + refreshSnapshot(); + return preparedStatement.executeQuery(); + } + + @Override + public int executeUpdate() throws SQLException { + return preparedStatement.executeUpdate(); + } + + @Override + public void setNull(int parameterIndex, int sqlType) throws SQLException { + preparedStatement.setNull( parameterIndex, sqlType); + } + + @Override + public void setBoolean(int parameterIndex, boolean x) throws SQLException { + preparedStatement.setBoolean( parameterIndex, x); + } + + @Override + public void setByte(int parameterIndex, byte x) throws SQLException { + preparedStatement.setByte( parameterIndex, x); + } + + @Override + public void setShort(int parameterIndex, short x) throws SQLException { + preparedStatement.setShort( parameterIndex, x); + } + + @Override + public void setInt(int parameterIndex, int x) throws SQLException { + preparedStatement.setInt( parameterIndex, x); + } + + @Override + public void setLong(int parameterIndex, long x) throws SQLException { + preparedStatement.setLong( parameterIndex, x); + } + + @Override + public void setFloat(int parameterIndex, float x) throws SQLException { + preparedStatement.setFloat( parameterIndex, x); + } + + @Override + public void setDouble(int parameterIndex, double x) throws SQLException { + preparedStatement.setDouble( parameterIndex, x); + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + preparedStatement.setBigDecimal( parameterIndex, x); + } + + @Override + public void setString(int parameterIndex, String x) throws SQLException { + preparedStatement.setString( parameterIndex, x); + } + + @Override + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + preparedStatement.setBytes( parameterIndex, x); + } + + @Override + public void setDate(int parameterIndex, Date x) throws SQLException { + preparedStatement.setDate( parameterIndex, x); + } + + @Override + public void setTime(int parameterIndex, Time x) throws SQLException { + preparedStatement.setTime( parameterIndex, x); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + preparedStatement.setTimestamp( parameterIndex, x); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + preparedStatement.setAsciiStream( parameterIndex, x, length); + } + + @Override + public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + preparedStatement.setUnicodeStream( parameterIndex, x, length); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + preparedStatement.setBinaryStream( parameterIndex, x, length); + } + + @Override + public void clearParameters() throws SQLException { + preparedStatement.clearParameters(); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + preparedStatement.setObject( parameterIndex, x, targetSqlType); + } + + @Override + public void setObject(int parameterIndex, Object x) throws SQLException { + preparedStatement.setObject( parameterIndex, x); + } + + @Override + public boolean execute() throws SQLException { + refreshSnapshot(); + return preparedStatement.execute(); + } + + @Override + public void addBatch() throws SQLException { + preparedStatement.addBatch(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException { + preparedStatement.setCharacterStream( parameterIndex, reader, length); + } + + @Override + public void setRef(int parameterIndex, Ref x) throws SQLException { + preparedStatement.setRef( parameterIndex, x); + } + + @Override + public void setBlob(int parameterIndex, Blob x) throws SQLException { + preparedStatement.setBlob( parameterIndex, x); + } + + @Override + public void setClob(int parameterIndex, Clob x) throws SQLException { + preparedStatement.setClob( parameterIndex, x); + } + + @Override + public void setArray(int parameterIndex, Array x) throws SQLException { + preparedStatement.setArray( parameterIndex, x); + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return preparedStatement.getMetaData() ; + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + preparedStatement.setDate( parameterIndex, x, cal); + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + preparedStatement.setTime( parameterIndex, x, cal); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + preparedStatement.setTimestamp( parameterIndex, x, cal); + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + preparedStatement.setNull( parameterIndex, sqlType, typeName); + } + + @Override + public void setURL(int parameterIndex, URL x) throws SQLException { + preparedStatement.setURL( parameterIndex, x); + } + + @Override + public ParameterMetaData getParameterMetaData() throws SQLException { + return preparedStatement.getParameterMetaData(); + } + + @Override + public void setRowId(int parameterIndex, RowId x) throws SQLException { + preparedStatement.setRowId( parameterIndex, x); + } + + @Override + public void setNString(int parameterIndex, String value) throws SQLException { + preparedStatement.setNString( parameterIndex, value); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException { + preparedStatement.setNCharacterStream( parameterIndex, value, length); + } + + @Override + public void setNClob(int parameterIndex, NClob value) throws SQLException { + preparedStatement.setNClob( parameterIndex, value); + } + + @Override + public void setClob(int parameterIndex, Reader reader, long length) throws SQLException { + preparedStatement.setClob( parameterIndex, reader, length); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException { + preparedStatement.setBlob( parameterIndex, inputStream, length); + } + + @Override + public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException { + preparedStatement.setNClob( parameterIndex, reader, length); + } + + @Override + public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { + preparedStatement.setSQLXML( parameterIndex, xmlObject); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + preparedStatement.setObject( parameterIndex, x, targetSqlType, scaleOrLength); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + preparedStatement.setAsciiStream( parameterIndex, x, length); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + preparedStatement.setBinaryStream( parameterIndex, x, length); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException { + preparedStatement.setCharacterStream( parameterIndex, reader, length); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + preparedStatement.setAsciiStream( parameterIndex, x); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + preparedStatement.setBinaryStream( parameterIndex, x); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { + preparedStatement.setCharacterStream( parameterIndex, reader); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { + preparedStatement.setNCharacterStream( parameterIndex, value); + } + + @Override + public void setClob(int parameterIndex, Reader reader) throws SQLException { + preparedStatement.setClob( parameterIndex, reader); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { + preparedStatement.setBlob( parameterIndex, inputStream); + } + + @Override + public void setNClob(int parameterIndex, Reader reader) throws SQLException { + preparedStatement.setNClob( parameterIndex, reader); + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + return preparedStatement.executeQuery( sql); + } + + @Override + public int executeUpdate(String sql) throws SQLException { + return preparedStatement.executeUpdate( sql); + } + + @Override + public void close() throws SQLException { + preparedStatement.close(); + } + + @Override + public int getMaxFieldSize() throws SQLException { + return preparedStatement.getMaxFieldSize(); + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + preparedStatement.setMaxFieldSize( max); + } + + @Override + public int getMaxRows() throws SQLException { + return preparedStatement.getMaxRows(); + } + + @Override + public void setMaxRows(int max) throws SQLException { + preparedStatement.setMaxRows( max); + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + preparedStatement.setEscapeProcessing( enable); + } + + @Override + public int getQueryTimeout() throws SQLException { + return preparedStatement.getQueryTimeout(); + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + preparedStatement.setQueryTimeout( seconds); + } + + @Override + public void cancel() throws SQLException { + preparedStatement.cancel(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return preparedStatement.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + preparedStatement.clearWarnings(); + } + + @Override + public void setCursorName(String name) throws SQLException { + preparedStatement.setCursorName( name); + } + + @Override + public boolean execute(String sql) throws SQLException { + return preparedStatement.execute( sql); + } + + @Override + public ResultSet getResultSet() throws SQLException { + return preparedStatement.getResultSet(); + } + + @Override + public int getUpdateCount() throws SQLException { + return preparedStatement.getUpdateCount(); + } + + @Override + public boolean getMoreResults() throws SQLException { + return preparedStatement.getMoreResults(); + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + preparedStatement.setFetchDirection( direction); + } + + @Override + public int getFetchDirection() throws SQLException { + return preparedStatement.getFetchDirection(); + } + + @Override + public void setFetchSize(int rows) throws SQLException { + preparedStatement.setFetchSize( rows); + } + + @Override + public int getFetchSize() throws SQLException { + return preparedStatement.getFetchSize(); + } + + @Override + public int getResultSetConcurrency() throws SQLException { + return preparedStatement.getResultSetConcurrency(); + } + + @Override + public int getResultSetType() throws SQLException { + return preparedStatement.getResultSetType(); + } + + @Override + public void addBatch(String sql) throws SQLException { + preparedStatement.addBatch( sql); + } + + @Override + public void clearBatch() throws SQLException { + preparedStatement.clearBatch(); + } + + @Override + public int[] executeBatch() throws SQLException { + return preparedStatement.executeBatch(); + } + + @Override + public Connection getConnection() throws SQLException { + return preparedStatement.getConnection(); + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + return preparedStatement.getMoreResults( current) ; + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + return preparedStatement.getGeneratedKeys(); + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return preparedStatement.executeUpdate( sql, autoGeneratedKeys); + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + return preparedStatement.executeUpdate( sql,columnIndexes); + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + return preparedStatement.executeUpdate( sql, columnNames); + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + refreshSnapshot(); + return preparedStatement.execute(sql, autoGeneratedKeys); + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + refreshSnapshot(); + return preparedStatement.execute( sql, columnIndexes); + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + refreshSnapshot(); + return preparedStatement.execute( sql, columnNames); + } + + @Override + public int getResultSetHoldability() throws SQLException { + return preparedStatement.getResultSetHoldability(); + } + + @Override + public boolean isClosed() throws SQLException { + return preparedStatement.isClosed(); + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + preparedStatement.setPoolable( poolable); + } + + @Override + public boolean isPoolable() throws SQLException { + return preparedStatement.isPoolable(); + } + + @Override + public void closeOnCompletion() throws SQLException { + preparedStatement.closeOnCompletion(); + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + return preparedStatement.isCloseOnCompletion(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return preparedStatement.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return preparedStatement.isWrapperFor(iface); + } +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java index 74b159907..462b89055 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java @@ -652,7 +652,7 @@ private boolean executeInternal(String sql, boolean returnGeneratedKeys) throws JdbcConnection locallyScopedConn = checkClosed(); synchronized (locallyScopedConn.getConnectionMutex()) { - this.connection.refreshSnapshot(); + // TODO this.connection.refreshSnapshot(); checkClosed(); checkNullOrEmptyQuery(sql); @@ -800,7 +800,7 @@ protected long[] executeBatchInternal() throws SQLException { JdbcConnection locallyScopedConn = checkClosed(); synchronized (locallyScopedConn.getConnectionMutex()) { - this.connection.refreshSnapshot(); + // TODO this.connection.refreshSnapshot(); if (locallyScopedConn.isReadOnly()) { throw SQLError.createSQLException(Messages.getString("Statement.34") + Messages.getString("Statement.35"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor()); @@ -1107,9 +1107,9 @@ protected SQLException handleExceptionForBatch(int endOfBatchIndex, int numValue public java.sql.ResultSet executeQuery(String sql) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { JdbcConnection locallyScopedConn = this.connection; - if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + /*** TODO if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ this.connection.refreshSnapshot(); - } + }**/ this.retrieveGeneratedKeys = false; checkNullOrEmptyQuery(sql); @@ -1253,7 +1253,7 @@ public int executeUpdate(String sql) throws SQLException { protected long executeUpdateInternal(String sql, boolean isBatch, boolean returnGeneratedKeys) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { JdbcConnection locallyScopedConn = this.connection; - this.connection.refreshSnapshot(); + //TODO this.connection.refreshSnapshot(); checkNullOrEmptyQuery(sql); resetCancelledState(); diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java index 3f540e7e9..b58740d9f 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java @@ -1,232 +1,263 @@ package com.mysql.cj.jdbc; + +import com.tidb.jdbc.TidbCdcWrapper; +import com.tidb.snapshot.Ticdc; + import java.sql.*; -public class StatementProxy implements Statement { +public class StatementProxy extends TidbCdcWrapper implements Statement { + + private static final String TIDB_USE_TICDC_ACID_KEY = "useTicdcACID"; private Statement statement; - public StatementProxy(Statement statement) { + public ConnectionImpl connection; + + public StatementProxy(ConnectionImpl connection,Statement statement, Ticdc ticdc) { this.statement = statement; + this.connection = connection; + super.connection = connection; + super.ticdc = ticdc; } + + @Override public ResultSet executeQuery(String sql) throws SQLException { - return null; + if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + refreshSnapshot(); + } + return statement.executeQuery(sql); } @Override public int executeUpdate(String sql) throws SQLException { - return 0; + if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + refreshSnapshot(); + } + return statement.executeUpdate(sql); } @Override public void close() throws SQLException { - + statement.close(); } @Override public int getMaxFieldSize() throws SQLException { - return 0; + return statement.getMaxFieldSize(); } @Override public void setMaxFieldSize(int max) throws SQLException { - + statement.setMaxFieldSize(max); } @Override public int getMaxRows() throws SQLException { - return 0; + return statement.getMaxRows(); } @Override public void setMaxRows(int max) throws SQLException { - + statement.setMaxRows(max); } @Override public void setEscapeProcessing(boolean enable) throws SQLException { - + statement.setEscapeProcessing(enable); } @Override public int getQueryTimeout() throws SQLException { - return 0; + return statement.getQueryTimeout(); } @Override public void setQueryTimeout(int seconds) throws SQLException { - + statement.setQueryTimeout(seconds); } @Override public void cancel() throws SQLException { - + statement.cancel(); } @Override public SQLWarning getWarnings() throws SQLException { - return null; + return statement.getWarnings(); } @Override public void clearWarnings() throws SQLException { - + statement.clearWarnings(); } @Override public void setCursorName(String name) throws SQLException { - + statement.setCursorName(name); } @Override public boolean execute(String sql) throws SQLException { - return false; + if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + refreshSnapshot(); + } + return statement.execute(sql); } @Override public ResultSet getResultSet() throws SQLException { - return null; + return statement.getResultSet(); } @Override public int getUpdateCount() throws SQLException { - return 0; + return statement.getUpdateCount(); } @Override public boolean getMoreResults() throws SQLException { - return false; + return statement.getMoreResults(); } @Override public void setFetchDirection(int direction) throws SQLException { - + statement.setFetchDirection(direction); } @Override public int getFetchDirection() throws SQLException { - return 0; + return statement.getFetchDirection(); } @Override public void setFetchSize(int rows) throws SQLException { - + statement.setFetchSize(rows); } @Override public int getFetchSize() throws SQLException { - return 0; + return statement.getFetchSize(); } @Override public int getResultSetConcurrency() throws SQLException { - return 0; + return statement.getResultSetConcurrency(); } @Override public int getResultSetType() throws SQLException { - return 0; + return statement.getResultSetType(); } @Override public void addBatch(String sql) throws SQLException { - + statement.addBatch(sql); } @Override public void clearBatch() throws SQLException { - + statement.clearBatch(); } @Override public int[] executeBatch() throws SQLException { - return new int[0]; + return statement.executeBatch(); } @Override public Connection getConnection() throws SQLException { - return null; + return statement.getConnection(); } @Override public boolean getMoreResults(int current) throws SQLException { - return false; + return statement.getMoreResults(); } @Override public ResultSet getGeneratedKeys() throws SQLException { - return null; + return statement.getGeneratedKeys(); } @Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { - return 0; + return statement.executeUpdate(sql,autoGeneratedKeys); } @Override public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { - return 0; + return statement.executeUpdate(sql,columnIndexes); } @Override public int executeUpdate(String sql, String[] columnNames) throws SQLException { - return 0; + return statement.executeUpdate(sql,columnNames); } @Override public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { - return false; + if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + refreshSnapshot(); + } + return statement.execute(sql,autoGeneratedKeys); } @Override public boolean execute(String sql, int[] columnIndexes) throws SQLException { - return false; + if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + refreshSnapshot(); + } + return statement.execute(sql,columnIndexes); } @Override public boolean execute(String sql, String[] columnNames) throws SQLException { - return false; + if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){ + refreshSnapshot(); + } + return statement.execute(sql,columnNames); } @Override public int getResultSetHoldability() throws SQLException { - return 0; + return statement.getResultSetHoldability(); } @Override public boolean isClosed() throws SQLException { - return false; + return statement.isClosed(); } @Override public void setPoolable(boolean poolable) throws SQLException { - + statement.setPoolable(poolable); } @Override public boolean isPoolable() throws SQLException { - return false; + return statement.isPoolable(); } @Override public void closeOnCompletion() throws SQLException { - + statement.closeOnCompletion(); } @Override public boolean isCloseOnCompletion() throws SQLException { - return false; + return statement.isCloseOnCompletion(); } @Override public T unwrap(Class iface) throws SQLException { - return null; + return statement.unwrap(iface); } @Override public boolean isWrapperFor(Class iface) throws SQLException { - return false; + return statement.isWrapperFor(iface); } } diff --git a/src/main/user-impl/java/com/tidb/jdbc/TidbCdcOperate.java b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcOperate.java new file mode 100644 index 000000000..f62e713b5 --- /dev/null +++ b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcOperate.java @@ -0,0 +1,110 @@ +package com.tidb.jdbc; + +import com.mysql.cj.jdbc.ConnectionImpl; +import com.tidb.snapshot.Ticdc; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +public class TidbCdcOperate { + + private static final String TIDB_USE_TICDC_ACID_KEY = "useTicdcACID"; + + private static final String TIDB_TICDC_CF_NAME_KEY = "ticdcCFname"; + + private static final String QUERY_TIDB_SNAPSHOT_SQL = + "select `secondary_ts` from `tidb_cdc`.`syncpoint_v1` where `cf` = \"{ticdcCFname}\" order by `primary_ts` desc limit 1"; + + + public ConnectionImpl connection; + + public Ticdc ticdc; + + public TidbCdcOperate(ConnectionImpl connection,Ticdc ticdc){ + this.connection = connection; + this.ticdc = ticdc; + } + + public static TidbCdcOperate of(ConnectionImpl connection,Ticdc ticdc){ + return new TidbCdcOperate(connection,ticdc); + } + + public TidbCdcOperate refreshSnapshot(){ + String useTicdcACID = getTidbSnapshotParameter(TIDB_USE_TICDC_ACID_KEY,null); + if(useTicdcACID == null){ + return this; + } + if(!"true".equals(useTicdcACID)){ + return this; + } + try { + if(connection != null){ + setSnapshot(); + } + + }catch (SQLException e){ + System.out.println("refreshSnapshot error:"+e.getMessage()); + } + return this; + } + + private String getTidbSnapshotParameter(String key,String defaultValue){ + if(connection == null){ + return defaultValue; + } + String value = this.connection.getProperties().getProperty(key); + if(value == null){ + value = defaultValue; + } + return value; + } + + public TidbCdcOperate setSnapshot() throws SQLException{ + if(this.ticdc.getGlobalSecondaryTs().get() == 0){ + return this; + } + if(this.connection.getSecondaryTs() == 0){ + this.connection.getSession().setSnapshot(""); + String secondaryTs = getSnapshot(); + if(secondaryTs != null){ + Long secondaryTsValue = Long.parseLong(secondaryTs); + this.connection.setSecondaryTs(secondaryTsValue); + this.connection.getSession().setSnapshot(secondaryTs); + } + }else if(this.ticdc.getGlobalSecondaryTs().get() != 0 && this.connection.getSecondaryTs() != this.ticdc.getGlobalSecondaryTs().get()){ + this.connection.getSession().setSnapshot(this.ticdc.getGlobalSecondaryTs().get()+""); + this.connection.setSecondaryTs(this.ticdc.getGlobalSecondaryTs().get()); + } + return this; + } + + public String getSnapshot() throws SQLException{ + String sql = buildTidbSnapshotSql(); + if(sql == null){ + return null; + } + Statement statement =this.connection.createStatement(); + try (final ResultSet resultSet = statement.executeQuery(sql)) { + while (resultSet.next()) { + final String secondaryTs = resultSet.getString("secondary_ts"); + if(secondaryTs != null){ + return secondaryTs; + } + } + } + return null; + } + + public String buildTidbSnapshotSql(){ + String ticdcCFname = getTidbSnapshotParameter(TIDB_TICDC_CF_NAME_KEY,null); + if(ticdcCFname == null){ + return null; + } + String sql = null; + if(ticdcCFname != null){ + sql = QUERY_TIDB_SNAPSHOT_SQL.replace("{ticdcCFname}",ticdcCFname); + } + return sql; + } +} diff --git a/src/main/user-impl/java/com/tidb/jdbc/TidbCdcWrapper.java b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcWrapper.java new file mode 100644 index 000000000..eb585d821 --- /dev/null +++ b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcWrapper.java @@ -0,0 +1,15 @@ +package com.tidb.jdbc; + +import com.mysql.cj.jdbc.ConnectionImpl; +import com.tidb.snapshot.Ticdc; + +public class TidbCdcWrapper { + + public ConnectionImpl connection; + + public Ticdc ticdc; + + public void refreshSnapshot(){ + TidbCdcOperate.of(connection,ticdc).refreshSnapshot(); + } +} diff --git a/src/main/user-impl/java/com/tidb/snapshot/Monitor.java b/src/main/user-impl/java/com/tidb/snapshot/Monitor.java new file mode 100644 index 000000000..d349a3ee2 --- /dev/null +++ b/src/main/user-impl/java/com/tidb/snapshot/Monitor.java @@ -0,0 +1,116 @@ +package com.tidb.snapshot; + + +import com.mysql.cj.jdbc.ConnectionImpl; +import com.tidb.jdbc.TidbCdcOperate; + +import java.sql.Driver; + +import java.sql.SQLException; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class Monitor { + private Ticdc ticdc = new Ticdc(); + + private String url; + + private Properties info; + + private AtomicReference conn = new AtomicReference<>(); + + private ScheduledThreadPoolExecutor executor; + + private Driver driver; + + private Lock connLock = new ReentrantLock(); + + private static final AtomicInteger threadId = new AtomicInteger(); + + + public Monitor(Driver driver,String url,Properties info,ScheduledThreadPoolExecutor executor){ + this.driver = driver; + this.url = url; + this.info = info; + this.executor = executor; + createExecutor(); + } + + public Monitor(Driver driver){ + this.driver = driver; + } + + public static Monitor of(Driver driver,String url,Properties info,ScheduledThreadPoolExecutor executor){ + return new Monitor(driver,url,info,executor); + } + + public static Monitor of(Driver driver){ + return new Monitor(driver); + } + + public Monitor setInfo(String url,Properties info){ + this.url = url; + this.info = info; + createExecutor(); + return this; + } + + public void createExecutor(){ + if(this.executor != null){ + return; + } + String executorName = "reload-Thread-" + threadId.getAndIncrement(); + this.executor = + new ScheduledThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + (runnable) -> { + Thread newThread = new Thread(runnable); + newThread.setName(executorName); + newThread.setDaemon(true); + return newThread; + }); + this.executor.setKeepAliveTime(10000, TimeUnit.MILLISECONDS); + this.executor.allowCoreThreadTimeOut(true); + this.executor.scheduleWithFixedDelay( + this::reload, 0, 10000, TimeUnit.MILLISECONDS); + } + + public Ticdc get(){ + return ticdc; + } + + public void reload(){ + if(this.url == null){ + return; + } + if("".equals(this.url)){ + return; + } + try { + if(this.conn.get() == null){ + if(connLock.tryLock()){ + this.conn.set(driver.connect(this.url,this.info)); + connLock.unlock(); + } + } + if(this.conn.get() == null){ + return; + } + String secondaryTs = TidbCdcOperate.of((ConnectionImpl) conn.get(),ticdc).getSnapshot(); + if(secondaryTs != null){ + Long secondaryTsValue = Long.parseLong(secondaryTs); + if(ticdc.getGlobalSecondaryTs().get() != secondaryTsValue){ + this.ticdc.getGlobalSecondaryTs().set(Long.parseLong(secondaryTs)); + this.ticdc.getGloballasttime().set(System.currentTimeMillis()); + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/user-impl/java/com/tidb/snapshot/Ticdc.java b/src/main/user-impl/java/com/tidb/snapshot/Ticdc.java new file mode 100644 index 000000000..0f0b226ce --- /dev/null +++ b/src/main/user-impl/java/com/tidb/snapshot/Ticdc.java @@ -0,0 +1,38 @@ +package com.tidb.snapshot; + +import java.util.concurrent.atomic.AtomicLong; + +public class Ticdc { + + private AtomicLong globalSecondaryTs = new AtomicLong(0); + + private AtomicLong globallasttime = new AtomicLong(0); + + private AtomicLong name = new AtomicLong(0); + + + public AtomicLong getGlobalSecondaryTs() { + return globalSecondaryTs; + } + + public void setGlobalSecondaryTs(AtomicLong globalSecondaryTs) { + this.globalSecondaryTs = globalSecondaryTs; + } + + public AtomicLong getGloballasttime() { + return globallasttime; + } + + public void setGloballasttime(AtomicLong globallasttime) { + this.globallasttime = globallasttime; + } + + + public AtomicLong getName() { + return name; + } + + public void setName(AtomicLong name) { + this.name = name; + } +}