Skip to content

Commit

Permalink
Add code improvements (#409)
Browse files Browse the repository at this point in the history
* Add code improvements

* Add code improvements
  • Loading branch information
ismailsimsek authored Sep 4, 2024
1 parent 3f7e469 commit 6e553bb
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 20 deletions.
5 changes: 5 additions & 0 deletions debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<artifactId>iceberg-azure-bundle</artifactId>
<version>${version.iceberg}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
<version>${version.iceberg}</version>
</dependency>
<!-- Google -->
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.apache.iceberg.types.TypeUtil;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.opFieldName;

Expand All @@ -32,7 +32,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
FileIO io,
long targetFileSize,
Schema schema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
Expand All @@ -59,16 +59,18 @@ public void write(Record row) throws IOException {
"This field is required when updating or deleting data, when running in upsert mode."
);
}
if (upsert && !opFieldValue.equals("c")) {// anything which is not an insert is an upsert
writer.delete(row);
}
// if it's a deleted row and upsertKeepDeletes = true then add the deleted record to the target table
// else deleted records are deleted from target table
if (
upsertKeepDeletes
|| !(opFieldValue.equals("d")))// keep the record unless it is a delete
{
if (!upsert) {
// APPEND ONLY MODE!!
writer.write(row);
} else {
// UPSERT MODE
if (!opFieldValue.equals("c")) {// anything which is not a create is deleted first
writer.delete(row);
}
// when upsertKeepDeletes = FALSE we don't keep deleted records
if (upsertKeepDeletes || !opFieldValue.equals("d")) {
writer.write(row);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import io.debezium.server.iceberg.IcebergUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import jakarta.enterprise.context.Dependent;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -37,8 +36,7 @@ public BaseTaskWriter<Record> create(Table icebergTable) {
GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);
OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
// equality Field Ids
List<Integer> equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds());

Set<Integer> equalityFieldIds = icebergTable.schema().identifierFieldIds();
BaseTaskWriter<Record> writer;

// 1. TABLE DONT HAVE identifierFieldIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
Expand All @@ -29,7 +29,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
FileIO io,
long targetFileSize,
Schema schema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.debezium.server.iceberg.tableoperator;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -21,7 +21,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
FileIO io,
long targetFileSize,
Schema schema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public TestConfigSource() {
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("%postgresql.debezium.source.replica.identity.autoset.values", "inventory.*:FULL");

config.put("quarkus.devservices.enabled", "false");
config.put("quarkus.log.level", "WARN");
config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN");
config.put("quarkus.log.category.\"org.apache.hadoop\".level", "ERROR");
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<version.assembly.plugin>3.7.1</version.assembly.plugin>
<!-- Use same version as iceberg https://github.com/apache/iceberg/blob/main/gradle/libs.versions.toml#L53-->
<version.jackson>2.14.2</version.jackson>
<version.iceberg>1.6.0</version.iceberg>
<version.iceberg>1.6.1</version.iceberg>
<!-- Following two properties defines which version of iceberg-spark-runtime is used -->
<!-- Example https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.4_2.13/1.4.3 -->
<version.spark.major>4.0</version.spark.major>
Expand Down

0 comments on commit 6e553bb

Please sign in to comment.