Skip to content

Commit

Permalink
Release/8.0 tidb snapshot (mysql#21)
Browse files Browse the repository at this point in the history
* prepare bug fix

* set tidb Snapshot

* set tidb Snapshot

* set tidb Snapshot

* asyn load snapshot

* asyn load snapshot

* monitor load snapshot

* monitor load snapshot
PreparedStatementProxy snapshot
StatementProxy snapshot

* monitor load snapshot
PreparedStatementProxy snapshot
StatementProxy snapshot

Co-authored-by: song <[email protected]>
  • Loading branch information
lastincisor and lastincisor authored Jul 11, 2022
1 parent e47ef17 commit 0469882
Show file tree
Hide file tree
Showing 14 changed files with 946 additions and 148 deletions.
3 changes: 2 additions & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ See also com.mysql.cj.conf.PropertyDefinitions.SYSP_* variables for other test o
value="${com.mysql.cj.build.driver.version.series}.${com.mysql.cj.build.driver.version.subminor}" />
<property name="com.mysql.cj.build.driver.version" value="${com.mysql.cj.build.driver.version.numeric}${com.mysql.cj.build.driver.version.status}" />
<property name="com.mysql.cj.build.driver.version.extra" value="" />
<property name="com.mysql.cj.build.driver.version.snapshot" value="-SNAPSHOT" />
<!-- -SNAPSHOT -->
<property name="com.mysql.cj.build.driver.version.snapshot" value="-TICDC" />
<property name="com.mysql.cj.build.driver.version.full"
value="${com.mysql.cj.build.driver.version}${com.mysql.cj.build.driver.version.extra}${com.mysql.cj.build.driver.version.snapshot}" />

Expand Down
14 changes: 14 additions & 0 deletions src/main/core-api/java/com/mysql/cj/conf/HostInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

package com.mysql.cj.conf;


import com.tidb.snapshot.Ticdc;

import static com.mysql.cj.util.StringUtils.isNullOrEmpty;

import java.util.Collections;
Expand All @@ -55,6 +58,9 @@ public class HostInfo implements DatabaseUrlContainer {
private final int port;
private final String user;
private final String password;

private Ticdc ticdc = new Ticdc();

private final Map<String, String> hostProperties = new HashMap<>();

/**
Expand Down Expand Up @@ -238,4 +244,12 @@ public String toString() {
asStr.append(String.format(" :: {host: \"%s\", port: %d, hostProperties: %s}", this.host, this.port, this.hostProperties));
return asStr.toString();
}

public Ticdc getTicdc() {
return ticdc;
}

public void setTicdc(Ticdc ticdc) {
this.ticdc = ticdc;
}
}
12 changes: 8 additions & 4 deletions src/main/core-impl/java/com/mysql/cj/NativeSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,14 @@ public void setSessionVariables() {
}

public void setSnapshot(String secondaryTs){
StringBuilder query = new StringBuilder("SET @@tidb_snapshot = ");
query.append("\"").append(secondaryTs).append("\"");
//System.out.println("Snapshot-tidb_snapshot:"+query.toString());
this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0);
try {
StringBuilder query = new StringBuilder("SET @@tidb_snapshot = ");
query.append("\"").append(secondaryTs).append("\"");
System.out.println("Snapshot-setSnapshot:"+query.toString());
this.protocol.sendCommand(this.commandBuilder.buildComQuery(null, query.toString()), false, 0);
}catch (Exception e){
System.out.println("setSnapshot error"+e);
}
}

public String getProcessHost() {
Expand Down
3 changes: 0 additions & 3 deletions src/main/user-api/java/com/mysql/cj/jdbc/JdbcConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,4 @@ default boolean isMasterConnection() {
*/
String getDatabase() throws SQLException;

default void refreshSnapshot(){

};
}
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,8 @@ protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRo
boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
try {
this.connection.refreshSnapshot();

// TODO this.connection.refreshSnapshot();
JdbcConnection locallyScopedConnection = this.connection;

((PreparedQuery) this.query).getQueryBindings()
Expand Down Expand Up @@ -921,7 +922,7 @@ public java.sql.ResultSet executeQuery() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {

JdbcConnection locallyScopedConn = this.connection;
this.connection.refreshSnapshot();
// TODO this.connection.refreshSnapshot();
if (!this.doPingInstead) {
QueryReturnType queryReturnType = getQueryInfo().getQueryReturnType();
if (queryReturnType != QueryReturnType.PRODUCES_RESULT_SET && queryReturnType != QueryReturnType.MAY_PRODUCE_RESULT_SET) {
Expand Down
103 changes: 17 additions & 86 deletions src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@
import java.util.Stack;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -92,6 +89,7 @@
import com.mysql.cj.util.LRUCache;
import com.mysql.cj.util.StringUtils;
import com.mysql.cj.util.Util;
import com.tidb.snapshot.Ticdc;

/**
* A Connection represents a session with a specific database. Within the context of a Connection, SQL statements are executed and results are returned.
Expand Down Expand Up @@ -119,9 +117,9 @@ public class ConnectionImpl implements JdbcConnection, SessionEventListener, Ser

private static final SQLPermission ABORT_PERM = new SQLPermission("abort");

private final AtomicLong ticdcACIDinitValue = new AtomicLong(0);
private AtomicLong secondaryTs = new AtomicLong(0);

private StatementImpl stmt;
private Ticdc ticdc;

@Override
public String getHost() {
Expand All @@ -146,6 +144,14 @@ public void setProxy(JdbcConnection proxy) {
this.realProxy = this.topProxy instanceof MultiHostMySQLConnection ? ((MultiHostMySQLConnection) proxy).getThisAsProxy() : null;
}

public void setSecondaryTs(Long secondaryTs){
this.secondaryTs.set(secondaryTs);
}

public Long getSecondaryTs(){
return secondaryTs.get();
}

// this connection has to be proxied when using multi-host settings so that statements get routed to the right physical connection
// (works as "logical" connection)
private JdbcConnection getProxy() {
Expand Down Expand Up @@ -385,6 +391,7 @@ protected ConnectionImpl() {
public ConnectionImpl(HostInfo hostInfo) throws SQLException {

try {
this.ticdc = hostInfo.getTicdc();
// Stash away for later, used to clone this connection for Statement.cancel and Statement.setQueryTimeout().
this.origHostInfo = hostInfo;
this.origHostToConnectTo = hostInfo.getHost();
Expand Down Expand Up @@ -769,81 +776,6 @@ private void closeStatement(java.sql.Statement stmt) {
}
}

private String getTidbSnapshotParameter(String key,String defaultValue){
String value = this.props.getProperty(key);
if(value == null){
value = defaultValue;
}
return value;
}

public String buildTidbSnapshotSql(){
String ticdcCFname = getTidbSnapshotParameter(TIDB_TICDC_CF_NAME_KEY,null);
if(ticdcCFname == null){
return null;
}
String sql = null;
if(ticdcCFname != null){
sql = QUERY_TIDB_SNAPSHOT_SQL.replace("{ticdcCFname}",ticdcCFname);
}
return sql;
}

public void refreshSnapshot(){
String useTicdcACID = getTidbSnapshotParameter(TIDB_USE_TICDC_ACID_KEY,null);
if(useTicdcACID == null){
return;
}
if(!"true".equals(useTicdcACID)){
return;
}
String sql = buildTidbSnapshotSql();
if(sql == null){
return;
}
String ticdcACIDInterval = getTidbSnapshotParameter(TIDB_TICDC_ACID_INTERVAL_KEY,"300000");
long setSnapshotTime = System.currentTimeMillis();
//System.out.println("Snapshot-tidb_snapshot-timeout:"+(setSnapshotTime - ticdcACIDinitValue.get() - Long.parseLong(ticdcACIDInterval)));
try {
/*
* init setSnapshot
* */
if(ticdcACIDinitValue.get() == 0){
setSnapshot(true,sql);
ticdcACIDinitValue.set(System.currentTimeMillis());
}else if(setSnapshotTime - ticdcACIDinitValue.get() > Long.parseLong(ticdcACIDInterval)){
/*
* long connection setSnapshot
* */
setSnapshot(false, sql);
ticdcACIDinitValue.set(System.currentTimeMillis());
}
}catch (SQLException e){

}

}


public void setSnapshot(Boolean init,String sql) throws SQLException{
if(!init){
this.session.setSnapshot("");
//String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot");
//System.out.println("Snapshot-tidb_snapshot-set empty:"+tidb_snapshot);
}
try (final ResultSet resultSet = this.stmt.executeQuery(sql)) {
while (resultSet.next()) {
final String secondaryTs = resultSet.getString("secondary_ts");
//System.out.println("Snapshot-tidb_snapshot-db:"+secondaryTs);
if(secondaryTs != null){
this.session.setSnapshot(secondaryTs);
//String tidb_snapshot = this.session.queryServerVariable("@@tidb_snapshot");
//System.out.println("Snapshot-tidb_snapshot-queryServerVariable:"+tidb_snapshot);
}
}
}
}

@Override
public void commit() throws SQLException {
synchronized (getConnectionMutex()) {
Expand Down Expand Up @@ -1175,9 +1107,8 @@ public java.sql.Statement createStatement(int resultSetType, int resultSetConcur
StatementImpl stmt = new StatementImpl(getMultiHostSafeProxy(), this.database);
stmt.setResultSetType(resultSetType);
stmt.setResultSetConcurrency(resultSetConcurrency);
this.stmt = stmt;
//refreshSnapshot();
return stmt;
StatementProxy proxy = new StatementProxy(this,stmt,ticdc);
return proxy;
}

@Override
Expand Down Expand Up @@ -1729,9 +1660,9 @@ public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType
} else {
pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
}
this.stmt = pStmt;
//refreshSnapshot();
return pStmt;

PreparedStatementProxy proxy = new PreparedStatementProxy(this,pStmt,ticdc);
return proxy;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@
import static com.mysql.cj.util.StringUtils.isNullOrEmpty;

import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import com.mysql.cj.Constants;
Expand All @@ -52,6 +56,8 @@
import com.mysql.cj.jdbc.ha.LoadBalancedConnectionProxy;
import com.mysql.cj.jdbc.ha.ReplicationConnectionProxy;
import com.mysql.cj.util.StringUtils;
import com.tidb.snapshot.Monitor;
import com.tidb.snapshot.Ticdc;

/**
* The Java SQL framework allows for multiple database drivers. Each driver should supply a class that implements the Driver interface
Expand All @@ -73,6 +79,9 @@
*/
public class NonRegisteringDriver implements java.sql.Driver {


private Monitor monitor;

/*
* Standardizes OS name information to align with other drivers/clients
* for MySQL connection attributes
Expand Down Expand Up @@ -127,6 +136,7 @@ static int getMinorVersionInternal() {
*/
public NonRegisteringDriver() throws SQLException {
// Required for Class.forName().newInstance()
monitor = Monitor.of(this);
}

/**
Expand Down Expand Up @@ -191,8 +201,9 @@ public java.sql.Connection connect(String url, Properties info) throws SQLExcept
*/
return null;
}

Ticdc ticdc = monitor.setInfo(url,info).get();
ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
conStr.getMainHost().setTicdc(ticdc);
switch (conStr.getType()) {
case SINGLE_CONNECTION:
return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());
Expand Down
Loading

0 comments on commit 0469882

Please sign in to comment.