From e3859c973f3b3901498757f40ede61b37f68194b Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 8 Sep 2024 11:16:20 +0200 Subject: [PATCH] Rename variable equalityFieldIds to identifierFieldIds --- .../server/iceberg/tableoperator/BaseDeltaTaskWriter.java | 4 ++-- .../iceberg/tableoperator/IcebergTableWriterFactory.java | 6 +++--- .../iceberg/tableoperator/PartitionedDeltaWriter.java | 4 ++-- .../iceberg/tableoperator/UnpartitionedDeltaWriter.java | 4 ++-- .../server/iceberg/tableoperator/BaseWriterTest.java | 4 ++-- .../iceberg/tableoperator/UnpartitionedDeltaWriterTest.java | 2 +- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java index 884d6a83..fbeb1c23 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java @@ -33,12 +33,12 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { FileIO io, long targetFileSize, Schema schema, - Set equalityFieldIds, + Set identifierFieldIds, boolean upsert, boolean upsertKeepDeletes) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.schema = schema; - this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(identifierFieldIds)); this.wrapper = new InternalRecordWrapper(schema.asStruct()); this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct()); this.keyProjection = RecordProjection.create(schema, deleteSchema); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java index c2a86b14..0bd64f09 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java @@ -40,7 +40,7 @@ public BaseTaskWriter create(Table icebergTable) { GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable); OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format); // equality Field Ids - Set equalityFieldIds = icebergTable.schema().identifierFieldIds(); + Set identifierFieldIds = icebergTable.schema().identifierFieldIds(); long targetFileSize = PropertyUtil.propertyAsLong( icebergTable.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); @@ -76,12 +76,12 @@ else if (icebergTable.spec().isUnpartitioned()) { // running with upsert mode + un partitioned table writer = new UnpartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), - targetFileSize, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); + targetFileSize, icebergTable.schema(), identifierFieldIds, true, upsertKeepDeletes); } else { // running with upsert mode + partitioned table writer = new PartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), - targetFileSize, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); + targetFileSize, icebergTable.schema(), identifierFieldIds, true, upsertKeepDeletes); } return writer; diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java index 1d168837..707dc29e 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java @@ -29,10 +29,10 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { FileIO io, long targetFileSize, Schema schema, - Set equalityFieldIds, + Set identifierFieldIds, boolean upsert, boolean upsertKeepDeletes) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes); + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, identifierFieldIds, upsert, upsertKeepDeletes); this.partitionKey = new PartitionKey(spec, schema); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java index 32b58e8d..b7589c3b 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java @@ -21,10 +21,10 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { FileIO io, long targetFileSize, Schema schema, - Set equalityFieldIds, + Set identifierFieldIds, boolean upsert, boolean upsertKeepDeletes) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes); + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, identifierFieldIds, upsert, upsertKeepDeletes); this.writer = new RowDataDeltaWriter(null); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/BaseWriterTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/BaseWriterTest.java index f9cc5143..712d0ed5 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/BaseWriterTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/BaseWriterTest.java @@ -24,7 +24,7 @@ public class BaseWriterTest { FileFormat format; GenericAppenderFactory appenderFactory; OutputFileFactory fileFactory; - Set equalityFieldIds; + Set identifierFieldIds; protected static final Schema SCHEMA = new Schema( @@ -55,7 +55,7 @@ public void before() { format = IcebergUtil.getTableFileFormat(table); appenderFactory = IcebergUtil.getTableAppender(table); fileFactory = IcebergUtil.getTableOutputFileFactory(table, format); - equalityFieldIds = table.schema().identifierFieldIds(); + identifierFieldIds = table.schema().identifierFieldIds(); } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriterTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriterTest.java index 8e4e5637..296e0f42 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriterTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriterTest.java @@ -14,7 +14,7 @@ class UnpartitionedDeltaWriterTest extends BaseWriterTest { public void testUnpartitionedDeltaWriter() throws IOException { UnpartitionedDeltaWriter writer = new UnpartitionedDeltaWriter(table.spec(), format, appenderFactory, fileFactory, table.io(), - Long.MAX_VALUE, table.schema(), equalityFieldIds, true, true); + Long.MAX_VALUE, table.schema(), identifierFieldIds, true, true); Record row = GenericRecord.create(SCHEMA); row.setField("id", "123");