Skip to content

Commit

Permalink
oralce
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyun2024 committed Jun 5, 2023
1 parent 8dc46cc commit 061602d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void execute(Context context) throws Exception {
// execute redoLog read task
if (snapshotResult.isCompletedOrSkipped()) {
final LogMinerOracleOffsetContextLoader loader =
new LogMinerOracleOffsetContextLoader(sourceFetchContext.getDbzConnectorConfig());
new LogMinerOracleOffsetContextLoader(
sourceFetchContext.getDbzConnectorConfig());
final OracleOffsetContext streamOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.ververica.cdc.connectors.oracle.source.reader.fetch;

import io.debezium.pipeline.EventDispatcher;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
Expand Down Expand Up @@ -44,6 +43,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@

package com.ververica.cdc.connectors.oracle.source.read.fetch;

import com.ververica.cdc.connectors.oracle.source.OracleDialect;
import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import com.ververica.cdc.connectors.oracle.utils.RecordsFormatter;
import io.debezium.connector.oracle.OracleConnection;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

Expand All @@ -34,6 +25,15 @@
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import com.ververica.cdc.connectors.oracle.source.OracleDialect;
import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import com.ververica.cdc.connectors.oracle.utils.RecordsFormatter;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.data.Envelope;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
Expand Down Expand Up @@ -64,25 +64,23 @@ public class OracleScanFetchTaskTest extends OracleSourceTestBase {

@Test
public void testChangingDataInSnapshotScan() throws Exception {
OracleTestUtils.createAndInitialize(
OracleTestUtils.ORACLE_CONTAINER, "customer.sql");
OracleTestUtils.createAndInitialize(OracleTestUtils.ORACLE_CONTAINER, "customer.sql");

String tableName = ORACLE_SCHEMA + ".customers";
String tableName = ORACLE_SCHEMA + ".CUSTOMERS";

OracleSourceConfigFactory sourceConfigFactory =
getConfigFactory( new String[] {tableName}, 10);
getConfigFactory(new String[] {tableName}, 10);
OracleSourceConfig sourceConfig = sourceConfigFactory.create(0);
OracleDialect oracleDialect = new OracleDialect(sourceConfigFactory);

String tableId = ORACLE_DATABASE + "." + tableName;
String[] changingDataSql =
new String[] {
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
"UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110",
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111",
"UPDATE " + tableName + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableName + " where id = 102",
"INSERT INTO " + tableName + " VALUES(102, 'user_2','Shanghai','123567891234')",
"UPDATE " + tableName + " SET address = 'Shanghai' where id = 103",
"UPDATE " + tableName + " SET address = 'Hangzhou' where id = 110",
"UPDATE " + tableName + " SET address = 'Hangzhou' where id = 111",
};

MakeChangeEventTaskContext makeChangeEventTaskContext =
Expand All @@ -95,10 +93,11 @@ public void testChangingDataInSnapshotScan() throws Exception {

final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
DataTypes.FIELD("ID", DataTypes.BIGINT()),
DataTypes.FIELD("NAME", DataTypes.STRING()),
DataTypes.FIELD("ADDRESS", DataTypes.STRING()),
DataTypes.FIELD("PHONE_NUMBER", DataTypes.STRING()));

List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, oracleDialect);

String[] expected =
Expand Down Expand Up @@ -174,7 +173,8 @@ private List<SnapshotSplit> getSnapshotSplits(
return snapshotSplitList;
}

public static OracleSourceConfigFactory getConfigFactory(String[] captureTables, int splitSize) {
public static OracleSourceConfigFactory getConfigFactory(
String[] captureTables, int splitSize) {
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("log.mining.continuous.mine", "true");
Expand Down Expand Up @@ -240,7 +240,7 @@ public void completeSnapshot() throws InterruptedException {
snapshotReceiver.completeSnapshot();
// make change events
makeChangeEventFunction.get();
Thread.sleep(10 * 1000);
Thread.sleep(120 * 1000);
}
};
}
Expand Down

0 comments on commit 061602d

Please sign in to comment.