Skip to content

Commit

Permalink
[feature](jdbc catalog) support doris jdbc catalog array type (#23056)
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-kkk authored Aug 23, 2023
1 parent 8bdb75e commit 448b775
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 112 deletions.
18 changes: 16 additions & 2 deletions docs/en/docs/lakehouse/multi-catalog/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ As for data mapping from SQLServer to Doris, one Database in Doris corresponds t

Jdbc Catalog also supports connecting to another Doris database:

* MySQL 5.x driver (Connector/J 5.1)

```sql
CREATE CATALOG jdbc_doris PROPERTIES (
"type"="jdbc",
Expand All @@ -361,10 +363,21 @@ CREATE CATALOG jdbc_doris PROPERTIES (
"jdbc_url" = "jdbc:mysql://127.0.0.1:9030?useSSL=false",
"driver_url" = "mysql-connector-java-5.1.47.jar",
"driver_class" = "com.mysql.jdbc.Driver"
);
)
```

**Note:** Currently, Jdbc Catalog only supports the 5.x version of the JDBC jar package for connecting to another Doris database. If you use the 8.x version of the JDBC jar package, the column data types may not match.
* mysql 8 Driver

```sql
CREATE CATALOG jdbc_doris PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:9030?useSSL=false",
"driver_url" = "mysql-connector-java-8.0.25.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver"
)
```

#### Type Mapping

Expand All @@ -386,6 +399,7 @@ CREATE CATALOG jdbc_doris PROPERTIES (
| STRING | STRING | |
| TEXT | STRING | |
| HLL | HLL | Query HLL needs to set `return_object_data_as_binary=true` |
| Array | Array | The element type of an Array is mapped according to the rows above; nested complex types are not supported |
| Other | UNSUPPORTED | |

### Clickhouse
Expand Down
18 changes: 16 additions & 2 deletions docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ CREATE CATALOG jdbc_sqlserve PROPERTIES (

Jdbc Catalog也支持连接另一个Doris数据库:

* mysql 5.7 Driver

```sql
CREATE CATALOG jdbc_doris PROPERTIES (
"type"="jdbc",
Expand All @@ -362,10 +364,21 @@ CREATE CATALOG jdbc_doris PROPERTIES (
"jdbc_url" = "jdbc:mysql://127.0.0.1:9030?useSSL=false",
"driver_url" = "mysql-connector-java-5.1.47.jar",
"driver_class" = "com.mysql.jdbc.Driver"
);
)
```

**注意:** 目前 Jdbc Catalog 连接一个 Doris 数据库只支持用 5.x 版本的 jdbc jar 包。如果使用 8.x jdbc jar 包,可能会出现列类型无法匹配问题。
* mysql 8 Driver

```sql
CREATE CATALOG jdbc_doris PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:9030?useSSL=false",
"driver_url" = "mysql-connector-java-8.0.25.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver"
)
```

#### 类型映射

Expand All @@ -387,6 +400,7 @@ CREATE CATALOG jdbc_doris PROPERTIES (
| STRING | STRING | |
| TEXT | STRING | |
| HLL | HLL | 查询HLL需要设置`return_object_data_as_binary=true` |
| Array | Array | Array内部类型适配逻辑参考上述类型,不支持嵌套复杂类型 |
| Other | UNSUPPORTED | |

### Clickhouse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.datasource.jdbc.client;

import org.apache.doris.analysis.DefaultValueExprDef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
Expand All @@ -39,10 +40,27 @@
public class JdbcMySQLClient extends JdbcClient {

private static boolean convertDateToNull = false;
private static boolean isDoris = false;

/**
 * Creates the client and probes the remote server once to find out whether it is Doris.
 *
 * <p>The probe runs {@code SHOW VARIABLES LIKE 'version_comment'} and checks whether the
 * returned value contains "doris" (case-insensitively).
 *
 * <p>NOTE(review): {@code convertDateToNull} and {@code isDoris} are static fields, so the
 * values written here are shared by every instance of this class and the most recently
 * constructed client wins. Confirm this is intended when several catalogs coexist.
 *
 * @param jdbcClientConfig connection settings; also used to derive {@code convertDateToNull}
 * @throws JdbcClientException if the probe query fails with a {@link SQLException}
 */
protected JdbcMySQLClient(JdbcClientConfig jdbcClientConfig) {
    super(jdbcClientConfig);
    convertDateToNull = isConvertDatetimeToNull(jdbcClientConfig);
    // Compute into a local first so the static flag is always overwritten with this
    // server's answer, even when the server returns no row for the variable.
    boolean dorisDetected = false;
    Connection conn = null;
    Statement stmt = null;
    ResultSet rs = null;
    try {
        conn = super.getConnection();
        stmt = conn.createStatement();
        rs = stmt.executeQuery("SHOW VARIABLES LIKE 'version_comment'");
        if (rs.next()) {
            String versionComment = rs.getString("Value");
            // The value may be SQL NULL; Locale.ROOT keeps the comparison independent of the
            // JVM default locale (e.g. Turkish lower-casing of 'I').
            dorisDetected = versionComment != null
                    && versionComment.toLowerCase(java.util.Locale.ROOT).contains("doris");
        }
    } catch (SQLException e) {
        throw new JdbcClientException("Failed to determine MySQL Version Comment", e);
    } finally {
        close(rs, stmt, conn);
    }
    isDoris = dorisDetected;
}

@Override
Expand Down Expand Up @@ -91,37 +109,6 @@ protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalog
return databaseMetaData.getColumns(schemaName, null, tableName, null);
}

/**
 * Runs {@code SHOW FULL COLUMNS FROM <table> FROM <db>} and collects, for every returned row,
 * the value of the "Field" column mapped to the value of the "Type" column.
 *
 * @param dbName    database that owns the table
 * @param tableName table whose columns are listed
 * @return map from column name to the type string reported by the server
 * @throws JdbcClientException if the statement fails with a {@link SQLException}
 */
private Map<String, String> getJdbcColumnsTypeInfo(String dbName, String tableName) {
    Connection connection = getConnection();
    Map<String, String> typeByColumn = Maps.newHashMap();
    String showColumnsSql = "SHOW FULL COLUMNS FROM " + tableName + " FROM " + dbName;
    ResultSet rows = null;
    try (Statement statement = connection.createStatement()) {
        rows = statement.executeQuery(showColumnsSql);
        while (rows.next()) {
            // "Field" holds the column name, "Type" the original type name.
            String columnName = rows.getString("Field");
            String columnType = rows.getString("Type");
            typeByColumn.put(columnName, columnType);
        }
    } catch (SQLException e) {
        throw new JdbcClientException("failed to get column list from jdbc for table %s:%s", tableName,
                Util.getRootCauseMessage(e));
    } finally {
        close(rows, connection);
    }
    return typeByColumn;
}

/**
* get all columns of one table
*/
Expand All @@ -141,7 +128,6 @@ public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName)
String catalogName = getCatalogName(conn);
rs = getColumns(databaseMetaData, catalogName, dbName, tableName);
List<String> primaryKeys = getPrimaryKeys(databaseMetaData, catalogName, dbName, tableName);
boolean needGetDorisColumns = true;
Map<String, String> mapFieldtoType = null;
while (rs.next()) {
JdbcFieldSchema field = new JdbcFieldSchema();
Expand All @@ -151,17 +137,10 @@ public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName)
// in mysql-jdbc-connector-8.0.*, TYPE_NAME of the HLL column in doris will be "UNKNOWN"
// in mysql-jdbc-connector-5.1.*, TYPE_NAME of the HLL column in doris will be "HLL"
field.setDataTypeName(rs.getString("TYPE_NAME"));
if (rs.getString("TYPE_NAME").equalsIgnoreCase("UNKNOWN")) {
if (needGetDorisColumns) {
mapFieldtoType = getJdbcColumnsTypeInfo(dbName, tableName);
needGetDorisColumns = false;
}

if (mapFieldtoType != null) {
field.setDataTypeName(mapFieldtoType.get(rs.getString("COLUMN_NAME")));
}
if (isDoris) {
mapFieldtoType = getColumnsDataTypeUseQuery(dbName, tableName);
field.setDataTypeName(mapFieldtoType.get(rs.getString("COLUMN_NAME")));
}

field.setKey(primaryKeys.contains(field.getColumnName()));
field.setColumnSize(rs.getInt("COLUMN_SIZE"));
field.setDecimalDigits(rs.getInt("DECIMAL_DIGITS"));
Expand Down Expand Up @@ -227,6 +206,10 @@ protected List<String> getPrimaryKeys(DatabaseMetaData databaseMetaData, String

@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
// For Doris type
if (isDoris) {
return dorisTypeToDoris(fieldSchema.getDataTypeName().toUpperCase());
}
// For mysql type: "INT UNSIGNED":
// fieldSchema.getDataTypeName().split(" ")[0] == "INT"
// fieldSchema.getDataTypeName().split(" ")[1] == "UNSIGNED"
Expand Down Expand Up @@ -274,15 +257,10 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
return Type.INT;
case "BIGINT":
return Type.BIGINT;
case "LARGEINT": // for jdbc catalog connecting Doris database
return Type.LARGEINT;
case "DATE":
case "DATEV2":
return ScalarType.createDateV2Type();
case "TIMESTAMP":
case "DATETIME":
// for jdbc catalog connecting Doris database
case "DATETIMEV2": {
case "DATETIME": {
// mysql can support microsecond
// use columnSize to calculate the precision of timestamp/datetime
int columnSize = fieldSchema.getColumnSize();
Expand All @@ -299,9 +277,7 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
return Type.FLOAT;
case "DOUBLE":
return Type.DOUBLE;
case "DECIMAL":
// for jdbc catalog connecting Doris database
case "DECIMALV3": {
case "DECIMAL": {
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
Expand Down Expand Up @@ -335,8 +311,6 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
case "VARBINARY":
case "ENUM":
return ScalarType.createStringType();
case "HLL":
return ScalarType.createHllType();
default:
return Type.UNSUPPORTED;
}
Expand All @@ -346,4 +320,105 @@ private boolean isConvertDatetimeToNull(JdbcClientConfig jdbcClientConfig) {
// Check if the JDBC URL contains "zeroDateTimeBehavior=convertToNull".
return jdbcClientConfig.getJdbcUrl().contains("zeroDateTimeBehavior=convertToNull");
}

/**
 * Reads the declared type of every column of a table by running
 * {@code SHOW FULL COLUMNS FROM `table` FROM `db`} and mapping each row's "Field" value to
 * its "Type" value.
 *
 * <p>Both identifiers are wrapped in backticks (with embedded backticks doubled) so that
 * names containing special characters or reserved words do not break the statement.
 *
 * @param dbName    database that owns the table
 * @param tableName table whose columns are listed
 * @return map from column name to the type string reported by the server
 * @throws JdbcClientException if the statement fails with a {@link SQLException}
 */
private Map<String, String> getColumnsDataTypeUseQuery(String dbName, String tableName) {
    Connection conn = getConnection();
    ResultSet resultSet = null;
    Map<String, String> fieldtoType = Maps.newHashMap();

    // String.valueOf keeps the previous null handling (renders "null") instead of throwing
    // after the connection has already been acquired.
    StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM `");
    queryBuf.append(String.valueOf(tableName).replace("`", "``"));
    queryBuf.append("` FROM `");
    queryBuf.append(String.valueOf(dbName).replace("`", "``"));
    queryBuf.append("`");
    try (Statement stmt = conn.createStatement()) {
        resultSet = stmt.executeQuery(queryBuf.toString());
        while (resultSet.next()) {
            // get column name
            String fieldName = resultSet.getString("Field");
            // get original type name
            String typeName = resultSet.getString("Type");
            fieldtoType.put(fieldName, typeName);
        }
    } catch (SQLException e) {
        throw new JdbcClientException("failed to get column list from jdbc for table %s:%s", tableName,
                Util.getRootCauseMessage(e));
    } finally {
        close(resultSet, conn);
    }
    return fieldtoType;
}

/**
 * Maps a Doris type string, as reported in the "Type" column of SHOW FULL COLUMNS
 * (for example {@code INT}, {@code DECIMAL(9,2)}, {@code DATETIME(3)}, {@code ARRAY<INT>}),
 * to the corresponding catalog {@link Type}.
 *
 * <p>Matching is case-insensitive. Malformed or unknown type strings map to
 * {@link Type#UNSUPPORTED} instead of throwing, so one odd column does not abort the whole
 * table. A {@code DATETIME}/{@code DATETIMEV2} without an explicit precision is treated as
 * scale 0, and a {@code DECIMAL(p)} without a scale is treated as scale 0.
 *
 * @param type the remote type string; may be null or empty
 * @return the mapped type, or {@link Type#UNSUPPORTED} when the string cannot be mapped
 */
private Type dorisTypeToDoris(String type) {
    if (type == null || type.isEmpty()) {
        return Type.UNSUPPORTED;
    }

    // Locale.ROOT keeps the keyword comparison independent of the JVM default locale.
    String upperType = type.trim().toUpperCase(java.util.Locale.ROOT);

    // For ARRAY type: only the ARRAY<inner> form is understood; the inner type is mapped
    // recursively.
    if (upperType.startsWith("ARRAY")) {
        if (!upperType.startsWith("ARRAY<") || !upperType.endsWith(">")) {
            return Type.UNSUPPORTED;
        }
        String innerType = upperType.substring("ARRAY<".length(), upperType.length() - 1).trim();
        Type arrayInnerType = dorisTypeToDoris(innerType);
        return ArrayType.create(arrayInnerType, true);
    }

    int openParen = upperType.indexOf('(');
    String baseType = (openParen == -1) ? upperType : upperType.substring(0, openParen).trim();

    switch (baseType) {
        case "BOOL":
        case "BOOLEAN":
            return Type.BOOLEAN;
        case "TINYINT":
            return Type.TINYINT;
        case "INT":
            return Type.INT;
        case "SMALLINT":
            return Type.SMALLINT;
        case "BIGINT":
            return Type.BIGINT;
        case "LARGEINT":
            return Type.LARGEINT;
        case "FLOAT":
            return Type.FLOAT;
        case "DOUBLE":
            return Type.DOUBLE;
        case "DECIMAL":
        case "DECIMALV3": {
            int[] params = parseIntTypeParams(upperType, openParen);
            if (params.length == 0) {
                return Type.UNSUPPORTED;
            }
            int precision = params[0];
            int scale = params.length > 1 ? params[1] : 0;
            return createDecimalOrStringType(precision, scale);
        }
        case "DATE":
        case "DATEV2":
            return ScalarType.createDateV2Type();
        case "DATETIME":
        case "DATETIMEV2": {
            if (openParen == -1) {
                // No precision given: zero fractional-second digits.
                return ScalarType.createDatetimeV2Type(0);
            }
            int[] params = parseIntTypeParams(upperType, openParen);
            if (params.length == 0) {
                return Type.UNSUPPORTED;
            }
            // Scale is capped at 6 (microseconds).
            return ScalarType.createDatetimeV2Type(Math.min(params[0], 6));
        }
        case "CHAR":
        case "VARCHAR": {
            int[] params = parseIntTypeParams(upperType, openParen);
            if (params.length == 0) {
                return Type.UNSUPPORTED;
            }
            int length = params[0];
            return baseType.equals("CHAR")
                    ? ScalarType.createCharType(length) : ScalarType.createVarcharType(length);
        }
        case "STRING":
        case "TEXT":
            return ScalarType.createStringType();
        case "JSON":
            return ScalarType.createJsonbType();
        case "HLL":
            return ScalarType.createHllType();
        default:
            return Type.UNSUPPORTED;
    }
}

/**
 * Extracts the comma-separated integer parameters between the first '(' and the last ')'.
 *
 * @return the parsed values, or an empty array when the parentheses are missing or any
 *         parameter is not an integer
 */
private static int[] parseIntTypeParams(String upperType, int openParen) {
    int closeParen = upperType.lastIndexOf(')');
    if (openParen < 0 || closeParen <= openParen) {
        return new int[0];
    }
    String[] parts = upperType.substring(openParen + 1, closeParen).split(",");
    int[] values = new int[parts.length];
    try {
        for (int i = 0; i < parts.length; i++) {
            values[i] = Integer.parseInt(parts[i].trim());
        }
    } catch (NumberFormatException ignored) {
        // A non-numeric parameter means the type string is not in a form we can map.
        return new int[0];
    }
    return values;
}
}
1 change: 1 addition & 0 deletions regression-test/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pg_14_port=5442
oracle_11_port=1521
sqlserver_2022_port=1433
clickhouse_22_port=8123
doris_port=9030

// hive catalog test config
// To enable hive test, you need first start hive container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ doris_jdbc_catalog
5 doris5
6 doris6

-- !base1 --
true 1 1 1 1 1 1.0 1.0 1.00000 1.0000000000 2021-01-01 2021-01-01T00:00 a a {"a":1}

-- !arr1 --
1 [1] [1] [1] [1] [1] [1] [1] [1] [1.00000] [1.0000000000] [2021-01-01] [2021-01-01 00:00:00.000] ["a"] ["a"] ["a"]

-- !tb1 --
1 1
2 1
Expand All @@ -49,6 +55,12 @@ doris_jdbc_catalog
5 1
6 1

-- !base2 --
true 1 1 1 1 1 1.0 1.0 1.00000 1.0000000000 2021-01-01 2021-01-01T00:00 a a {"a":1}

-- !arr2 --
1 [1] [1] [1] [1] [1] [1] [1] [1] [1.00000] [1.0000000000] [2021-01-01] [2021-01-01 00:00:00.000] ["a"] ["a"] ["a"]

-- !sql --
doris_jdbc_catalog

Expand Down
Loading

0 comments on commit 448b775

Please sign in to comment.