Commit

[Improve][CDC] Optimize options & add docs for compatible_debezium_json (apache#4351)

* Use an enum to define format options
* Rename canal-json to canal_json
* Add docs
hailin0 committed Mar 15, 2023
1 parent fccd2c3 commit 2aac292
Showing 18 changed files with 184 additions and 73 deletions.
12 changes: 6 additions & 6 deletions docs/en/connector-v2/formats/canal-json.md
@@ -17,10 +17,10 @@ SeaTunnel also supports encoding the INSERT/UPDATE/DELETE messages in SeaTunnel

| option | default | required | Description |
|--------------------------------|---------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| format | (none) | yes | Specify what format to use, here should be 'canal-json'. |
| canal-json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
| canal-json.database.include | (none) | no | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
| canal-json.table.include | (none) | no | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
| format | (none) | yes | Specify what format to use, here should be 'canal_json'. |
| canal_json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
| canal_json.database.include | (none) | no | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
| canal_json.table.include | (none) | no | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
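
For example, the error-tolerance and filter options above could be combined in a Kafka source as follows (a minimal sketch; the broker address, topic, database/table patterns, and schema fields are hypothetical):

```conf
source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "products_binlog"
    result_table_name = "kafka_table"
    # interpret messages as Canal JSON changelog records
    format = canal_json
    # skip unparseable records instead of failing the job
    canal_json.ignore-parse-errors = true
    # only keep changelog rows from database "mydb", table "products"
    canal_json.database.include = "mydb"
    canal_json.table.include = "products"
    schema = {
      fields = {
        id = "int"
        name = "string"
      }
    }
  }
}
```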

# How to use Canal format

@@ -95,7 +95,7 @@ source {
weight = "string"
}
},
format = canal-json
format = canal_json
}

}
@@ -107,7 +107,7 @@ sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "consume-binlog"
format = canal-json
format = canal_json
}
}
```
67 changes: 67 additions & 0 deletions docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -0,0 +1,67 @@
# CDC compatible debezium-json

SeaTunnel supports interpreting CDC records as Debezium-JSON messages and publishing them to an MQ (Kafka) system.

This is useful in many cases, such as maintaining compatibility with the Debezium ecosystem.

# How to use

## MySQL-CDC output to Kafka

```conf
env {
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 15000
}

source {
MySQL-CDC {
result_table_name = "table1"

hostname = localhost
base-url="jdbc:mysql://localhost:3306/test"
"startup.mode"=INITIAL
catalog {
factory=MySQL
}
table-names=[
"database1.t1",
"database1.t2",
"database2.t1"
]

# compatible_debezium_json options
format = compatible_debezium_json
debezium = {
# do not include the schema in kafka messages
key.converter.schemas.enable = false
value.converter.schemas.enable = false
# include ddl
include.schema.changes = true
# topic prefix
database.server.name = "mysql_cdc_1"
}
# compatible_debezium_json fixed schema
schema = {
fields = {
topic = string
key = string
value = string
}
}
}
}

sink {
Kafka {
source_table_name = "table1"

bootstrap.servers = "localhost:9092"

# compatible_debezium_json options
format = compatible_debezium_json
}
}
```
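
Note that the Kafka sink needs no `topic` option in this mode: each record is routed to the topic carried in the `topic` field of the fixed schema above, which the serializer reads per row (see the `topicExtractor` change in this commit).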

5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/MySQL-CDC.md
@@ -44,6 +44,7 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| debezium.* | config | No | - |
| format | Enum | No | DEFAULT |
| common-options | | no | - |

### username [String]
@@ -156,6 +157,10 @@ Pass-through Debezium's properties to Debezium Embedded Engine which is used to
See more about
the [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties)

### format [Enum]

Optional output format for MySQL CDC; valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON".

#### example

```conf
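# a minimal sketch of the format option; connection values are hypothetical
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "password"
    table-names = ["test.table1"]
    # emit records as Debezium-compatible JSON instead of the default format
    format = compatible_debezium_json
  }
}
```
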
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/SqlServer-CDC.md
@@ -44,6 +44,7 @@ describes how to set up the SqlServer CDC connector to run SQL queries against SqlServer
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| debezium.* | config | No | - |
| format | Enum | No | DEFAULT |
| common-options | | no | - |

### hostname [String]
@@ -150,6 +151,10 @@ Pass-through Debezium's properties to Debezium Embedded Engine which is used to
See more about
the [Debezium's SqlServer Connector properties](https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html#sqlserver-connector-properties)

### format [Enum]

Optional output format for SqlServer CDC; valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON".

#### example

```conf
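# a minimal sketch of the format option; connection values are hypothetical
source {
  SqlServer-CDC {
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=test"
    username = "sa"
    password = "password"
    table-names = ["test.dbo.table1"]
    # emit records as Debezium-compatible JSON instead of the default format
    format = compatible_debezium_json
  }
}
```
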
@@ -121,6 +121,7 @@ public class SourceOptions {

public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(FORMAT)
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
.optional(INCREMENTAL_PARALLELISM)
.optional(STARTUP_MODE, STOP_MODE)
@@ -28,6 +28,8 @@
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
@@ -67,6 +69,13 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(ReadonlyConfig config) {
@Override
public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
ReadonlyConfig config) {
if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
config.get(JdbcSourceOptions.FORMAT))) {
return (DebeziumDeserializationSchema<T>)
new DebeziumJsonDeserializeSchema(
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}

SqlServerSourceConfig sqlServerSourceConfig =
(SqlServerSourceConfig) this.configFactory.create(0);
TableId tableId =
@@ -19,7 +19,6 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;

import java.util.List;
import java.util.Map;
@@ -29,16 +28,6 @@ public class Config {
public static final String CONNECTOR_IDENTITY = "Kafka";
public static final String REPLICATION_FACTOR = "replication.factor";

/** The default data format is JSON */
public static final String DEFAULT_FORMAT = "json";

public static final String TEXT_FORMAT = "text";

public static final String CANNAL_FORMAT = "canal-json";

public static final String COMPATIBLE_DEBEZIUM_JSON =
CompatibleDebeziumJsonSerializationSchema.IDENTIFIER;

/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";

@@ -75,7 +64,7 @@ public class Config {
public static final Option<String> CONSUMER_GROUP =
Options.key("consumer.group")
.stringType()
.noDefaultValue()
.defaultValue("SeaTunnel-Consumer-Group")
.withDescription(
"Kafka consumer group id, used to distinguish different consumer groups.");

@@ -102,10 +91,10 @@
.withDescription(
"The structure of the data, including field names and field types.");

public static final Option<String> FORMAT =
public static final Option<MessageFormat> FORMAT =
Options.key("format")
.stringType()
.noDefaultValue()
.enumType(MessageFormat.class)
.defaultValue(MessageFormat.JSON)
.withDescription(
"Data format. The default format is json. Optional text format. The default field separator is \", \". "
+ "If you customize the delimiter, add the \"field_delimiter\" option.");
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kafka.config;

public enum MessageFormat {
JSON,
TEXT,
CANAL_JSON,
COMPATIBLE_DEBEZIUM_JSON
}
@@ -22,7 +22,9 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
@@ -40,11 +42,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANNAL_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMPATIBLE_DEBEZIUM_JSON;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;

@RequiredArgsConstructor
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
private final Function<SeaTunnelRow, String> topicExtractor;
@@ -66,9 +63,9 @@ public ProducerRecord serializeRow(SeaTunnelRow row) {
}

public static DefaultSeaTunnelRowSerializer create(
String topic, SeaTunnelRowType rowType, String format, String delimiter) {
String topic, SeaTunnelRowType rowType, MessageFormat format, String delimiter) {
return new DefaultSeaTunnelRowSerializer(
topicExtractor(topic, rowType),
topicExtractor(topic, rowType, format),
partitionExtractor(null),
timestampExtractor(),
keyExtractor(null, rowType, format, delimiter),
@@ -80,10 +77,10 @@ public static DefaultSeaTunnelRowSerializer create(
String topic,
Integer partition,
SeaTunnelRowType rowType,
String format,
MessageFormat format,
String delimiter) {
return new DefaultSeaTunnelRowSerializer(
topicExtractor(topic, rowType),
topicExtractor(topic, rowType, format),
partitionExtractor(partition),
timestampExtractor(),
keyExtractor(null, rowType, format, delimiter),
@@ -95,10 +92,10 @@ public static DefaultSeaTunnelRowSerializer create(
String topic,
List<String> keyFields,
SeaTunnelRowType rowType,
String format,
MessageFormat format,
String delimiter) {
return new DefaultSeaTunnelRowSerializer(
topicExtractor(topic, rowType),
topicExtractor(topic, rowType, format),
partitionExtractor(null),
timestampExtractor(),
keyExtractor(keyFields, rowType, format, delimiter),
@@ -119,7 +116,13 @@ private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor() {
}

private static Function<SeaTunnelRow, String> topicExtractor(
String topic, SeaTunnelRowType rowType) {
String topic, SeaTunnelRowType rowType, MessageFormat format) {
if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
int topicFieldIndex =
rowType.indexOf(CompatibleDebeziumJsonDeserializationSchema.FIELD_TOPIC);
return row -> row.getField(topicFieldIndex).toString();
}

String regex = "\\$\\{(.*?)\\}";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
Matcher matcher = pattern.matcher(topic);
@@ -147,8 +150,11 @@ private static Function<SeaTunnelRow, String> topicExtractor(
}

private static Function<SeaTunnelRow, byte[]> keyExtractor(
List<String> keyFields, SeaTunnelRowType rowType, String format, String delimiter) {
if (COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
List<String> keyFields,
SeaTunnelRowType rowType,
MessageFormat format,
String delimiter) {
if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
CompatibleDebeziumJsonSerializationSchema serializationSchema =
new CompatibleDebeziumJsonSerializationSchema(rowType, true);
return row -> serializationSchema.serialize(row);
@@ -167,7 +173,7 @@ private static Function<SeaTunnelRow, byte[]> keyExtractor(
}

private static Function<SeaTunnelRow, byte[]> valueExtractor(
SeaTunnelRowType rowType, String format, String delimiter) {
SeaTunnelRowType rowType, MessageFormat format, String delimiter) {
SerializationSchema serializationSchema =
createSerializationSchema(rowType, format, delimiter, false);
return row -> serializationSchema.serialize(row);
@@ -202,16 +208,16 @@ private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(
}

private static SerializationSchema createSerializationSchema(
SeaTunnelRowType rowType, String format, String delimiter, boolean isKey) {
SeaTunnelRowType rowType, MessageFormat format, String delimiter, boolean isKey) {
switch (format) {
case DEFAULT_FORMAT:
case JSON:
return new JsonSerializationSchema(rowType);
case TEXT_FORMAT:
case TEXT:
return TextSerializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(delimiter)
.build();
case CANNAL_FORMAT:
case CANAL_JSON:
return new CanalJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
@@ -44,7 +44,6 @@
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;

/**
* Kafka Sink implementation by using SeaTunnel sink API. This class contains the method to create
@@ -61,8 +60,7 @@ public class KafkaSink
public KafkaSink() {}

public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
CheckResult result =
CheckConfigUtil.checkAllExists(pluginConfig, TOPIC.key(), BOOTSTRAP_SERVERS.key());
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
if (!result.isSuccess()) {
throw new KafkaConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -76,8 +74,7 @@ public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result =
CheckConfigUtil.checkAllExists(pluginConfig, TOPIC.key(), BOOTSTRAP_SERVERS.key());
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
if (!result.isSuccess()) {
throw new KafkaConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,