[mysql] Support tables which do not contain a primary key (apache#2150)
Co-authored-by: zhuyuan03 <[email protected]>
2 people authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent 9688b6a commit be59212
Showing 16 changed files with 546 additions and 191 deletions.
======================================================================
@@ -32,7 +32,9 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Selectors;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import org.slf4j.Logger;
@@ -43,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;

import static com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;

@@ -135,6 +138,20 @@ public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
}
}

/** Create a TableFilter by database name and table name. */
public static Tables.TableFilter createTableFilter(String database, String table) {
final Selectors.TableSelectionPredicateBuilder eligibleTables =
Selectors.tableSelector().includeDatabases(database);

Predicate<TableId> tablePredicate = eligibleTables.includeTables(table).build();

Predicate<TableId> finalTablePredicate =
tablePredicate.and(
Tables.TableFilter.fromPredicate(MySqlConnectorConfig::isNotBuiltInTable)
::isIncluded);
return finalTablePredicate::test;
}

// --------------------------------------------------------------------------------------------

private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
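The new DebeziumUtils.createTableFilter helper above builds a Debezium table filter from a database name and a table pattern; Debezium's Selectors matches table patterns against the fully qualified database.table name, and the chained MySqlConnectorConfig::isNotBuiltInTable predicate keeps built-in schemas out. A minimal usage sketch (database and table names are hypothetical, and the import assumes flink-cdc's usual com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils location):

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;

public class TableFilterSketch {
    public static void main(String[] args) {
        // Capture only "app_db.orders".
        Tables.TableFilter filter = DebeziumUtils.createTableFilter("app_db", "app_db.orders");

        System.out.println(filter.isIncluded(new TableId("app_db", null, "orders")));    // true
        System.out.println(filter.isIncluded(new TableId("app_db", null, "shipments"))); // false: not in the table pattern
        System.out.println(filter.isIncluded(new TableId("mysql", null, "user")));       // false: other database, built-in schema
    }
}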
======================================================================
@@ -40,6 +40,7 @@
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +62,7 @@

import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStructContainsChunkKey;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;

@@ -226,20 +228,21 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
if (hasEnterPureBinlogPhase(tableId, position)) {
return true;
}

// only the table who captured snapshot splits need to filter
if (finishedSplitsInfo.containsKey(tableId)) {
RowType splitKeyType =
ChunkUtils.getChunkKeyColumnType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
statefulTaskContext.getSourceConfig().getChunkKeyColumn());
Object[] key =
statefulTaskContext.getSourceConfig().getChunkKeyColumns());

Struct target = getStructContainsChunkKey(sourceRecord);
Object[] chunkKey =
getSplitKey(
splitKeyType,
sourceRecord,
statefulTaskContext.getSchemaNameAdjuster());
splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target);
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
if (RecordUtils.splitKeyRangeContains(
key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
&& position.isAfter(splitInfo.getHighWatermark())) {
return true;
}
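shouldEmit now reads the chunk key from getStructContainsChunkKey(sourceRecord) rather than always from the record key: a table without a primary key emits change events whose key is null, so the chunk key columns must come from the row image in the value. A rough sketch of that idea (not the actual RecordUtils implementation):

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

final class ChunkKeyStructSketch {
    // Keyed tables: chunk key columns live in the record key.
    // Keyless tables: read them from the Debezium envelope's row image instead.
    static Struct structContainingChunkKey(SourceRecord record) {
        if (record.key() != null) {
            return (Struct) record.key();
        }
        Struct value = (Struct) record.value();
        Struct after = value.getStruct("after");                  // inserts and updates
        return after != null ? after : value.getStruct("before"); // deletes only carry "before"
    }
}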
======================================================================
@@ -49,22 +49,23 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.formatMessageTimestamp;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isLowWatermarkEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.upsertBinlog;
import static org.apache.flink.util.Preconditions.checkState;

@@ -248,7 +249,8 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
boolean reachBinlogEnd = false;
SourceRecord lowWatermark = null;
SourceRecord highWatermark = null;
Map<Struct, SourceRecord> snapshotRecords = new LinkedHashMap<>();

Map<Struct, List<SourceRecord>> snapshotRecords = new HashMap<>();
while (!reachBinlogEnd) {
checkReadException();
List<DataChangeEvent> batch = queue.poll();
@@ -274,12 +276,23 @@
}

if (!reachBinlogStart) {
snapshotRecords.put((Struct) record.key(), record);
} else {
if (isRequiredBinlogRecord(record)) {
// upsert binlog events through the record key
upsertBinlog(snapshotRecords, record);
if (record.key() != null) {
snapshotRecords.put(
(Struct) record.key(), Collections.singletonList(record));
} else {
List<SourceRecord> records =
snapshotRecords.computeIfAbsent(
(Struct) record.value(), key -> new LinkedList<>());
records.add(record);
}
} else {
upsertBinlog(
snapshotRecords,
record,
currentSnapshotSplit.getSplitKeyType(),
nameAdjuster,
currentSnapshotSplit.getSplitStart(),
currentSnapshotSplit.getSplitEnd());
}
}
}
@@ -288,7 +301,11 @@

final List<SourceRecord> normalizedRecords = new ArrayList<>();
normalizedRecords.add(lowWatermark);
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
normalizedRecords.addAll(
formatMessageTimestamp(
snapshotRecords.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList())));
normalizedRecords.add(highWatermark);

final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
@@ -318,16 +335,6 @@ private void assertLowWatermark(SourceRecord lowWatermark) {
lowWatermark));
}

private boolean isRequiredBinlogRecord(SourceRecord record) {
if (isDataChangeRecord(record)) {
Object[] key =
getSplitKey(currentSnapshotSplit.getSplitKeyType(), record, nameAdjuster);
return splitKeyRangeContains(
key, currentSnapshotSplit.getSplitStart(), currentSnapshotSplit.getSplitEnd());
}
return false;
}

@Override
public void close() {
try {
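pollSplitRecords used to deduplicate snapshot rows in a LinkedHashMap keyed by the record key and to drop out-of-range binlog events in isRequiredBinlogRecord. With keyless tables the record key can be null, so snapshot rows are now grouped by their value Struct into lists (and flattened again before emitting), and the split-range check moves into the extended upsertBinlog call, which receives the split key type and boundaries. A simplified sketch of that merge step with a reduced parameter list (the real method takes the split key type plus a SchemaNameAdjuster and extracts the chunk key itself; merging the event into the snapshot rows is elided):

import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Map;

final class UpsertBinlogSketch {
    static void upsertBinlog(
            Map<Struct, List<SourceRecord>> snapshotRecords,
            SourceRecord binlogRecord,
            Object[] chunkKey, // extracted via RecordUtils.getSplitKey(...)
            Object[] splitStart,
            Object[] splitEnd) {
        // Formerly isRequiredBinlogRecord: ignore events that belong to another snapshot split.
        if (!RecordUtils.splitKeyRangeContains(chunkKey, splitStart, splitEnd)) {
            return;
        }
        // ... merge the insert/update/delete into snapshotRecords (elided) ...
    }
}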
======================================================================
@@ -17,6 +17,7 @@
package com.ververica.cdc.connectors.mysql.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.ObjectPath;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
@@ -123,8 +124,8 @@ public MySqlSourceBuilder<T> serverTimeZone(String timeZone) {
* The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
* key column when read the snapshot of table.
*/
public MySqlSourceBuilder<T> chunkKeyColumn(String chunkKeyColumn) {
this.configFactory.chunkKeyColumn(chunkKeyColumn);
public MySqlSourceBuilder<T> chunkKeyColumn(ObjectPath objectPath, String chunkKeyColumn) {
this.configFactory.chunkKeyColumn(objectPath, chunkKeyColumn);
return this;
}

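chunkKeyColumn is now scoped to a table through Flink's ObjectPath, both here on the builder and on MySqlSourceConfigFactory further down, so different tables can be chunked by different columns and a table without a primary key can have its chunk key named explicitly. A usage sketch with hypothetical connection and table values:

import org.apache.flink.table.catalog.ObjectPath;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class PerTableChunkKeyExample {
    public static void main(String[] args) {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("app_db")
                        .tableList("app_db.orders")
                        .username("flink")
                        .password("secret")
                        // "orders" has no primary key, so name the column to split chunks by.
                        .chunkKeyColumn(new ObjectPath("app_db", "orders"), "order_id")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
    }
}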
======================================================================
@@ -146,7 +146,7 @@ private void analyzeTable(TableId tableId) {
currentSplittingTable = mySqlSchema.getTableSchema(jdbcConnection, tableId).getTable();
splitColumn =
ChunkUtils.getChunkKeyColumn(
currentSplittingTable, sourceConfig.getChunkKeyColumn());
currentSplittingTable, sourceConfig.getChunkKeyColumns());
splitType = ChunkUtils.getChunkKeyColumnType(splitColumn);
minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn.name());
approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId);
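ChunkUtils.getChunkKeyColumn now receives the whole per-table map instead of a single column name, so for the table being split it has to pick either the user-configured chunk key column or fall back to a primary-key column. A rough sketch of that resolution (not the actual ChunkUtils code; error handling for a keyless table with no configured column is omitted):

import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.relational.Column;
import io.debezium.relational.Table;

import java.util.List;
import java.util.Map;

final class ChunkKeyResolutionSketch {
    static Column resolveChunkKeyColumn(Table table, Map<ObjectPath, String> chunkKeyColumns) {
        String configured =
                chunkKeyColumns.get(new ObjectPath(table.id().catalog(), table.id().table()));
        if (configured != null) {
            return table.columnWithName(configured);
        }
        // No explicit configuration: fall back to the first primary-key column.
        List<Column> pk = table.primaryKeyColumns();
        return pk.isEmpty() ? null : pk.get(0);
    }
}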
======================================================================
@@ -16,6 +16,8 @@

package com.ververica.cdc.connectors.mysql.source.config;

import org.apache.flink.table.catalog.ObjectPath;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import io.debezium.config.Configuration;
@@ -27,6 +29,7 @@
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -55,7 +58,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean includeSchemaChanges;
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
@Nullable private final String chunkKeyColumn;
private final Map<ObjectPath, String> chunkKeyColumns;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -86,7 +89,7 @@ public class MySqlSourceConfig implements Serializable {
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties,
Properties jdbcProperties,
@Nullable String chunkKeyColumn) {
Map<ObjectPath, String> chunkKeyColumns) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -110,7 +113,7 @@ public class MySqlSourceConfig implements Serializable {
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
this.jdbcProperties = jdbcProperties;
this.chunkKeyColumn = chunkKeyColumn;
this.chunkKeyColumns = chunkKeyColumns;
}

public String getHostname() {
@@ -210,8 +213,7 @@ public Properties getJdbcProperties() {
return jdbcProperties;
}

@Nullable
public String getChunkKeyColumn() {
return chunkKeyColumn;
public Map<ObjectPath, String> getChunkKeyColumns() {
return chunkKeyColumns;
}
}
======================================================================
@@ -17,6 +17,7 @@
package com.ververica.cdc.connectors.mysql.source.config;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ObjectPath;

import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -26,7 +27,9 @@
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

@@ -71,7 +74,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private Properties jdbcProperties;
private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
private Properties dbzProperties;
private String chunkKeyColumn;
private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();

public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -145,8 +148,17 @@ public MySqlSourceConfigFactory serverTimeZone(String timeZone) {
* The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
* key column when read the snapshot of table.
*/
public MySqlSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) {
this.chunkKeyColumn = chunkKeyColumn;
public MySqlSourceConfigFactory chunkKeyColumn(ObjectPath objectPath, String chunkKeyColumn) {
this.chunkKeyColumns.put(objectPath, chunkKeyColumn);
return this;
}

/**
* The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
* key column when read the snapshot of table.
*/
public MySqlSourceConfigFactory chunkKeyColumn(Map<ObjectPath, String> chunkKeyColumns) {
this.chunkKeyColumns.putAll(chunkKeyColumns);
return this;
}

@@ -332,6 +344,6 @@ public MySqlSourceConfig createConfig(int subtaskId) {
scanNewlyAddedTableEnabled,
props,
jdbcProperties,
chunkKeyColumn);
chunkKeyColumns);
}
}
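Besides the per-table setter, the factory now also accepts a whole map, which is convenient when several captured tables need explicit chunk keys. A short sketch with hypothetical names:

import org.apache.flink.table.catalog.ObjectPath;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;

import java.util.HashMap;
import java.util.Map;

final class ChunkKeyMapExample {
    static void configure(MySqlSourceConfigFactory factory) {
        Map<ObjectPath, String> chunkKeys = new HashMap<>();
        chunkKeys.put(new ObjectPath("app_db", "orders"), "order_id");
        chunkKeys.put(new ObjectPath("app_db", "order_items"), "item_id");
        factory.chunkKeyColumn(chunkKeys); // merges into the factory's chunk key map
    }
}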