diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java index 9737bfc62a0..7df2b3c7984 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java @@ -141,9 +141,7 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { @Override public OracleSourceFetchTaskContext createFetchTaskContext( SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { - final OracleConnection jdbcConnection = - createOracleConnection(taskSourceConfig.getDbzConfiguration()); - return new OracleSourceFetchTaskContext(taskSourceConfig, this, jdbcConnection); + return new OracleSourceFetchTaskContext(taskSourceConfig, this); } @Override diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index a36cf1ff7a4..68a71d584f9 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -37,7 +37,6 @@ import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.ChangeRecordEmitter; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.SnapshotResult; import io.debezium.relational.RelationalSnapshotChangeEventSource; import io.debezium.relational.SnapshotChangeRecordEmitter; @@ -59,7 +58,6 @@ import java.util.Map; import static com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.RedoLogSplitReadTask; -import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection; import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.currentRedoLogOffset; import static com.ververica.cdc.connectors.oracle.source.utils.OracleUtils.buildSplitScanQuery; import static com.ververica.cdc.connectors.oracle.source.utils.OracleUtils.readTableSplitDataStatement; @@ -163,10 +161,6 @@ private StreamSplit createBackfillRedoLogSplit( private RedoLogSplitReadTask createBackfillRedoLogReadTask( StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) { - OracleConnectorConfig oracleConnectorConfig = - context.getSourceConfig().getDbzConnectorConfig(); - final OffsetContext.Loader loader = - new LogMinerOracleOffsetContextLoader(oracleConnectorConfig); // we should only capture events for the current table, // otherwise, we may can't find corresponding schema Configuration dezConf = @@ -180,7 +174,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask( // task to read binlog and backfill for current split return new RedoLogSplitReadTask( new OracleConnectorConfig(dezConf), - createOracleConnection(context.getSourceConfig().getDbzConfiguration()), + context.getConnection(), context.getDispatcher(), context.getErrorHandler(), context.getDatabaseSchema(), diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java index 5d8c7794b4a..4a9636419bb 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java @@ -67,6 +67,7 @@ import java.time.Instant; import java.util.Map; +import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection; import static com.ververica.cdc.connectors.oracle.util.ChunkUtils.getChunkKeyColumn; /** The context for fetch task that fetching data of snapshot split from Oracle data source. */ @@ -90,11 +91,9 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private OracleErrorHandler errorHandler; public OracleSourceFetchTaskContext( - JdbcSourceConfig sourceConfig, - JdbcDataSourceDialect dataSourceDialect, - OracleConnection connection) { + JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) { super(sourceConfig, dataSourceDialect); - this.connection = connection; + this.connection = createOracleConnection(sourceConfig.getDbzConfiguration()); this.metadataProvider = new OracleEventMetadataProvider(); } @@ -108,7 +107,7 @@ public void configure(SourceSplitBase sourceSplitBase) { .getDbzConfiguration() .getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), sourceSplitBase.getTableSchemas().values()); - this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig); + this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig, connection); // todo logMiner or xStream this.offsetContext = loadStartingOffsetState( diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java index 275e170d03b..96d7b882258 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java @@ -19,7 +19,6 @@ import org.apache.flink.table.types.logical.RowType; import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; -import io.debezium.config.Configuration; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; @@ -27,7 +26,6 @@ import io.debezium.connector.oracle.OracleTopicSelector; import io.debezium.connector.oracle.OracleValueConverters; import io.debezium.connector.oracle.StreamingAdapter; -import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; @@ -243,12 +241,9 @@ public static PreparedStatement readTableSplitDataStatement( /** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */ public static OracleDatabaseSchema createOracleDatabaseSchema( - OracleConnectorConfig dbzOracleConfig) { + OracleConnectorConfig dbzOracleConfig, OracleConnection oracleConnection) { TopicSelector topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); - OracleConnection oracleConnection = - OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig()); - // OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig); OracleValueConverters oracleValueConverters = new OracleValueConverters(dbzOracleConfig, oracleConnection); OracleDefaultValueConverter defaultValueConverter = @@ -264,31 +259,6 @@ public static OracleDatabaseSchema createOracleDatabaseSchema( tableNameCaseSensitivity); } - /** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */ - public static OracleDatabaseSchema createOracleDatabaseSchema( - OracleConnectorConfig dbzOracleConfig, boolean tableIdCaseInsensitive) { - TopicSelector topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig); - SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); - OracleConnection oracleConnection = - OracleConnectionUtils.createOracleConnection( - JdbcConfiguration.adapt((Configuration) dbzOracleConfig)); - OracleValueConverters oracleValueConverters = - new OracleValueConverters(dbzOracleConfig, oracleConnection); - OracleDefaultValueConverter defaultValueConverter = - new OracleDefaultValueConverter(oracleValueConverters, oracleConnection); - StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity = - tableIdCaseInsensitive - ? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE - : StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE; - return new OracleDatabaseSchema( - dbzOracleConfig, - oracleValueConverters, - defaultValueConverter, - schemaNameAdjuster, - topicSelector, - tableNameCaseSensitivity); - } - public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) { return getRedoLogPosition(dataRecord.sourceOffset()); }