From 151981319107ed0726af0ce892a3407e3abda9af Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Tue, 3 Sep 2024 15:04:08 +0200
Subject: [PATCH 1/2] Add code improvements
---
debezium-server-iceberg-sink/pom.xml | 5 ++++
.../tableoperator/BaseDeltaTaskWriter.java | 24 ++++++++++---------
.../IcebergTableWriterFactory.java | 5 ++--
.../tableoperator/PartitionedDeltaWriter.java | 4 ++--
.../UnpartitionedDeltaWriter.java | 4 ++--
.../server/iceberg/TestConfigSource.java | 1 +
pom.xml | 2 +-
7 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml
index 3eb2a626..6acd7295 100644
--- a/debezium-server-iceberg-sink/pom.xml
+++ b/debezium-server-iceberg-sink/pom.xml
@@ -133,6 +133,11 @@
iceberg-azure-bundle
${version.iceberg}
+
+ org.apache.iceberg
+ iceberg-bundled-guava
+ ${version.iceberg}
+
com.google.cloud.bigdataoss
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java
index d7b950ec..c3d4dfcc 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java
@@ -12,7 +12,7 @@
import org.apache.iceberg.types.TypeUtil;
import java.io.IOException;
-import java.util.List;
+import java.util.Set;
import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.opFieldName;
@@ -32,7 +32,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter {
FileIO io,
long targetFileSize,
Schema schema,
- List equalityFieldIds,
+ Set equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
@@ -59,16 +59,18 @@ public void write(Record row) throws IOException {
"This field is required when updating or deleting data, when running in upsert mode."
);
}
- if (upsert && !opFieldValue.equals("c")) {// anything which not an insert is upsert
- writer.delete(row);
- }
- // if its deleted row and upsertKeepDeletes = true then add deleted record to target table
- // else deleted records are deleted from target table
- if (
- upsertKeepDeletes
- || !(opFieldValue.equals("d")))// anything which not an insert is upsert
- {
+ if (!upsert) {
+ // APPEND ONLY MODE!!
writer.write(row);
+ } else {
+ // UPSERT MODE
+ if (!opFieldValue.equals("c")) {// anything which not created is deleted first
+ writer.delete(row);
+ }
+ // when upsertKeepDeletes = FALSE we dont keep deleted record
+ if (upsertKeepDeletes || !opFieldValue.equals("d")) {
+ writer.write(row);
+ }
}
}
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
index ca24631f..798543bf 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
@@ -2,8 +2,7 @@
import io.debezium.server.iceberg.IcebergUtil;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Set;
import jakarta.enterprise.context.Dependent;
import org.apache.iceberg.FileFormat;
@@ -37,7 +36,7 @@ public BaseTaskWriter create(Table icebergTable) {
GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);
OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
// equality Field Ids
- List equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds());
+ Set equalityFieldIds = icebergTable.schema().identifierFieldIds();
BaseTaskWriter writer;
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java
index 2530b693..1d168837 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java
@@ -2,8 +2,8 @@
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
@@ -29,7 +29,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
FileIO io,
long targetFileSize,
Schema schema,
- List equalityFieldIds,
+ Set equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes);
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java
index cdfb89f9..32b58e8d 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java
@@ -1,7 +1,7 @@
package io.debezium.server.iceberg.tableoperator;
import java.io.IOException;
-import java.util.List;
+import java.util.Set;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -21,7 +21,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
FileIO io,
long targetFileSize,
Schema schema,
- List equalityFieldIds,
+ Set equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes);
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
index ec74f28f..8c8f16be 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
@@ -75,6 +75,7 @@ public TestConfigSource() {
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("%postgresql.debezium.source.replica.identity.autoset.values", "inventory.*:FULL");
+ config.put("quarkus.devservices.enabled", "false");
config.put("quarkus.log.level", "WARN");
config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN");
config.put("quarkus.log.category.\"org.apache.hadoop\".level", "ERROR");
diff --git a/pom.xml b/pom.xml
index bdb1d6aa..96d4c349 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@
3.7.1
2.14.2
- 1.6.0
+ 1.6.1
4.0
From ff16717c4f5045de3e93417f1c527d2af7eff95a Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Wed, 4 Sep 2024 15:17:56 +0200
Subject: [PATCH 2/2] Add code improvements
---
.../server/iceberg/tableoperator/IcebergTableWriterFactory.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
index 798543bf..f7190e80 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
@@ -37,7 +37,6 @@ public BaseTaskWriter create(Table icebergTable) {
OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
// equality Field Ids
Set equalityFieldIds = icebergTable.schema().identifierFieldIds();
-
BaseTaskWriter writer;
// 1. TABLE DONT HAVE identifierFieldIds