Commit
[sqlserver] Sqlserver incremental source.
GOODBOY008 committed Jan 12, 2023
1 parent 22f68d3 commit ae7d376
Showing 38 changed files with 4,421 additions and 232 deletions.
79 changes: 75 additions & 4 deletions docs/content/connectors/sqlserver-cdc.md
@@ -81,8 +81,7 @@ CREATE TABLE orders (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'orders'
'table-name' = 'dbo.orders'
);

-- read snapshot and binlogs from orders table
@@ -167,6 +166,38 @@ Connector Options
<td>String</td>
<td>The session time zone in database server, e.g. "Asia/Shanghai".</td>
</tr>
<tr>
<td>scan.incremental.snapshot.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
      <td>Whether to enable the incremental (parallel) snapshot reading.</td>
</tr>
<tr>
<td>chunk-meta.group.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
      <td>The group size of chunk metadata; if the metadata size exceeds the group size, the metadata will be divided into multiple groups.</td>
</tr>
<tr>
<td>chunk-key.even-distribution.factor.lower-bound</td>
<td>optional</td>
<td style="word-wrap: break-word;">0.05d</td>
<td>Double</td>
      <td>The lower bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed or not.
        When the data distribution is even, the table chunks are calculated with an evenly-sized optimization; when it is uneven, splitting queries are issued instead.
        The distribution factor can be calculated as (MAX(id) - MIN(id) + 1) / rowCount.</td>
</tr>
<tr>
<td>chunk-key.even-distribution.factor.upper-bound</td>
<td>optional</td>
<td style="word-wrap: break-word;">1000.0d</td>
<td>Double</td>
      <td>The upper bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed or not.
        When the data distribution is even, the table chunks are calculated with an evenly-sized optimization; when it is uneven, splitting queries are issued instead.
        The distribution factor can be calculated as (MAX(id) - MIN(id) + 1) / rowCount.</td>
</tr>
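
To make the two distribution-factor bounds above concrete, here is a small, hedged sketch in plain Java (the class name, key ranges, and row counts are illustrative, not the connector's internal code) that applies the formula from the option descriptions and checks the result against the default bounds:

```java
public class DistributionFactorSketch {

    /** distribution factor = (MAX(id) - MIN(id) + 1) / rowCount */
    static double distributionFactor(long minKey, long maxKey, long rowCount) {
        return (maxKey - minKey + 1) / (double) rowCount;
    }

    public static void main(String[] args) {
        double lowerBound = 0.05d;   // chunk-key.even-distribution.factor.lower-bound
        double upperBound = 1000.0d; // chunk-key.even-distribution.factor.upper-bound

        // Dense, auto-increment-like key space: the factor is 1.0, inside the bounds,
        // so the table counts as evenly distributed and chunks can be derived arithmetically.
        double dense = distributionFactor(1, 100_000, 100_000);

        // Sparse key space: the factor is 10000.0, above the upper bound, so the table
        // counts as unevenly distributed and splitting queries are used instead.
        double sparse = distributionFactor(1, 1_000_000_000, 100_000);

        System.out.printf(
                "dense=%.1f (even=%b), sparse=%.1f (even=%b)%n",
                dense, dense >= lowerBound && dense <= upperBound,
                sparse, sparse >= lowerBound && sparse <= upperBound);
    }
}
```

With the defaults, any factor that falls within [0.05, 1000.0] is treated as an even distribution.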
<tr>
<td>debezium.*</td>
<td>optional</td>
@@ -248,8 +279,7 @@ CREATE TABLE products (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'products'
'table-name' = 'dbo.products'
);
```
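
For anyone adapting existing DDL to this change, the following is a minimal, self-contained sketch (the column list and option values are illustrative, not taken from this commit): the schema now lives inside `table-name` (e.g. `dbo.products`) instead of a separate `schema-name` option, and the incremental snapshot can be toggled with `scan.incremental.snapshot.enabled`:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlServerCdcDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'schema-name' is no longer set; the schema is part of 'table-name' ("dbo.products").
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  id INT,"
                        + "  name STRING,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'sqlserver-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '1433',"
                        + "  'username' = 'sa',"
                        + "  'password' = 'Password!',"
                        + "  'database-name' = 'inventory',"
                        + "  'table-name' = 'dbo.products',"
                        + "  'scan.incremental.snapshot.enabled' = 'true'"
                        + ")");

        // Read the initial snapshot followed by the change stream.
        tEnv.executeSql("SELECT * FROM products").print();
    }
}
```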

@@ -306,6 +336,47 @@ public class SqlServerSourceExample {
}
}
```

The SQLServer CDC incremental connector (after 2.4.0) can be used as follows:
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class SqlServerIncrementalSourceExample {
public static void main(String[] args) throws Exception {
SqlServerIncrementalSource<String> sqlServerSource =
new SqlServerSourceBuilder()
.hostname("localhost")
.port(1433)
.databaseList("inventory")
.tableList("dbo.products")
.username("sa")
.password("Password!")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
// set the source parallelism to 2
env.fromSource(
sqlServerSource,
WatermarkStrategy.noWatermarks(),
"SqlServerIncrementalSource")
.setParallelism(2)
.print()
.setParallelism(1);

env.execute("Print SqlServer Snapshot + Change Stream");
}
}
```
**Note:** Please refer to [Deserialization](../about.html#deserialization) for more details about JSON deserialization.

Data Type Mapping
6 changes: 2 additions & 4 deletions docs/content/quickstart/sqlserver-tutorial.md
@@ -143,8 +143,7 @@ Flink SQL> CREATE TABLE products (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'products'
'table-name' = 'dbo.products'
);
Flink SQL> CREATE TABLE orders (
@@ -161,8 +160,7 @@ Flink SQL> CREATE TABLE orders (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'orders'
'table-name' = 'dbo.orders'
);
Flink SQL> CREATE TABLE enriched_orders (
6 changes: 2 additions & 4 deletions docs/content/快速上手/sqlserver-tutorial-zh.md
@@ -143,8 +143,7 @@ Flink SQL> CREATE TABLE products (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'products'
'table-name' = 'dbo.products'
);
Flink SQL> CREATE TABLE orders (
@@ -161,8 +160,7 @@ Flink SQL> CREATE TABLE orders (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'orders'
'table-name' = 'dbo.orders'
);
Flink SQL> CREATE TABLE enriched_orders (
@@ -99,7 +99,8 @@ public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
public static boolean isDataChangeRecord(SourceRecord record) {
Schema valueSchema = record.valueSchema();
Struct value = (Struct) record.value();
return valueSchema.field(Envelope.FieldName.OPERATION) != null
return valueSchema != null
&& valueSchema.field(Envelope.FieldName.OPERATION) != null
&& value.getString(Envelope.FieldName.OPERATION) != null;
}

@@ -101,8 +101,9 @@ public void testSqlServerCDC() throws Exception {
" 'username' = '" + sqlServer.getUsername() + "',",
" 'password' = '" + sqlServer.getPassword() + "',",
" 'database-name' = 'inventory',",
" 'schema-name' = 'dbo',",
" 'table-name' = 'products'",
" 'table-name' = 'dbo.products',",
" 'scan.incremental.snapshot.enabled' = 'true',",
" 'scan.incremental.snapshot.chunk.size' = '4'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",
Expand All @@ -126,7 +127,7 @@ public void testSqlServerCDC() throws Exception {
submitSQLJob(sqlLines, sqlServerCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));

// generate binlogs
// generate change stream
try (Connection conn = getSqlServerJdbcConnection();
Statement statement = conn.createStatement()) {

@@ -171,7 +172,7 @@ public void testSqlServerCDC() throws Exception {
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
80000L);
}

private void initializeSqlServerTable(String sqlFile) {
@@ -35,7 +35,7 @@
* Inspired from {@link io.debezium.relational.history.MemoryDatabaseHistory} but we will store the
* HistoryRecords in Flink's state for persistence.
*
* <p>Note: This is not a clean solution because we depends on a global variable and all the history
* <p>Note: This is not a clean solution because we depend on a global variable and all the history
* records will be stored in state (grow infinitely). We may need to come up with a
* FileSystemDatabaseHistory in the future to store history in HDFS.
*/
@@ -30,7 +30,6 @@
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
@@ -153,12 +152,6 @@ private StreamSplit createBackfillRedoLogSplit(

private RedoLogSplitReadTask createBackfillRedoLogReadTask(
StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
OracleConnectorConfig oracleConnectorConfig =
context.getSourceConfig().getDbzConnectorConfig();
final OffsetContext.Loader<OracleOffsetContext> loader =
new LogMinerOracleOffsetContextLoader(oracleConnectorConfig);
final OracleOffsetContext oracleOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
// we should only capture events for the current table,
// otherwise, we may can't find corresponding schema
Configuration dezConf =
@@ -294,13 +287,13 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
@Override
protected SnapshotContext prepare(ChangeEventSourceContext changeEventSourceContext)
throws Exception {
return new MySqlSnapshotContext();
return new OracleSnapshotContext();
}

private static class MySqlSnapshotContext
private static class OracleSnapshotContext
extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext {

public MySqlSnapshotContext() throws SQLException {
public OracleSnapshotContext() throws SQLException {
super("");
}
}
@@ -129,7 +129,7 @@ private void testOracleParallelSource(

String sourceDDL =
format(
"CREATE TABLE products ("
"CREATE TABLE customers ("
+ " ID INT NOT NULL,"
+ " NAME STRING,"
+ " ADDRESS STRING,"
@@ -183,7 +183,7 @@ private void testOracleParallelSource(
"+I[2000, user_21, Shanghai, 123567891234]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from products");
TableResult tableResult = tEnv.executeSql("select * from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = new ArrayList<>();
7 changes: 7 additions & 0 deletions flink-connector-sqlserver-cdc/pom.xml
@@ -30,6 +30,12 @@ under the License.

<dependencies>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Debezium dependencies -->
<dependency>
<groupId>com.ververica</groupId>
@@ -138,6 +144,7 @@ under the License.
<scope>test</scope>
</dependency>


</dependencies>

</project>
@@ -16,7 +16,7 @@

package com.ververica.cdc.connectors.sqlserver;

import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.connector.sqlserver.SqlServerConnector;
@@ -137,9 +137,6 @@ public DebeziumSourceFunction<T> build() {
case INITIAL:
props.setProperty("snapshot.mode", "initial");
break;
case INITIAL_ONLY:
props.setProperty("snapshot.mode", "initial_only");
break;
case LATEST_OFFSET:
props.setProperty("snapshot.mode", "schema_only");
break;
(The remaining changed files are not shown.)
