diff --git a/build.xml b/build.xml
index f4971ec99..45c263314 100644
--- a/build.xml
+++ b/build.xml
@@ -210,7 +210,8 @@ See also com.mysql.cj.conf.PropertyDefinitions.SYSP_* variables for other test o
value="${com.mysql.cj.build.driver.version.series}.${com.mysql.cj.build.driver.version.subminor}" />
-
+
+
diff --git a/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java b/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java
index addcb402c..ad1973c77 100644
--- a/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java
+++ b/src/main/core-api/java/com/mysql/cj/conf/HostInfo.java
@@ -29,6 +29,9 @@
package com.mysql.cj.conf;
+
+import com.tidb.snapshot.Ticdc;
+
import static com.mysql.cj.util.StringUtils.isNullOrEmpty;
import java.util.Collections;
@@ -55,6 +58,9 @@ public class HostInfo implements DatabaseUrlContainer {
private final int port;
private final String user;
private final String password;
+
+ private Ticdc ticdc = new Ticdc();
+
private final Map hostProperties = new HashMap<>();
/**
@@ -238,4 +244,12 @@ public String toString() {
asStr.append(String.format(" :: {host: \"%s\", port: %d, hostProperties: %s}", this.host, this.port, this.hostProperties));
return asStr.toString();
}
+
+ public Ticdc getTicdc() {
+ return ticdc;
+ }
+
+ public void setTicdc(Ticdc ticdc) {
+ this.ticdc = ticdc;
+ }
}
diff --git a/src/main/core-impl/java/com/mysql/cj/NativeSession.java b/src/main/core-impl/java/com/mysql/cj/NativeSession.java
index 9ac3fef63..72bd24b06 100644
--- a/src/main/core-impl/java/com/mysql/cj/NativeSession.java
+++ b/src/main/core-impl/java/com/mysql/cj/NativeSession.java
@@ -516,10 +516,14 @@ public void setSessionVariables() {
}
public void setSnapshot(String secondaryTs){
- StringBuilder query = new StringBuilder("SET @@tidb_snapshot = ");
- query.append("\"").append(secondaryTs).append("\"");
- //System.out.println("Snapshot-tidb_snapshot:"+query.toString());
- this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0);
+ try {
+ StringBuilder query = new StringBuilder("SET @@tidb_snapshot = ");
+ query.append("\"").append(secondaryTs).append("\"");
+ System.out.println("Snapshot-setSnapshot:"+query.toString());
+ this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0);
+ }catch (Exception e){
+ System.out.println("setSnapshot error"+e);
+ }
}
public String getProcessHost() {
diff --git a/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java b/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java
index 1ac08d0eb..c83ad9807 100644
--- a/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java
+++ b/src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java
@@ -614,7 +614,4 @@ default boolean isMasterConnection() {
*/
String getDatabase() throws SQLException;
- default void refreshSnapshot(){
-
- };
}
diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java
index ca47900f9..be551c5c5 100644
--- a/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java
+++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ClientPreparedStatement.java
@@ -873,7 +873,8 @@ protected ResultSetInternalMethods executeInternal(int maxRo
boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
try {
- this.connection.refreshSnapshot();
+
+ // TODO this.connection.refreshSnapshot();
JdbcConnection locallyScopedConnection = this.connection;
((PreparedQuery) this.query).getQueryBindings()
@@ -921,7 +922,7 @@ public java.sql.ResultSet executeQuery() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
JdbcConnection locallyScopedConn = this.connection;
- this.connection.refreshSnapshot();
+ // TODO this.connection.refreshSnapshot();
if (!this.doPingInstead) {
QueryReturnType queryReturnType = getQueryInfo().getQueryReturnType();
if (queryReturnType != QueryReturnType.PRODUCES_RESULT_SET && queryReturnType != QueryReturnType.MAY_PRODUCE_RESULT_SET) {
diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java
index 44855e0a7..20a469ee7 100644
--- a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java
+++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java
@@ -46,9 +46,6 @@
import java.util.Stack;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -92,6 +89,7 @@
import com.mysql.cj.util.LRUCache;
import com.mysql.cj.util.StringUtils;
import com.mysql.cj.util.Util;
+import com.tidb.snapshot.Ticdc;
/**
* A Connection represents a session with a specific database. Within the context of a Connection, SQL statements are executed and results are returned.
@@ -119,9 +117,9 @@ public class ConnectionImpl implements JdbcConnection, SessionEventListener, Ser
private static final SQLPermission ABORT_PERM = new SQLPermission("abort");
- private final AtomicLong ticdcACIDinitValue = new AtomicLong(0);
+ private AtomicLong secondaryTs = new AtomicLong(0);
- private StatementImpl stmt;
+ private Ticdc ticdc;
@Override
public String getHost() {
@@ -146,6 +144,14 @@ public void setProxy(JdbcConnection proxy) {
this.realProxy = this.topProxy instanceof MultiHostMySQLConnection ? ((MultiHostMySQLConnection) proxy).getThisAsProxy() : null;
}
+ public void setSecondaryTs(Long secondaryTs){
+ this.secondaryTs.set(secondaryTs);
+ }
+
+ public Long getSecondaryTs(){
+ return secondaryTs.get();
+ }
+
// this connection has to be proxied when using multi-host settings so that statements get routed to the right physical connection
// (works as "logical" connection)
private JdbcConnection getProxy() {
@@ -385,6 +391,7 @@ protected ConnectionImpl() {
public ConnectionImpl(HostInfo hostInfo) throws SQLException {
try {
+ this.ticdc = hostInfo.getTicdc();
// Stash away for later, used to clone this connection for Statement.cancel and Statement.setQueryTimeout().
this.origHostInfo = hostInfo;
this.origHostToConnectTo = hostInfo.getHost();
@@ -769,81 +776,6 @@ private void closeStatement(java.sql.Statement stmt) {
}
}
- private String getTidbSnapshotParameter(String key,String defaultValue){
- String value = this.props.getProperty(key);
- if(value == null){
- value = defaultValue;
- }
- return value;
- }
-
- public String buildTidbSnapshotSql(){
- String ticdcCFname = getTidbSnapshotParameter(TIDB_TICDC_CF_NAME_KEY,null);
- if(ticdcCFname == null){
- return null;
- }
- String sql = null;
- if(ticdcCFname != null){
- sql = QUERY_TIDB_SNAPSHOT_SQL.replace("{ticdcCFname}",ticdcCFname);
- }
- return sql;
- }
-
- public void refreshSnapshot(){
- String useTicdcACID = getTidbSnapshotParameter(TIDB_USE_TICDC_ACID_KEY,null);
- if(useTicdcACID == null){
- return;
- }
- if(!"true".equals(useTicdcACID)){
- return;
- }
- String sql = buildTidbSnapshotSql();
- if(sql == null){
- return;
- }
- String ticdcACIDInterval = getTidbSnapshotParameter(TIDB_TICDC_ACID_INTERVAL_KEY,"300000");
- long setSnapshotTime = System.currentTimeMillis();
- //System.out.println("Snapshot-tidb_snapshot-timeout:"+(setSnapshotTime - ticdcACIDinitValue.get() - Long.parseLong(ticdcACIDInterval)));
- try {
- /*
- * init setSnapshot
- * */
- if(ticdcACIDinitValue.get() == 0){
- setSnapshot(true,sql);
- ticdcACIDinitValue.set(System.currentTimeMillis());
- }else if(setSnapshotTime - ticdcACIDinitValue.get() > Long.parseLong(ticdcACIDInterval)){
- /*
- * long connection setSnapshot
- * */
- setSnapshot(false, sql);
- ticdcACIDinitValue.set(System.currentTimeMillis());
- }
- }catch (SQLException e){
-
- }
-
- }
-
-
- public void setSnapshot(Boolean init,String sql) throws SQLException{
- if(!init){
- this.session.setSnapshot("");
- //String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot");
- //System.out.println("Snapshot-tidb_snapshot-set empty:"+tidb_snapshot);
- }
- try (final ResultSet resultSet = this.stmt.executeQuery(sql)) {
- while (resultSet.next()) {
- final String secondaryTs = resultSet.getString("secondary_ts");
- //System.out.println("Snapshot-tidb_snapshot-db:"+secondaryTs);
- if(secondaryTs != null){
- this.session.setSnapshot(secondaryTs);
- //String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot");
- //System.out.println("Snapshot-tidb_snapshot-queryServerVariable:"+tidb_snapshot);
- }
- }
- }
- }
-
@Override
public void commit() throws SQLException {
synchronized (getConnectionMutex()) {
@@ -1175,9 +1107,8 @@ public java.sql.Statement createStatement(int resultSetType, int resultSetConcur
StatementImpl stmt = new StatementImpl(getMultiHostSafeProxy(), this.database);
stmt.setResultSetType(resultSetType);
stmt.setResultSetConcurrency(resultSetConcurrency);
- this.stmt = stmt;
- //refreshSnapshot();
- return stmt;
+ StatementProxy proxy = new StatementProxy(this,stmt,ticdc);
+ return proxy;
}
@Override
@@ -1729,9 +1660,9 @@ public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType
} else {
pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
}
- this.stmt = pStmt;
- //refreshSnapshot();
- return pStmt;
+
+ PreparedStatementProxy proxy = new PreparedStatementProxy(this,pStmt,ticdc);
+ return proxy;
}
}
diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java b/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java
index 9dd632523..15940179d 100644
--- a/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java
+++ b/src/main/user-impl/java/com/mysql/cj/jdbc/NonRegisteringDriver.java
@@ -32,10 +32,14 @@
import static com.mysql.cj.util.StringUtils.isNullOrEmpty;
import java.sql.DriverPropertyInfo;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import com.mysql.cj.Constants;
@@ -52,6 +56,8 @@
import com.mysql.cj.jdbc.ha.LoadBalancedConnectionProxy;
import com.mysql.cj.jdbc.ha.ReplicationConnectionProxy;
import com.mysql.cj.util.StringUtils;
+import com.tidb.snapshot.Monitor;
+import com.tidb.snapshot.Ticdc;
/**
* The Java SQL framework allows for multiple database drivers. Each driver should supply a class that implements the Driver interface
@@ -73,6 +79,9 @@
*/
public class NonRegisteringDriver implements java.sql.Driver {
+
+ private Monitor monitor;
+
/*
* Standardizes OS name information to align with other drivers/clients
* for MySQL connection attributes
@@ -127,6 +136,7 @@ static int getMinorVersionInternal() {
*/
public NonRegisteringDriver() throws SQLException {
// Required for Class.forName().newInstance()
+ monitor = Monitor.of(this);
}
/**
@@ -191,8 +201,9 @@ public java.sql.Connection connect(String url, Properties info) throws SQLExcept
*/
return null;
}
-
+ Ticdc ticdc = monitor.setInfo(url,info).get();
ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
+ conStr.getMainHost().setTicdc(ticdc);
switch (conStr.getType()) {
case SINGLE_CONNECTION:
return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());
diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/PreparedStatementProxy.java b/src/main/user-impl/java/com/mysql/cj/jdbc/PreparedStatementProxy.java
new file mode 100644
index 000000000..d747d0de9
--- /dev/null
+++ b/src/main/user-impl/java/com/mysql/cj/jdbc/PreparedStatementProxy.java
@@ -0,0 +1,529 @@
+package com.mysql.cj.jdbc;
+
+import com.tidb.jdbc.TidbCdcOperate;
+import com.tidb.jdbc.TidbCdcWrapper;
+import com.tidb.snapshot.Ticdc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.*;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.NClob;
+import java.util.Calendar;
+
+public class PreparedStatementProxy extends TidbCdcWrapper implements PreparedStatement {
+
+ private PreparedStatement preparedStatement;
+
+ private ConnectionImpl connection;
+
+ public PreparedStatementProxy(ConnectionImpl connection,PreparedStatement preparedStatement, Ticdc ticdc) {
+ this.connection = connection;
+ this.preparedStatement = preparedStatement;
+ super.connection = connection;
+ super.ticdc = ticdc;
+ }
+
+ @Override
+ public ResultSet executeQuery() throws SQLException {
+ refreshSnapshot();
+ return preparedStatement.executeQuery();
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ return preparedStatement.executeUpdate();
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType) throws SQLException {
+ preparedStatement.setNull( parameterIndex, sqlType);
+ }
+
+ @Override
+ public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+ preparedStatement.setBoolean( parameterIndex, x);
+ }
+
+ @Override
+ public void setByte(int parameterIndex, byte x) throws SQLException {
+ preparedStatement.setByte( parameterIndex, x);
+ }
+
+ @Override
+ public void setShort(int parameterIndex, short x) throws SQLException {
+ preparedStatement.setShort( parameterIndex, x);
+ }
+
+ @Override
+ public void setInt(int parameterIndex, int x) throws SQLException {
+ preparedStatement.setInt( parameterIndex, x);
+ }
+
+ @Override
+ public void setLong(int parameterIndex, long x) throws SQLException {
+ preparedStatement.setLong( parameterIndex, x);
+ }
+
+ @Override
+ public void setFloat(int parameterIndex, float x) throws SQLException {
+ preparedStatement.setFloat( parameterIndex, x);
+ }
+
+ @Override
+ public void setDouble(int parameterIndex, double x) throws SQLException {
+ preparedStatement.setDouble( parameterIndex, x);
+ }
+
+ @Override
+ public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+ preparedStatement.setBigDecimal( parameterIndex, x);
+ }
+
+ @Override
+ public void setString(int parameterIndex, String x) throws SQLException {
+ preparedStatement.setString( parameterIndex, x);
+ }
+
+ @Override
+ public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+ preparedStatement.setBytes( parameterIndex, x);
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x) throws SQLException {
+ preparedStatement.setDate( parameterIndex, x);
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x) throws SQLException {
+ preparedStatement.setTime( parameterIndex, x);
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+ preparedStatement.setTimestamp( parameterIndex, x);
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ preparedStatement.setAsciiStream( parameterIndex, x, length);
+ }
+
+ @Override
+ public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ preparedStatement.setUnicodeStream( parameterIndex, x, length);
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ preparedStatement.setBinaryStream( parameterIndex, x, length);
+ }
+
+ @Override
+ public void clearParameters() throws SQLException {
+ preparedStatement.clearParameters();
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+ preparedStatement.setObject( parameterIndex, x, targetSqlType);
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x) throws SQLException {
+ preparedStatement.setObject( parameterIndex, x);
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ refreshSnapshot();
+ return preparedStatement.execute();
+ }
+
+ @Override
+ public void addBatch() throws SQLException {
+ preparedStatement.addBatch();
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+ preparedStatement.setCharacterStream( parameterIndex, reader, length);
+ }
+
+ @Override
+ public void setRef(int parameterIndex, Ref x) throws SQLException {
+ preparedStatement.setRef( parameterIndex, x);
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, Blob x) throws SQLException {
+ preparedStatement.setBlob( parameterIndex, x);
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Clob x) throws SQLException {
+ preparedStatement.setClob( parameterIndex, x);
+ }
+
+ @Override
+ public void setArray(int parameterIndex, Array x) throws SQLException {
+ preparedStatement.setArray( parameterIndex, x);
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() throws SQLException {
+ return preparedStatement.getMetaData() ;
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+ preparedStatement.setDate( parameterIndex, x, cal);
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+ preparedStatement.setTime( parameterIndex, x, cal);
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+ preparedStatement.setTimestamp( parameterIndex, x, cal);
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+ preparedStatement.setNull( parameterIndex, sqlType, typeName);
+ }
+
+ @Override
+ public void setURL(int parameterIndex, URL x) throws SQLException {
+ preparedStatement.setURL( parameterIndex, x);
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() throws SQLException {
+ return preparedStatement.getParameterMetaData();
+ }
+
+ @Override
+ public void setRowId(int parameterIndex, RowId x) throws SQLException {
+ preparedStatement.setRowId( parameterIndex, x);
+ }
+
+ @Override
+ public void setNString(int parameterIndex, String value) throws SQLException {
+ preparedStatement.setNString( parameterIndex, value);
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+ preparedStatement.setNCharacterStream( parameterIndex, value, length);
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, NClob value) throws SQLException {
+ preparedStatement.setNClob( parameterIndex, value);
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ preparedStatement.setClob( parameterIndex, reader, length);
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+ preparedStatement.setBlob( parameterIndex, inputStream, length);
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ preparedStatement.setNClob( parameterIndex, reader, length);
+ }
+
+ @Override
+ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+ preparedStatement.setSQLXML( parameterIndex, xmlObject);
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
+ preparedStatement.setObject( parameterIndex, x, targetSqlType, scaleOrLength);
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ preparedStatement.setAsciiStream( parameterIndex, x, length);
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ preparedStatement.setBinaryStream( parameterIndex, x, length);
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+ preparedStatement.setCharacterStream( parameterIndex, reader, length);
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+ preparedStatement.setAsciiStream( parameterIndex, x);
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+ preparedStatement.setBinaryStream( parameterIndex, x);
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+ preparedStatement.setCharacterStream( parameterIndex, reader);
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+ preparedStatement.setNCharacterStream( parameterIndex, value);
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader) throws SQLException {
+ preparedStatement.setClob( parameterIndex, reader);
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+ preparedStatement.setBlob( parameterIndex, inputStream);
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+ preparedStatement.setNClob( parameterIndex, reader);
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ return preparedStatement.executeQuery( sql);
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ return preparedStatement.executeUpdate( sql);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ preparedStatement.close();
+ }
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ return preparedStatement.getMaxFieldSize();
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+ preparedStatement.setMaxFieldSize( max);
+ }
+
+ @Override
+ public int getMaxRows() throws SQLException {
+ return preparedStatement.getMaxRows();
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+ preparedStatement.setMaxRows( max);
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ preparedStatement.setEscapeProcessing( enable);
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ return preparedStatement.getQueryTimeout();
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {
+ preparedStatement.setQueryTimeout( seconds);
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+ preparedStatement.cancel();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return preparedStatement.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ preparedStatement.clearWarnings();
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+ preparedStatement.setCursorName( name);
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ return preparedStatement.execute( sql);
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ return preparedStatement.getResultSet();
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ return preparedStatement.getUpdateCount();
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ return preparedStatement.getMoreResults();
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+ preparedStatement.setFetchDirection( direction);
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ return preparedStatement.getFetchDirection();
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+ preparedStatement.setFetchSize( rows);
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ return preparedStatement.getFetchSize();
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ return preparedStatement.getResultSetConcurrency();
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ return preparedStatement.getResultSetType();
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+ preparedStatement.addBatch( sql);
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+ preparedStatement.clearBatch();
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ return preparedStatement.executeBatch();
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return preparedStatement.getConnection();
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ return preparedStatement.getMoreResults( current) ;
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ return preparedStatement.getGeneratedKeys();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ return preparedStatement.executeUpdate( sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ return preparedStatement.executeUpdate( sql,columnIndexes);
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ return preparedStatement.executeUpdate( sql, columnNames);
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ refreshSnapshot();
+ return preparedStatement.execute(sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ refreshSnapshot();
+ return preparedStatement.execute( sql, columnIndexes);
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ refreshSnapshot();
+ return preparedStatement.execute( sql, columnNames);
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ return preparedStatement.getResultSetHoldability();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return preparedStatement.isClosed();
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+ preparedStatement.setPoolable( poolable);
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ return preparedStatement.isPoolable();
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+ preparedStatement.closeOnCompletion();
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ return preparedStatement.isCloseOnCompletion();
+ }
+
+ @Override
+ public T unwrap(Class iface) throws SQLException {
+ return preparedStatement.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class> iface) throws SQLException {
+ return preparedStatement.isWrapperFor(iface);
+ }
+}
diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java
index 74b159907..462b89055 100644
--- a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java
+++ b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java
@@ -652,7 +652,7 @@ private boolean executeInternal(String sql, boolean returnGeneratedKeys) throws
JdbcConnection locallyScopedConn = checkClosed();
synchronized (locallyScopedConn.getConnectionMutex()) {
- this.connection.refreshSnapshot();
+ // TODO this.connection.refreshSnapshot();
checkClosed();
checkNullOrEmptyQuery(sql);
@@ -800,7 +800,7 @@ protected long[] executeBatchInternal() throws SQLException {
JdbcConnection locallyScopedConn = checkClosed();
synchronized (locallyScopedConn.getConnectionMutex()) {
- this.connection.refreshSnapshot();
+ // TODO this.connection.refreshSnapshot();
if (locallyScopedConn.isReadOnly()) {
throw SQLError.createSQLException(Messages.getString("Statement.34") + Messages.getString("Statement.35"),
MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor());
@@ -1107,9 +1107,9 @@ protected SQLException handleExceptionForBatch(int endOfBatchIndex, int numValue
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
JdbcConnection locallyScopedConn = this.connection;
- if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
+ /*** TODO if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
this.connection.refreshSnapshot();
- }
+ }**/
this.retrieveGeneratedKeys = false;
checkNullOrEmptyQuery(sql);
@@ -1253,7 +1253,7 @@ public int executeUpdate(String sql) throws SQLException {
protected long executeUpdateInternal(String sql, boolean isBatch, boolean returnGeneratedKeys) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
JdbcConnection locallyScopedConn = this.connection;
- this.connection.refreshSnapshot();
+ //TODO this.connection.refreshSnapshot();
checkNullOrEmptyQuery(sql);
resetCancelledState();
diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java
index 3f540e7e9..b58740d9f 100644
--- a/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java
+++ b/src/main/user-impl/java/com/mysql/cj/jdbc/StatementProxy.java
@@ -1,232 +1,263 @@
package com.mysql.cj.jdbc;
+
+import com.tidb.jdbc.TidbCdcWrapper;
+import com.tidb.snapshot.Ticdc;
+
import java.sql.*;
-public class StatementProxy implements Statement {
+public class StatementProxy extends TidbCdcWrapper implements Statement {
+
+ private static final String TIDB_USE_TICDC_ACID_KEY = "useTicdcACID";
private Statement statement;
- public StatementProxy(Statement statement) {
+ public ConnectionImpl connection;
+
+ public StatementProxy(ConnectionImpl connection,Statement statement, Ticdc ticdc) {
this.statement = statement;
+ this.connection = connection;
+ super.connection = connection;
+ super.ticdc = ticdc;
}
+
+
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- return null;
+ if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
+ refreshSnapshot();
+ }
+ return statement.executeQuery(sql);
}
@Override
public int executeUpdate(String sql) throws SQLException {
- return 0;
+ if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
+ refreshSnapshot();
+ }
+ return statement.executeUpdate(sql);
}
@Override
public void close() throws SQLException {
-
+ statement.close();
}
@Override
public int getMaxFieldSize() throws SQLException {
- return 0;
+ return statement.getMaxFieldSize();
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
-
+ statement.setMaxFieldSize(max);
}
@Override
public int getMaxRows() throws SQLException {
- return 0;
+ return statement.getMaxRows();
}
@Override
public void setMaxRows(int max) throws SQLException {
-
+ statement.setMaxRows(max);
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
-
+ statement.setEscapeProcessing(enable);
}
@Override
public int getQueryTimeout() throws SQLException {
- return 0;
+ return statement.getQueryTimeout();
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
-
+ statement.setQueryTimeout(seconds);
}
@Override
public void cancel() throws SQLException {
-
+ statement.cancel();
}
@Override
public SQLWarning getWarnings() throws SQLException {
- return null;
+ return statement.getWarnings();
}
@Override
public void clearWarnings() throws SQLException {
-
+ statement.clearWarnings();
}
@Override
public void setCursorName(String name) throws SQLException {
-
+ statement.setCursorName(name);
}
@Override
public boolean execute(String sql) throws SQLException {
- return false;
+ if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
+ refreshSnapshot();
+ }
+ return statement.execute(sql);
}
@Override
public ResultSet getResultSet() throws SQLException {
- return null;
+ return statement.getResultSet();
}
@Override
public int getUpdateCount() throws SQLException {
- return 0;
+ return statement.getUpdateCount();
}
@Override
public boolean getMoreResults() throws SQLException {
- return false;
+ return statement.getMoreResults();
}
@Override
public void setFetchDirection(int direction) throws SQLException {
-
+ statement.setFetchDirection(direction);
}
@Override
public int getFetchDirection() throws SQLException {
- return 0;
+ return statement.getFetchDirection();
}
@Override
public void setFetchSize(int rows) throws SQLException {
-
+ statement.setFetchSize(rows);
}
@Override
public int getFetchSize() throws SQLException {
- return 0;
+ return statement.getFetchSize();
}
@Override
public int getResultSetConcurrency() throws SQLException {
- return 0;
+ return statement.getResultSetConcurrency();
}
@Override
public int getResultSetType() throws SQLException {
- return 0;
+ return statement.getResultSetType();
}
@Override
public void addBatch(String sql) throws SQLException {
-
+ statement.addBatch(sql);
}
@Override
public void clearBatch() throws SQLException {
-
+ statement.clearBatch();
}
@Override
public int[] executeBatch() throws SQLException {
- return new int[0];
+ return statement.executeBatch();
}
@Override
public Connection getConnection() throws SQLException {
- return null;
+ return statement.getConnection();
}
@Override
public boolean getMoreResults(int current) throws SQLException {
- return false;
+ return statement.getMoreResults();
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
- return null;
+ return statement.getGeneratedKeys();
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
- return 0;
+ return statement.executeUpdate(sql,autoGeneratedKeys);
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
- return 0;
+ return statement.executeUpdate(sql,columnIndexes);
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
- return 0;
+ return statement.executeUpdate(sql,columnNames);
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
- return false;
+ if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
+ refreshSnapshot();
+ }
+ return statement.execute(sql,autoGeneratedKeys);
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
- return false;
+ if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
+ refreshSnapshot();
+ }
+ return statement.execute(sql,columnIndexes);
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
- return false;
+ if(!sql.contains("`tidb_cdc`.`syncpoint_v1`")){
+ refreshSnapshot();
+ }
+ return statement.execute(sql,columnNames);
}
@Override
public int getResultSetHoldability() throws SQLException {
- return 0;
+ return statement.getResultSetHoldability();
}
@Override
public boolean isClosed() throws SQLException {
- return false;
+ return statement.isClosed();
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
-
+ statement.setPoolable(poolable);
}
@Override
public boolean isPoolable() throws SQLException {
- return false;
+ return statement.isPoolable();
}
@Override
public void closeOnCompletion() throws SQLException {
-
+ statement.closeOnCompletion();
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
- return false;
+ return statement.isCloseOnCompletion();
}
@Override
public T unwrap(Class iface) throws SQLException {
- return null;
+ return statement.unwrap(iface);
}
@Override
public boolean isWrapperFor(Class> iface) throws SQLException {
- return false;
+ return statement.isWrapperFor(iface);
}
}
diff --git a/src/main/user-impl/java/com/tidb/jdbc/TidbCdcOperate.java b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcOperate.java
new file mode 100644
index 000000000..f62e713b5
--- /dev/null
+++ b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcOperate.java
@@ -0,0 +1,110 @@
+package com.tidb.jdbc;
+
+import com.mysql.cj.jdbc.ConnectionImpl;
+import com.tidb.snapshot.Ticdc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TidbCdcOperate {
+
+ private static final String TIDB_USE_TICDC_ACID_KEY = "useTicdcACID";
+
+ private static final String TIDB_TICDC_CF_NAME_KEY = "ticdcCFname";
+
+ private static final String QUERY_TIDB_SNAPSHOT_SQL =
+ "select `secondary_ts` from `tidb_cdc`.`syncpoint_v1` where `cf` = \"{ticdcCFname}\" order by `primary_ts` desc limit 1";
+
+
+ public ConnectionImpl connection;
+
+ public Ticdc ticdc;
+
+ public TidbCdcOperate(ConnectionImpl connection,Ticdc ticdc){
+ this.connection = connection;
+ this.ticdc = ticdc;
+ }
+
+ public static TidbCdcOperate of(ConnectionImpl connection,Ticdc ticdc){
+ return new TidbCdcOperate(connection,ticdc);
+ }
+
+ public TidbCdcOperate refreshSnapshot(){
+ String useTicdcACID = getTidbSnapshotParameter(TIDB_USE_TICDC_ACID_KEY,null);
+ if(useTicdcACID == null){
+ return this;
+ }
+ if(!"true".equals(useTicdcACID)){
+ return this;
+ }
+ try {
+ if(connection != null){
+ setSnapshot();
+ }
+
+ }catch (SQLException e){
+ System.out.println("refreshSnapshot error:"+e.getMessage());
+ }
+ return this;
+ }
+
+ private String getTidbSnapshotParameter(String key,String defaultValue){
+ if(connection == null){
+ return defaultValue;
+ }
+ String value = this.connection.getProperties().getProperty(key);
+ if(value == null){
+ value = defaultValue;
+ }
+ return value;
+ }
+
+ public TidbCdcOperate setSnapshot() throws SQLException{
+ if(this.ticdc.getGlobalSecondaryTs().get() == 0){
+ return this;
+ }
+ if(this.connection.getSecondaryTs() == 0){
+ this.connection.getSession().setSnapshot("");
+ String secondaryTs = getSnapshot();
+ if(secondaryTs != null){
+ Long secondaryTsValue = Long.parseLong(secondaryTs);
+ this.connection.setSecondaryTs(secondaryTsValue);
+ this.connection.getSession().setSnapshot(secondaryTs);
+ }
+ }else if(this.ticdc.getGlobalSecondaryTs().get() != 0 && this.connection.getSecondaryTs() != this.ticdc.getGlobalSecondaryTs().get()){
+ this.connection.getSession().setSnapshot(this.ticdc.getGlobalSecondaryTs().get()+"");
+ this.connection.setSecondaryTs(this.ticdc.getGlobalSecondaryTs().get());
+ }
+ return this;
+ }
+
+ public String getSnapshot() throws SQLException{
+ String sql = buildTidbSnapshotSql();
+ if(sql == null){
+ return null;
+ }
+ Statement statement =this.connection.createStatement();
+ try (final ResultSet resultSet = statement.executeQuery(sql)) {
+ while (resultSet.next()) {
+ final String secondaryTs = resultSet.getString("secondary_ts");
+ if(secondaryTs != null){
+ return secondaryTs;
+ }
+ }
+ }
+ return null;
+ }
+
+ public String buildTidbSnapshotSql(){
+ String ticdcCFname = getTidbSnapshotParameter(TIDB_TICDC_CF_NAME_KEY,null);
+ if(ticdcCFname == null){
+ return null;
+ }
+ String sql = null;
+ if(ticdcCFname != null){
+ sql = QUERY_TIDB_SNAPSHOT_SQL.replace("{ticdcCFname}",ticdcCFname);
+ }
+ return sql;
+ }
+}
diff --git a/src/main/user-impl/java/com/tidb/jdbc/TidbCdcWrapper.java b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcWrapper.java
new file mode 100644
index 000000000..eb585d821
--- /dev/null
+++ b/src/main/user-impl/java/com/tidb/jdbc/TidbCdcWrapper.java
@@ -0,0 +1,15 @@
+package com.tidb.jdbc;
+
+import com.mysql.cj.jdbc.ConnectionImpl;
+import com.tidb.snapshot.Ticdc;
+
+public class TidbCdcWrapper {
+
+ public ConnectionImpl connection;
+
+ public Ticdc ticdc;
+
+ public void refreshSnapshot(){
+ TidbCdcOperate.of(connection,ticdc).refreshSnapshot();
+ }
+}
diff --git a/src/main/user-impl/java/com/tidb/snapshot/Monitor.java b/src/main/user-impl/java/com/tidb/snapshot/Monitor.java
new file mode 100644
index 000000000..d349a3ee2
--- /dev/null
+++ b/src/main/user-impl/java/com/tidb/snapshot/Monitor.java
@@ -0,0 +1,116 @@
+package com.tidb.snapshot;
+
+
+import com.mysql.cj.jdbc.ConnectionImpl;
+import com.tidb.jdbc.TidbCdcOperate;
+
+import java.sql.Driver;
+
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Monitor {
+ private Ticdc ticdc = new Ticdc();
+
+ private String url;
+
+ private Properties info;
+
+ private AtomicReference conn = new AtomicReference<>();
+
+ private ScheduledThreadPoolExecutor executor;
+
+ private Driver driver;
+
+ private Lock connLock = new ReentrantLock();
+
+ private static final AtomicInteger threadId = new AtomicInteger();
+
+
+ public Monitor(Driver driver,String url,Properties info,ScheduledThreadPoolExecutor executor){
+ this.driver = driver;
+ this.url = url;
+ this.info = info;
+ this.executor = executor;
+ createExecutor();
+ }
+
+ public Monitor(Driver driver){
+ this.driver = driver;
+ }
+
+ public static Monitor of(Driver driver,String url,Properties info,ScheduledThreadPoolExecutor executor){
+ return new Monitor(driver,url,info,executor);
+ }
+
+ public static Monitor of(Driver driver){
+ return new Monitor(driver);
+ }
+
+ public Monitor setInfo(String url,Properties info){
+ this.url = url;
+ this.info = info;
+ createExecutor();
+ return this;
+ }
+
+ public void createExecutor(){
+ if(this.executor != null){
+ return;
+ }
+ String executorName = "reload-Thread-" + threadId.getAndIncrement();
+ this.executor =
+ new ScheduledThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
+ (runnable) -> {
+ Thread newThread = new Thread(runnable);
+ newThread.setName(executorName);
+ newThread.setDaemon(true);
+ return newThread;
+ });
+ this.executor.setKeepAliveTime(10000, TimeUnit.MILLISECONDS);
+ this.executor.allowCoreThreadTimeOut(true);
+ this.executor.scheduleWithFixedDelay(
+ this::reload, 0, 10000, TimeUnit.MILLISECONDS);
+ }
+
+ public Ticdc get(){
+ return ticdc;
+ }
+
+ public void reload(){
+ if(this.url == null){
+ return;
+ }
+ if("".equals(this.url)){
+ return;
+ }
+ try {
+ if(this.conn.get() == null){
+ if(connLock.tryLock()){
+ this.conn.set(driver.connect(this.url,this.info));
+ connLock.unlock();
+ }
+ }
+ if(this.conn.get() == null){
+ return;
+ }
+ String secondaryTs = TidbCdcOperate.of((ConnectionImpl) conn.get(),ticdc).getSnapshot();
+ if(secondaryTs != null){
+ Long secondaryTsValue = Long.parseLong(secondaryTs);
+ if(ticdc.getGlobalSecondaryTs().get() != secondaryTsValue){
+ this.ticdc.getGlobalSecondaryTs().set(Long.parseLong(secondaryTs));
+ this.ticdc.getGloballasttime().set(System.currentTimeMillis());
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/user-impl/java/com/tidb/snapshot/Ticdc.java b/src/main/user-impl/java/com/tidb/snapshot/Ticdc.java
new file mode 100644
index 000000000..0f0b226ce
--- /dev/null
+++ b/src/main/user-impl/java/com/tidb/snapshot/Ticdc.java
@@ -0,0 +1,38 @@
+package com.tidb.snapshot;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Ticdc {
+
+ private AtomicLong globalSecondaryTs = new AtomicLong(0);
+
+ private AtomicLong globallasttime = new AtomicLong(0);
+
+ private AtomicLong name = new AtomicLong(0);
+
+
+ public AtomicLong getGlobalSecondaryTs() {
+ return globalSecondaryTs;
+ }
+
+ public void setGlobalSecondaryTs(AtomicLong globalSecondaryTs) {
+ this.globalSecondaryTs = globalSecondaryTs;
+ }
+
+ public AtomicLong getGloballasttime() {
+ return globallasttime;
+ }
+
+ public void setGloballasttime(AtomicLong globallasttime) {
+ this.globallasttime = globallasttime;
+ }
+
+
+ public AtomicLong getName() {
+ return name;
+ }
+
+ public void setName(AtomicLong name) {
+ this.name = name;
+ }
+}