
Commit

[Doc]add more description in kafka connector and modify the name for op field (backport #43074) (#43315)

Signed-off-by: hellolilyliuyi <[email protected]>
Co-authored-by: hellolilyliuyi <[email protected]>
mergify[bot] and hellolilyliuyi authored Mar 28, 2024
1 parent dbd8727 commit 3b4e0a2
Showing 2 changed files with 43 additions and 6 deletions.
29 changes: 25 additions & 4 deletions docs/en/loading/Kafka-connector-starrocks.md
@@ -65,6 +65,12 @@ CREATE TABLE test_tbl (id INT, city STRING);

1. Configure the Kafka connector. In the **config** directory under the Kafka installation directory, create the configuration file **connect-StarRocks-sink.properties** for the Kafka connector, and configure the following parameters. For more parameters and descriptions, see [Parameters](#Parameters).

:::note

The Kafka connector is a sink connector.

:::

```yaml
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
@@ -140,6 +146,12 @@ CREATE TABLE test_tbl (id INT, city STRING);

2. Configure and create the Kafka connector. Note that in distributed mode, you need to configure and create the Kafka connector through the REST API. For parameters and descriptions, see [Parameters](#Parameters).

:::note

The Kafka connector is a sink connector.

:::

```Shell
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"starrocks-kafka-connector",
@@ -159,9 +171,12 @@ CREATE TABLE test_tbl (id INT, city STRING);
}
}'
```
> **NOTICE**
>
> If the source data is CDC data, such as data in Debezium format, and the StarRocks table is a Primary Key table, you also need to [configure `transform`](#load-debezium-formatted-cdc-data) in order to synchronize the source data changes to the Primary Key table.

:::info

If the source data is CDC data, such as data in Debezium format, and the StarRocks table is a Primary Key table, you also need to [configure `transform`](#load-debezium-formatted-cdc-data) in order to synchronize the source data changes to the Primary Key table.

:::
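Once the connector is created, you can verify that it is running through the same Kafka Connect REST API. The host and port follow the example above; these are standard Kafka Connect endpoints, shown here only as a quick check:

```Shell
# List the connectors registered with this Kafka Connect cluster.
curl -s http://127.0.0.1:8083/connectors

# Check the state of the StarRocks sink connector and its tasks.
curl -s http://127.0.0.1:8083/connectors/starrocks-kafka-connector/status
```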

#### Query StarRocks table

@@ -308,6 +323,12 @@ The data is successfully loaded when the above result is returned.

If the Kafka data is in Debezium CDC format and the StarRocks table is a Primary Key table, you also need to configure the `transforms` parameter and other related parameters.

:::note

The Kafka connector is a sink connector.

:::

```Properties
transforms=addfield,unwrap
transforms.addfield.type=com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord
@@ -318,5 +339,5 @@ transforms.unwrap.delete.handling.mode

In the above configurations, we specify `transforms=addfield,unwrap`.

- The addfield transform is used to add the __op field to each record of Debezium CDC-formatted data to support the StarRocks Primary Key table. If the StarRocks table is not a Primary Key table, you do not need to specify the addfield transform. The addfield transform class is com.Starrocks.Kafka.Transforms.AddOpFieldForDebeziumRecord. It is included in the Kafka connector JAR file, so you do not need to manually install it.
- If the StarRocks table is a Primary Key table, you need to specify the addfield transform to add an `op` field to each record of the Debezium CDC formatted data. If the StarRocks table is not a Primary Key table, you do not need to specify the addfield transform. The addfield transform class is `com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord`. It is included in the Kafka connector JAR file, so you do not need to manually install it.
- The unwrap transform is provided by Debezium and is used to unwrap Debezium's complex data structure based on the operation type. For more information, see [New Record State Extraction](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html).
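For reference, a complete transform section for this scenario might look like the following sketch. The `addfield` lines repeat the configuration shown above; the `unwrap` values are assumptions based on Debezium's New Record State Extraction transform, not a definitive configuration:

```Properties
# Run the addfield transform first, then Debezium's unwrap transform.
transforms=addfield,unwrap
# Adds the op field required by StarRocks Primary Key tables.
transforms.addfield.type=com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord
# Debezium's New Record State Extraction: flattens the CDC envelope.
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Drop tombstone records and rewrite deletes so they carry a delete marker.
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite
```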
20 changes: 18 additions & 2 deletions docs/zh/loading/Kafka-connector-starrocks.md
@@ -65,6 +65,11 @@ CREATE TABLE test_tbl (id INT, city STRING);

1. Configure the Kafka connector. In the **config** directory under the Kafka installation directory, create the configuration file **connect-StarRocks-sink.properties** for the Kafka connector, and configure the corresponding parameters. For parameters and descriptions, see [Parameters](#参数说明).

:::note

The Kafka connector is a sink connector.

:::
```yaml
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
@@ -115,7 +120,7 @@ CREATE TABLE test_tbl (id INT, city STRING);
#### Start Kafka Connect in distributed mode

1. Configure and start Kafka Connect.
   1. Configure Kafka Connect. In the `config/connect-distributed.properties` configuration file in the **config** directory, configure the following parameters. For parameter descriptions, see [Running Kafka Connect](https://kafka.apache.org/documentation.html#connect_running).
```yaml
# The addresses of the Kafka brokers. Separate multiple brokers with commas (,).
# Note that this example uses the PLAINTEXT security protocol to access the Kafka cluster. If you use another security protocol to access the Kafka cluster, configure the relevant information in this file.
bootstrap.servers=<kafka_broker_ip>:9092
@@ -136,6 +141,12 @@ CREATE TABLE test_tbl (id INT, city STRING);
```
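After the configuration file is edited, Kafka Connect can be started in distributed mode with the launcher script that ships with Kafka. This is a sketch; the path assumes a standard Kafka installation directory:

```Shell
# Start a Kafka Connect worker in distributed mode using the properties
# file edited above (run from the Kafka installation root).
bin/connect-distributed.sh config/connect-distributed.properties
```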

2. Configure and create the Kafka connector. Note that in distributed mode, you need to configure and create the Kafka connector through the REST API. For parameters and descriptions, see [Parameters](#参数说明).

:::note

The Kafka connector is a sink connector.

:::

```Shell
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
@@ -210,6 +221,11 @@ MySQL [example_db]> select * from test_tbl;
### Load Debezium-formatted CDC data

If the Kafka data is in Debezium CDC format and the StarRocks table is a Primary Key table, then in addition to [configuring the basic parameters](#使用示例) in the Kafka connector configuration file **connect-StarRocks-sink.properties**, you also need to configure `transforms` and other related parameters.
:::note

The Kafka connector is a sink connector.

:::

```Properties
transforms=addfield,unwrap
@@ -221,5 +237,5 @@ transforms.unwrap.delete.handling.mode=rewrite

In the above configurations, we specify `transforms=addfield,unwrap`.

- The addfield transform is used to add an __op field to each record of Debezium CDC-formatted data to support the [Primary Key table](../table_design/table_types/primary_key_table.md). If the StarRocks table is not a Primary Key table, you do not need to specify the addfield transform. The addfield transform class is com.Starrocks.Kafka.Transforms.AddOpFieldForDebeziumRecord and is included in the Kafka connector JAR file, so you do not need to manually install it.
- If the StarRocks table is a Primary Key table, you need to specify the addfield transform to add an `op` field to each record of the Debezium CDC-formatted data. If the StarRocks table is not a Primary Key table, you do not need to specify the addfield transform. The addfield transform class is `com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord`. It is included in the Kafka connector JAR file, so you do not need to manually install it.
- The unwrap transform is provided by Debezium and is used to unwrap Debezium's complex data structure based on the operation type. For more information, see [New Record State Extraction](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html).
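As a quick sanity check after the connector has processed some CDC events, you can query the Primary Key table from the earlier example; the exact rows returned depend on your source data:

```SQL
-- Inserts, updates, and deletes captured by Debezium should now be
-- reflected in the StarRocks Primary Key table.
SELECT * FROM test_tbl ORDER BY id;
```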
