Skip to content

Commit

Permalink
[Feature][Connector] add starrocks save_mode (#6029)
Browse files Browse the repository at this point in the history
  • Loading branch information
chl-wxp authored Jan 12, 2024
1 parent 413fa74 commit 66b0f1e
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 126 deletions.
95 changes: 77 additions & 18 deletions docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,68 @@ The internal implementation of StarRocks sink connector is cached and imported b

## Sink Options

| Name | Type | Required | Default | Description |
|-----------------------------|---------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| nodeUrls | list | yes | - | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
| base-url | string | yes | - | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
| username | string | yes | - | `StarRocks` user username |
| password | string | yes | - | `StarRocks` user password |
| database | string | yes | - | The name of StarRocks database |
| table | string | no | - | The name of StarRocks table, If not set, the table name will be the name of the upstream table |
| labelPrefix | string | no | - | The prefix of StarRocks stream load label |
| batch_max_rows | long | no | 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
| max_retries | int | no | - | The number of retries to flush failed |
| retry_backoff_multiplier_ms | int | no | - | Using as a multiplier for generating the next delay for backoff |
| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to `StarRocks` |
| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
| save_mode_create_template | string | no | see below | see below |
| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |
| http_socket_timeout_ms | int | no | 180000 | Set http socket timeout, default is 3 minutes. |
| Name | Type | Required | Default | Description |
|-----------------------------|---------|----------|------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| nodeUrls | list | yes | - | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
| base-url | string | yes | - | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
| username | string | yes | - | `StarRocks` user username |
| password | string | yes | - | `StarRocks` user password |
| database | string | yes | - | The name of StarRocks database |
| table | string | no | - | The name of StarRocks table, If not set, the table name will be the name of the upstream table |
| labelPrefix | string | no | - | The prefix of StarRocks stream load label |
| batch_max_rows | long | no | 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
| max_retries | int | no | - | The number of retries to flush failed |
| retry_backoff_multiplier_ms | int | no | - | Using as a multiplier for generating the next delay for backoff |
| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to `StarRocks` |
| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
| save_mode_create_template | string | no | see below | see below |
| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |
| http_socket_timeout_ms | int | no | 180000 | Set http socket timeout, default is 3 minutes. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side. |
| data_save_mode | Enum | no | APPEND_DATA | Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side. |
| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. |

### save_mode_create_template

We use templates to automatically create starrocks tables,
which will create corresponding table creation statements based on the type of upstream data and schema type,
and the default template can be modified according to the situation. Only work on multi-table mode at now.

### table [string]

Use `database` and this `table-name` auto-generate sql and receive upstream input datas write to database.

This option is mutually exclusive with `query` and has a higher priority.

The table parameter can fill in the name of an unwilling table, which will eventually be used as the table name of the creation table, and supports variables (`${table_name}`, `${schema_name}`). Replacement rules: `${schema_name}` will replace the SCHEMA name passed to the target side, and `${table_name}` will replace the name of the table passed to the table at the target side.

for example:
1. test_${schema_name}_${table_name}_test
2. sink_sinktable
3. ss_${table_name}

### schema_save_mode[Enum]

Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side.
Option introduction:
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist

### data_save_mode[Enum]

Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side.
Option introduction:
`DROP_DATA`: Preserve database structure and delete data
`APPEND_DATA`:Preserve database structure, preserve data
`CUSTOM_PROCESSING`:User defined processing
`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported

### custom_sql[String]

When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks.

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
Expand Down Expand Up @@ -222,6 +259,28 @@ sink {
}
```

### Use save_mode function

```
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "test_${schema_name}_${table_name}"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
batch_max_rows = 10
starrocks.config = {
format = "CSV"
column_separator = "\\x01"
row_delimiter = "\\x02"
}
}
}
```

## Changelog

### next version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mysql.cj.MysqlType;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -63,6 +65,7 @@

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;

@Slf4j
public class StarRocksCatalog implements Catalog {

protected final String catalogName;
Expand All @@ -72,6 +75,7 @@ public class StarRocksCatalog implements Catalog {
protected final String baseUrl;
protected String defaultUrl;
private final JdbcUrlUtil.UrlInfo urlInfo;
private final String template;

private static final Set<String> SYS_DATABASES = new HashSet<>();
private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
Expand All @@ -81,10 +85,10 @@ public class StarRocksCatalog implements Catalog {
SYS_DATABASES.add("_statistics_");
}

public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) {
public StarRocksCatalog(
String catalogName, String username, String pwd, String defaultUrl, String template) {

checkArgument(StringUtils.isNotBlank(username));
checkArgument(StringUtils.isNotBlank(pwd));
checkArgument(StringUtils.isNotBlank(defaultUrl));
urlInfo = JdbcUrlUtil.getUrlInfo(defaultUrl);
this.baseUrl = urlInfo.getUrlWithoutDatabase();
Expand All @@ -95,6 +99,7 @@ public StarRocksCatalog(String catalogName, String username, String pwd, String
this.catalogName = catalogName;
this.username = username;
this.pwd = pwd;
this.template = template;
}

@Override
Expand Down Expand Up @@ -208,13 +213,64 @@ public CatalogTable getTable(TablePath tablePath)
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
this.createTable(
StarRocksSaveModeUtil.fillingCreateSql(
template,
tablePath.getDatabaseName(),
tablePath.getTableName(),
table.getTableSchema()));
}

@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
if (ignoreIfNotExists) {
conn.createStatement().execute("DROP TABLE IF EXISTS " + tablePath.getFullName());
} else {
conn.createStatement()
.execute(String.format("DROP TABLE %s", tablePath.getFullName()));
}
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
}
}

public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
if (ignoreIfNotExists) {
conn.createStatement()
.execute(String.format("TRUNCATE TABLE %s", tablePath.getFullName()));
}
} catch (Exception e) {
throw new CatalogException(
String.format("Failed TRUNCATE TABLE in catalog %s", tablePath.getFullName()),
e);
}
}

public void executeSql(TablePath tablePath, String sql) {
try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd)) {
connection.createStatement().execute(sql);
} catch (Exception e) {
throw new CatalogException(String.format("Failed EXECUTE SQL in catalog %s", sql), e);
}
}

public boolean isExistsData(TablePath tablePath) {
try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd)) {
String sql = String.format("select * from %s limit 1", tablePath.getFullName());
ResultSet resultSet = connection.createStatement().executeQuery(sql);
if (resultSet == null) {
return false;
}
return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed Connection JDBC error %s", tablePath.getTableName()), e);
}
}

@Override
Expand Down Expand Up @@ -336,6 +392,7 @@ private Map<String, String> buildConnectorOptions(TablePath tablePath) {
public void createTable(String sql)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
log.info("create table sql is :{}", sql);
conn.createStatement().execute(sql);
} catch (Exception e) {
throw new CatalogException(
Expand Down
Loading

0 comments on commit 66b0f1e

Please sign in to comment.