Skip to content

Commit

Permalink
[Fix][Connector-V2] Fixed adding table comments (#8514)
Browse files Browse the repository at this point in the history
  • Loading branch information
corgy-w authored Jan 16, 2025
1 parent 63c52de commit edca75b
Show file tree
Hide file tree
Showing 28 changed files with 366 additions and 26 deletions.
2 changes: 2 additions & 0 deletions docs/en/concept/sink-options-placeholders.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ The placeholders are mainly controlled by the following expressions:
- Used to get the table unique-key fields in the upstream catalog table
- `${field_names}`
- Used to get the table field keys in the upstream catalog table
- `${comment}`
- Used to get the table comment in the upstream catalog table

## Configuration

Expand Down
7 changes: 5 additions & 2 deletions docs/en/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
index_granularity = 8192
COMMENT '${comment}';
```

If custom fields are added to the template, for example, adding an `id` field:
Expand All @@ -109,7 +110,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
index_granularity = 8192
COMMENT '${comment}';
```

The connector will automatically retrieve the corresponding types from the upstream source and fill in the template, removing the `id` field from the `rowtype_fields`. This method can be used to modify custom field types and attributes.
Expand All @@ -121,6 +123,7 @@ The following placeholders can be used:
- `rowtype_fields`: Retrieves all fields from the upstream schema and automatically maps them to Clickhouse field descriptions.
- `rowtype_primary_key`: Retrieves the primary key from the upstream schema (this may be a list).
- `rowtype_unique_key`: Retrieves the unique key from the upstream schema (this may be a list).
- `comment`: Retrieves the table comment from the upstream schema.

## How to Create a Clickhouse Data Synchronization Jobs

Expand Down
3 changes: 3 additions & 0 deletions docs/en/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
Expand All @@ -110,6 +111,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
id,
${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
Expand All @@ -129,6 +131,7 @@ You can use the following placeholders
- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)
- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list)
- rowtype_duplicate_key: Used to get the duplicate key in the upstream schema (only for doris source, maybe a list)
- comment: Used to get the table comment in the upstream schema

## Data Type Mapping

Expand Down
5 changes: 3 additions & 2 deletions docs/en/connector-v2/sink/Maxcompute.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Default template:
```sql
CREATE TABLE IF NOT EXISTS `${table}` (
${rowtype_fields}
);
) COMMENT '${comment}';
```

If a custom field is filled in the template, such as adding an `id` field
Expand All @@ -72,7 +72,7 @@ CREATE TABLE IF NOT EXISTS `${table}`
(
id,
${rowtype_fields}
);
) COMMENT '${comment}';
```

The connector will automatically obtain the corresponding type from the upstream to complete the filling,
Expand All @@ -86,6 +86,7 @@ You can use the following placeholders
description of MaxCompute
- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)
- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list)
- comment: Used to get the table comment in the upstream schema

### schema_save_mode[Enum]

Expand Down
6 changes: 5 additions & 1 deletion docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (
"replication_num" = "1"
)
Expand All @@ -79,7 +80,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
(
id,
${rowtype_fields}
) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key})
) ENGINE = OLAP
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
"replication_num" = "1"
Expand All @@ -97,6 +100,7 @@ You can use the following placeholders
description of StarRocks
- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)
- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list)
- comment: Used to get the table comment in the upstream schema

### table [string]

Expand Down
2 changes: 2 additions & 0 deletions docs/zh/concept/sink-options-placeholders.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ SeaTunnel 提供了 Sink 参数占位符自动替换功能,可让您通过占
- 用于获取上游表中的唯一键字段名称列表
- `${field_names}`
- 用于获取上游表中的所有字段名称列表
- `${comment}`
- 用于获取上游表中的表注释

## 配置

Expand Down
7 changes: 5 additions & 2 deletions docs/zh/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
index_granularity = 8192
COMMENT '${comment}';
```

