Skip to content

Commit

Permalink
[oracle] Use oracle connection in context for each reader subtask (#2254
Browse files Browse the repository at this point in the history
) (#2258)
  • Loading branch information
ruanhang1993 authored Jul 3, 2023
1 parent 4c1a19c commit bb3b081
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
@Override
public OracleSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final OracleConnection jdbcConnection =
createOracleConnection(taskSourceConfig.getDbzConfiguration());
return new OracleSourceFetchTaskContext(taskSourceConfig, this, jdbcConnection);
return new OracleSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
Expand All @@ -59,7 +58,6 @@
import java.util.Map;

import static com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.RedoLogSplitReadTask;
import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection;
import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.currentRedoLogOffset;
import static com.ververica.cdc.connectors.oracle.source.utils.OracleUtils.buildSplitScanQuery;
import static com.ververica.cdc.connectors.oracle.source.utils.OracleUtils.readTableSplitDataStatement;
Expand Down Expand Up @@ -163,10 +161,6 @@ private StreamSplit createBackfillRedoLogSplit(

private RedoLogSplitReadTask createBackfillRedoLogReadTask(
StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
OracleConnectorConfig oracleConnectorConfig =
context.getSourceConfig().getDbzConnectorConfig();
final OffsetContext.Loader<OracleOffsetContext> loader =
new LogMinerOracleOffsetContextLoader(oracleConnectorConfig);
// we should only capture events for the current table,
// otherwise, we may can't find corresponding schema
Configuration dezConf =
Expand All @@ -180,7 +174,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask(
// task to read binlog and backfill for current split
return new RedoLogSplitReadTask(
new OracleConnectorConfig(dezConf),
createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
context.getConnection(),
context.getDispatcher(),
context.getErrorHandler(),
context.getDatabaseSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.time.Instant;
import java.util.Map;

import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection;
import static com.ververica.cdc.connectors.oracle.util.ChunkUtils.getChunkKeyColumn;

/** The context for fetch task that fetching data of snapshot split from Oracle data source. */
Expand All @@ -90,11 +91,9 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private OracleErrorHandler errorHandler;

public OracleSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
OracleConnection connection) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.connection = connection;
this.connection = createOracleConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new OracleEventMetadataProvider();
}

Expand All @@ -108,7 +107,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
.getDbzConfiguration()
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
sourceSplitBase.getTableSchemas().values());
this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig);
this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig, connection);
// todo logMiner or xStream
this.offsetContext =
loadStartingOffsetState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDefaultValueConverter;
import io.debezium.connector.oracle.OracleTopicSelector;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.StreamingAdapter;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
Expand Down Expand Up @@ -243,12 +241,9 @@ public static PreparedStatement readTableSplitDataStatement(

/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
OracleConnectorConfig dbzOracleConfig) {
OracleConnectorConfig dbzOracleConfig, OracleConnection oracleConnection) {
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
OracleConnection oracleConnection =
OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig());
// OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig);
OracleValueConverters oracleValueConverters =
new OracleValueConverters(dbzOracleConfig, oracleConnection);
OracleDefaultValueConverter defaultValueConverter =
Expand All @@ -264,31 +259,6 @@ public static OracleDatabaseSchema createOracleDatabaseSchema(
tableNameCaseSensitivity);
}

/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
OracleConnectorConfig dbzOracleConfig, boolean tableIdCaseInsensitive) {
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
OracleConnection oracleConnection =
OracleConnectionUtils.createOracleConnection(
JdbcConfiguration.adapt((Configuration) dbzOracleConfig));
OracleValueConverters oracleValueConverters =
new OracleValueConverters(dbzOracleConfig, oracleConnection);
OracleDefaultValueConverter defaultValueConverter =
new OracleDefaultValueConverter(oracleValueConverters, oracleConnection);
StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
tableIdCaseInsensitive
? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE
: StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE;
return new OracleDatabaseSchema(
dbzOracleConfig,
oracleValueConverters,
defaultValueConverter,
schemaNameAdjuster,
topicSelector,
tableNameCaseSensitivity);
}

public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) {
return getRedoLogPosition(dataRecord.sourceOffset());
}
Expand Down

0 comments on commit bb3b081

Please sign in to comment.