Skip to content

Commit

Permalink
[Improve][CDC] Close idle subtasks group(reader/writer) in increment …
Browse files Browse the repository at this point in the history
…phase (apache#6526)
  • Loading branch information
hailin0 authored and liunaijie committed May 24, 2024
1 parent 88d1c5c commit 99e2832
Show file tree
Hide file tree
Showing 49 changed files with 541 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
}

/** Get a connection pool factory to create connection pool. */
JdbcConnectionPoolFactory getPooledDataSourceFactory();
/**
 * Returns the factory used to create a connection pool for this dialect.
 *
 * <p>This default implementation provides no factory and always throws; an implementation
 * that needs pooled connections must override it.
 *
 * @return never returns normally from this default implementation
 * @throws UnsupportedOperationException always, unless the method is overridden
 */
default JdbcConnectionPoolFactory getPooledDataSourceFactory() {
    // Name the concrete implementation so the failure points at the dialect that was asked.
    throw new UnsupportedOperationException(
            getClass().getName() + " does not provide a pooled data source factory");
}

/** Query and build the schema of table. */
TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public Optional<SourceSplitBase> getNext() {

@Override
public boolean waitingForCompletedSplits() {
return snapshotSplitAssigner.waitingForCompletedSplits();
return snapshotSplitAssigner.waitingForCompletedSplits()
|| incrementalSplitAssigner.waitingForAssignedSplits();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,16 @@ private void assignSplits() {
awaitingReader.remove();
LOG.debug("Assign split {} to subtask {}", sourceSplit, nextAwaiting);
} else {
// there is no available splits by now, skip assigning
break;
if (splitAssigner.waitingForCompletedSplits()) {
// there is no available splits by now, skip assigning
break;
} else {
LOG.info(
"No more splits available, signal no more splits to subtask {}",
nextAwaiting);
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,8 @@ public boolean completedSnapshotPhase(List<TableId> tableIds) {
return context.getAssignedSnapshotSplit().isEmpty()
&& context.getSplitCompletedOffsets().isEmpty();
}

/**
 * Reports whether this assigner is still waiting on split assignment.
 *
 * @return {@code false} only when {@code splitAssigned} is set and {@code noMoreSplits()}
 *     returns {@code true}; {@code true} otherwise
 */
public boolean waitingForAssignedSplits() {
    // noMoreSplits() is consulted only once splitAssigned is known to be true.
    if (!splitAssigned) {
        return true;
    }
    return !noMoreSplits();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ public void pollNext(Collector<T> output) throws Exception {
context.sendSplitRequest();
needSendSplitRequest.compareAndSet(true, false);
}
super.pollNext(output);

if (isNoMoreSplitsAssignment() && isNoMoreElement()) {
log.info("Reader {} send NoMoreElement event", context.getIndexOfSubtask());
context.signalNoMoreElement();
} else {
super.pollNext(output);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
Expand All @@ -34,6 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
Expand Down Expand Up @@ -70,20 +70,24 @@ public String getName() {
@Override
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
return isTableIdCaseSensitive(jdbcConnection);
return isDataCollectionIdCaseSensitive(jdbcConnection);
} catch (SQLException e) {
throw new SeaTunnelException("Error reading MySQL variables: " + e.getMessage(), e);
}
}

/**
 * Connection-based variant of the case-sensitivity check: delegates to
 * {@code isTableIdCaseSensitive} using the supplied connection.
 *
 * @param jdbcConnection connection passed through to {@code isTableIdCaseSensitive}
 * @return whatever {@code isTableIdCaseSensitive} reports for that connection
 */
private boolean isDataCollectionIdCaseSensitive(JdbcConnection jdbcConnection) {
    final boolean caseSensitive = isTableIdCaseSensitive(jdbcConnection);
    return caseSensitive;
}

@Override
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new MySqlChunkSplitter(sourceConfig, this);
public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
return MySqlConnectionUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new MysqlPooledDataSourceFactory();
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new MySqlChunkSplitter(sourceConfig, this);
}

@Override
Expand All @@ -101,8 +105,7 @@ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (mySqlSchema == null) {
mySqlSchema =
new MySqlSchema(
sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig), tableMap);
new MySqlSchema(sourceConfig, isDataCollectionIdCaseSensitive(jdbc), tableMap);
}
return mySqlSchema.getTableSchema(jdbc, tableId);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
Expand All @@ -40,12 +41,15 @@
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET;

@Slf4j
public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
private final IncrementalSplit split;
private volatile boolean taskRunning = false;
Expand All @@ -72,6 +76,22 @@ public void execute(FetchTask.Context context) throws Exception {
BinlogSplitChangeEventSourceContext changeEventSourceContext =
new BinlogSplitChangeEventSourceContext();

sourceFetchContext
.getBinaryLogClient()
.registerLifecycleListener(
new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onConnect(BinaryLogClient client) {
try {
sourceFetchContext.getConnection().close();
log.info(
"Binlog client connected, closed idle jdbc connection.");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
});

mySqlStreamingChangeEventSource.execute(
changeEventSourceContext, sourceFetchContext.getOffsetContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
Expand Down Expand Up @@ -90,11 +89,6 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new OracleChunkSplitter(sourceConfig, this);
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new OraclePooledDataSourceFactory();
}

@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
OracleSourceConfig oracleSourceConfig = (OracleSourceConfig) sourceConfig;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
Expand Down Expand Up @@ -96,11 +95,6 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new PostgresChunkSplitter(sourceConfig, this);
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new PostgresPooledDataSourceFactory();
}

@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig) sourceConfig;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
Expand Down Expand Up @@ -85,11 +84,6 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new SqlServerChunkSplitter(sourceConfig, this);
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new SqlServerPooledDataSourceFactory();
}

@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig) sourceConfig;
Expand Down

This file was deleted.

Loading

0 comments on commit 99e2832

Please sign in to comment.