如果模板中填写了自定义字段,例如添加 id 字段
Expand All @@ -109,7 +110,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
index_granularity = 8192
COMMENT '${comment}';
```

连接器会自动从上游获取对应类型完成填充,
Expand All @@ -122,6 +124,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
- rowtype_fields:用于获取上游schema中的所有字段,自动映射到 Clickhouse 的字段描述。
- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)。
- rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。
- comment:用于获取上游模式中的表注释。

## 如何创建一个clickhouse 同步任务

Expand Down
5 changes: 4 additions & 1 deletion docs/zh/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
Expand All @@ -109,6 +110,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
id,
${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
Expand All @@ -124,8 +126,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
- database:用于获取上游schema中的数据库。
- table_name:用于获取上游schema中的表名。
- rowtype_fields:用于获取上游schema中的所有字段,自动映射到Doris的字段描述。
- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)
- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)
- rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。
- comment:用于获取上游模式中的表注释。

## 数据类型映射

Expand Down
6 changes: 5 additions & 1 deletion docs/zh/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (
"replication_num" = "1"
)
Expand All @@ -76,7 +77,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
id,
${rowtype_fields}
) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key})
) ENGINE = OLAP
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
"replication_num" = "1"
Expand All @@ -92,6 +95,7 @@ StarRocks数据接收器根据上游数据自动获取相应的信息来填充
- rowtype_fields: 上游数据模式的所有字段信息,连接器会将字段信息自动映射到StarRocks对应的类型
- rowtype_primary_key: 上游数据模式的主键信息,结果可能是列表
- rowtype_unique_key: 上游数据模式的唯一键信息,结果可能是列表
- comment: 上游数据模式的注释信息

### table [string]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum SaveModePlaceHolder {
ROWTYPE_FIELDS("rowtype_fields", "fields"),
TABLE("table", "table"),
DATABASE("database", "database"),
COMMENT("comment", "comment"),
/** @deprecated instead by {@link #TABLE} todo remove this enum */
@Deprecated
TABLE_NAME("table_name", "table name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
tablePath.getDatabaseName(),
tablePath.getTableName(),
template,
table.getComment(),
table.getTableSchema());
}

Expand Down Expand Up @@ -252,6 +253,7 @@ public PreviewResult previewAction(
tablePath.getDatabaseName(),
tablePath.getTableName(),
catalogTable.get().getTableSchema(),
catalogTable.get().getComment(),
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));
} else if (actionType == ActionType.DROP_TABLE) {
return new SQLPreviewResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ public class ClickhouseConfig {
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ ")\n"
+ "SETTINGS\n"
+ " index_granularity = 8192;")
+ " index_granularity = 8192"
+ "\n"
+ "COMMENT '"
+ SaveModePlaceHolder.COMMENT.getPlaceHolder()
+ "';")
.withDescription(
"Create table statement template, used to create Clickhouse table");
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,18 @@ public void executeSql(String sql) {
}

public void createTable(
String database, String table, String template, TableSchema tableSchema) {
String database,
String table,
String template,
String comment,
TableSchema tableSchema) {
String createTableSql =
ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
template,
database,
table,
tableSchema,
comment,
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
log.debug("Create Clickhouse table sql: {}", createTableSql);
executeSql(createTableSql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void test() {
.ASC)))))
.columns(columns)
.build(),
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
Assertions.assertEquals(
createTableSql,
Expand Down Expand Up @@ -129,6 +130,7 @@ public void test() {
"test1",
"test2",
tableSchema,
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));

String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
Expand Down Expand Up @@ -224,6 +226,7 @@ public void testInSeq() {
"", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
.columns(columns)
.build(),
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
String expected =
"CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
Expand All @@ -249,4 +252,110 @@ public void testInSeq() {
+ " index_granularity = 8192;";
Assertions.assertEquals(result, expected);
}

@Test
public void testTableComment() {
List<Column> columns = new ArrayList<>();

columns.add(
PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_LINENUMBER", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_QUANTITY", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_EXTENDEDPRICE", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_DISCOUNT", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of("L_TAX", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_RETURNFLAG", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_LINESTATUS", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_COMMITDATE",
LocalTimeType.LOCAL_DATE_TYPE,
(Long) null,
false,
null,
""));
columns.add(
PhysicalColumn.of(
"L_RECEIPTDATE",
LocalTimeType.LOCAL_DATE_TYPE,
(Long) null,
false,
null,
""));
columns.add(
PhysicalColumn.of(
"L_SHIPINSTRUCT", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_SHIPMODE", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_COMMENT", BasicType.STRING_TYPE, (Long) null, false, null, ""));

String result =
ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n"
+ "${rowtype_primary_key},\n"
+ "${rowtype_fields}\n"
+ ") ENGINE = MergeTree()\n"
+ "ORDER BY (${rowtype_primary_key})\n"
+ "PRIMARY KEY (${rowtype_primary_key})\n"
+ "SETTINGS\n"
+ " index_granularity = 8192\n"
+ "COMMENT '${comment}';",
"tpch",
"lineitem",
TableSchema.builder()
.primaryKey(
PrimaryKey.of(
"", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
.columns(columns)
.build(),
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
String expected =
"CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+ "`L_ORDERKEY` Int32 ,`L_LINENUMBER` Int32 ,\n"
+ "`L_PARTKEY` Int32 ,\n"
+ "`L_SUPPKEY` Int32 ,\n"
+ "`L_QUANTITY` Decimal(15, 2) ,\n"
+ "`L_EXTENDEDPRICE` Decimal(15, 2) ,\n"
+ "`L_DISCOUNT` Decimal(15, 2) ,\n"
+ "`L_TAX` Decimal(15, 2) ,\n"
+ "`L_RETURNFLAG` String ,\n"
+ "`L_LINESTATUS` String ,\n"
+ "`L_SHIPDATE` Date ,\n"
+ "`L_COMMITDATE` Date ,\n"
+ "`L_RECEIPTDATE` Date ,\n"
+ "`L_SHIPINSTRUCT` String ,\n"
+ "`L_SHIPMODE` String ,\n"
+ "`L_COMMENT` String \n"
+ ") ENGINE = MergeTree()\n"
+ "ORDER BY (`L_ORDERKEY`,`L_LINENUMBER`)\n"
+ "PRIMARY KEY (`L_ORDERKEY`,`L_LINENUMBER`)\n"
+ "SETTINGS\n"
+ " index_granularity = 8192\n"
+ "COMMENT 'clickhouse test table';";
Assertions.assertEquals(result, expected);
}
}
Loading

0 comments on commit edca75b

Please sign in to comment.