Skip to content

Commit

Permalink
use obconnector-j & add jdbc properties
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 4, 2023
1 parent 22f68d3 commit f7ee5c1
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.table;
package com.ververica.cdc.debezium.utils;

import java.util.Map;
import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
18 changes: 6 additions & 12 deletions flink-connector-oceanbase-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,16 @@ under the License.

<!-- OceanBase Log Client -->
<dependency>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logproxy-client</artifactId>
<groupId>com.oceanbase</groupId>
<artifactId>oblogclient-logproxy</artifactId>
<version>${oblogclient.version}</version>
<exclusions>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>

<!-- The MySQL JDBC driver for reading snapshot-->
<!-- OceanBase JDBC Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>${obclient.version}</version>
</dependency>

<!-- test dependencies on Flink -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -56,6 +57,7 @@ public static class Builder<T> {
// snapshot reading config
private String hostname;
private Integer port;
private Properties jdbcProperties;

// incremental reading config
private String logProxyHost;
Expand Down Expand Up @@ -123,6 +125,11 @@ public Builder<T> port(int port) {
return this;
}

public Builder<T> jdbcProperties(Properties jdbcProperties) {
this.jdbcProperties = jdbcProperties;
return this;
}

public Builder<T> logProxyHost(String logProxyHost) {
this.logProxyHost = logProxyHost;
return this;
Expand Down Expand Up @@ -231,7 +238,7 @@ public SourceFunction<T> build() {
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(serverTimeZone);

return new OceanBaseRichSourceFunction<T>(
return new OceanBaseRichSourceFunction<>(
StartupMode.INITIAL.equals(startupMode),
username,
password,
Expand All @@ -240,8 +247,10 @@ public SourceFunction<T> build() {
tableName,
tableList,
connectTimeout,
serverTimeZone,
hostname,
port,
jdbcProperties,
logProxyHost,
logProxyPort,
clientConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,161 @@

package com.ververica.cdc.connectors.oceanbase.source;

import org.apache.flink.util.FlinkRuntimeException;

import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** {@link JdbcConnection} extension to be used with OceanBase server. */
public class OceanBaseConnection extends JdbcConnection {

protected static final String URL_PATTERN =
"jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull&connectTimeout=${connectTimeout}";
protected static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver";
private static final String DRIVER_CLASS_NAME = "com.oceanbase.jdbc.Driver";
private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
private static final String JDBC_URL_PATTERN =
"jdbc:oceanbase://${hostname}:${port}/?connectTimeout=${connectTimeout}&serverTimezone=${serverTimezone}";

private String databaseMode;

public OceanBaseConnection(
String hostname,
Integer port,
String user,
String password,
Duration timeout,
String serverTimeZone,
Properties jdbcProperties,
ClassLoader classLoader) {
super(config(hostname, port, user, password, timeout), factory(classLoader));
super(
config(hostname, port, user, password, timeout, serverTimeZone),
factory(jdbcProperties, classLoader));
}

public static Configuration config(
String hostname, Integer port, String user, String password, Duration timeout) {
private static Configuration config(
String hostname,
Integer port,
String user,
String password,
Duration timeout,
String serverTimeZone) {
return Configuration.create()
.with("hostname", hostname)
.with("port", port)
.with("user", user)
.with("password", password)
.with("connectTimeout", timeout == null ? 30000 : timeout.toMillis())
.with("serverTimezone", serverTimeZone)
.build();
}

public static JdbcConnection.ConnectionFactory factory(ClassLoader classLoader) {
return JdbcConnection.patternBasedFactory(URL_PATTERN, DRIVER_CLASS_NAME, classLoader);
private static String formatJdbcUrl(Properties jdbcProperties) {
Properties combinedProperties = new Properties();
combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES);
if (jdbcProperties != null) {
combinedProperties.putAll(jdbcProperties);
}
StringBuilder jdbcUrlStringBuilder = new StringBuilder(JDBC_URL_PATTERN);
combinedProperties.forEach(
(key, value) -> {
jdbcUrlStringBuilder.append("&").append(key).append("=").append(value);
});
return jdbcUrlStringBuilder.toString();
}

private static Properties initializeDefaultJdbcProperties() {
Properties defaultJdbcProperties = new Properties();
defaultJdbcProperties.setProperty("useInformationSchema", "true");
defaultJdbcProperties.setProperty("nullCatalogMeansCurrent", "false");
defaultJdbcProperties.setProperty("useUnicode", "true");
defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
defaultJdbcProperties.setProperty("characterEncoding", "UTF-8");
defaultJdbcProperties.setProperty("characterSetResults", "UTF-8");
return defaultJdbcProperties;
}

private static JdbcConnection.ConnectionFactory factory(
Properties jdbcProperties, ClassLoader classLoader) {
return JdbcConnection.patternBasedFactory(
formatJdbcUrl(jdbcProperties), DRIVER_CLASS_NAME, classLoader);
}

/**
* Get the database mode of this connection, should be 'MySQL' or 'Oracle'.
*
* @return The database mode.
* @throws SQLException If a database access error occurs.
*/
public String getDatabaseMode() throws SQLException {
if (databaseMode == null) {
databaseMode = connection().getMetaData().getDatabaseProductName();
}
return databaseMode;
}

/**
* Get table list by database name pattern and table name pattern.
*
* @param dbPattern Database name pattern.
* @param tbPattern Table name pattern.
* @return Table list.
* @throws SQLException If a database access error occurs.
*/
public List<String> getTables(String dbPattern, String tbPattern) throws SQLException {
List<String> result = new ArrayList<>();
DatabaseMetaData metaData = connection().getMetaData();
switch (getDatabaseMode().toLowerCase()) {
case "mysql":
List<String> dbNames = getResultList(metaData.getCatalogs(), "TABLE_CAT");
dbNames =
dbNames.stream()
.filter(dbName -> Pattern.matches(dbPattern, dbName))
.collect(Collectors.toList());
for (String dbName : dbNames) {
List<String> tableNames =
getResultList(
metaData.getTables(dbName, null, null, new String[] {"TABLE"}),
"TABLE_NAME");
tableNames.stream()
.filter(tbName -> Pattern.matches(tbPattern, tbName))
.forEach(tbName -> result.add(dbName + "." + tbName));
}
break;
case "oracle":
dbNames = getResultList(metaData.getSchemas(), "TABLE_SCHEM");
dbNames =
dbNames.stream()
.filter(dbName -> Pattern.matches(dbPattern, dbName))
.collect(Collectors.toList());
for (String dbName : dbNames) {
List<String> tableNames =
getResultList(
metaData.getTables(null, dbName, null, new String[] {"TABLE"}),
"TABLE_NAME");
tableNames.stream()
.filter(tbName -> Pattern.matches(tbPattern, tbName))
.forEach(tbName -> result.add(dbName + "." + tbName));
}
break;
default:
throw new FlinkRuntimeException("Unsupported database mode: " + getDatabaseMode());
}
return result;
}

private List<String> getResultList(ResultSet resultSet, String columnName) throws SQLException {
List<String> result = new ArrayList<>();
while (resultSet.next()) {
result.add(resultSet.getString(columnName));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,8 +82,10 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
private final String tableName;
private final String tableList;
private final Duration connectTimeout;
private final String serverTimeZone;
private final String hostname;
private final Integer port;
private final Properties jdbcProperties;
private final String logProxyHost;
private final int logProxyPort;
private final ClientConf logProxyClientConf;
Expand All @@ -108,8 +111,10 @@ public OceanBaseRichSourceFunction(
String tableName,
String tableList,
Duration connectTimeout,
String serverTimeZone,
String hostname,
Integer port,
Properties jdbcProperties,
String logProxyHost,
int logProxyPort,
ClientConf logProxyClientConf,
Expand All @@ -123,8 +128,10 @@ public OceanBaseRichSourceFunction(
this.tableName = tableName;
this.tableList = tableList;
this.connectTimeout = checkNotNull(connectTimeout);
this.serverTimeZone = checkNotNull(serverTimeZone);
this.hostname = hostname;
this.port = port;
this.jdbcProperties = jdbcProperties;
this.logProxyHost = checkNotNull(logProxyHost);
this.logProxyPort = checkNotNull(logProxyPort);
this.logProxyClientConf = checkNotNull(logProxyClientConf);
Expand Down Expand Up @@ -169,7 +176,7 @@ private boolean shouldReadSnapshot() {
return resolvedTimestamp == -1 && snapshot;
}

private OceanBaseConnection getSnapshotConnection() {
private OceanBaseConnection getSnapshotConnection() throws SQLException {
if (snapshotConnection == null) {
snapshotConnection =
new OceanBaseConnection(
Expand All @@ -178,6 +185,8 @@ private OceanBaseConnection getSnapshotConnection() {
username,
password,
connectTimeout,
serverTimeZone,
jdbcProperties,
getClass().getClassLoader());
}
return snapshotConnection;
Expand Down Expand Up @@ -214,23 +223,12 @@ private void initTableWhiteList() {
&& StringUtils.isNotBlank(databaseName)
&& StringUtils.isNotBlank(tableName)) {
try {
String sql =
String.format(
"SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
+ "WHERE TABLE_TYPE='BASE TABLE' and TABLE_SCHEMA REGEXP '%s' and TABLE_NAME REGEXP '%s'",
databaseName, tableName);
getSnapshotConnection()
.query(
sql,
rs -> {
while (rs.next()) {
localTableSet.add(
String.format(
"%s.%s", rs.getString(1), rs.getString(2)));
}
});
LOG.info("Connection database mode: {}", getSnapshotConnection().getDatabaseMode());
List<String> tables = getSnapshotConnection().getTables(databaseName, tableName);
LOG.info("Pattern matched tables: {}", tables);
localTableSet.addAll(tables);
} catch (SQLException e) {
LOG.error("Query database and table name failed", e);
LOG.error("Query table list by 'databaseName' and 'tableName' failed", e);
throw new FlinkRuntimeException(e);
}
}
Expand All @@ -239,6 +237,7 @@ private void initTableWhiteList() {
throw new FlinkRuntimeException("No valid table found");
}

LOG.info("Table list: {}", localTableSet);
this.tableSet = localTableSet;
this.obReaderConfig.setTableWhiteList(
localTableSet.stream()
Expand All @@ -259,14 +258,18 @@ private void readSnapshotRecordsByTable(String databaseName, String tableName) {
OceanBaseRecord.SourceInfo sourceInfo =
new OceanBaseRecord.SourceInfo(
tenantName, databaseName, tableName, resolvedTimestamp);

String fullName = String.format("`%s`.`%s`", databaseName, tableName);
String selectSql = "SELECT * FROM " + fullName;
try {
String databaseMode = getSnapshotConnection().getDatabaseMode();
String fullName;
if ("mysql".equalsIgnoreCase(databaseMode)) {
fullName = String.format("`%s`.`%s`", databaseName, tableName);
} else {
fullName = String.format("%s.%s", databaseName, tableName);
}
LOG.info("Start to read snapshot from {}", fullName);
getSnapshotConnection()
.query(
selectSql,
"SELECT * FROM " + fullName,
rs -> {
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
Expand All @@ -287,7 +290,7 @@ private void readSnapshotRecordsByTable(String databaseName, String tableName) {
});
LOG.info("Read snapshot from {} finished", fullName);
} catch (SQLException e) {
LOG.error("Read snapshot from table " + fullName + " failed", e);
LOG.error("Read snapshot by table failed", e);
throw new FlinkRuntimeException(e);
}
}
Expand Down
Loading

0 comments on commit f7ee5c1

Please sign in to comment.