Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code improvements #409

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<artifactId>iceberg-azure-bundle</artifactId>
<version>${version.iceberg}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
<version>${version.iceberg}</version>
</dependency>
<!-- Google -->
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.apache.iceberg.types.TypeUtil;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.opFieldName;

Expand All @@ -32,7 +32,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
FileIO io,
long targetFileSize,
Schema schema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
Expand All @@ -59,16 +59,18 @@ public void write(Record row) throws IOException {
"This field is required when updating or deleting data, when running in upsert mode."
);
}
if (upsert && !opFieldValue.equals("c")) {// anything which not an insert is upsert
writer.delete(row);
}
// if its deleted row and upsertKeepDeletes = true then add deleted record to target table
// else deleted records are deleted from target table
if (
upsertKeepDeletes
|| !(opFieldValue.equals("d")))// anything which not an insert is upsert
{
if (!upsert) {
// APPEND ONLY MODE!!
writer.write(row);
} else {
// UPSERT MODE
if (!opFieldValue.equals("c")) {// anything which not created is deleted first
writer.delete(row);
}
// when upsertKeepDeletes = FALSE we dont keep deleted record
if (upsertKeepDeletes || !opFieldValue.equals("d")) {
writer.write(row);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import io.debezium.server.iceberg.IcebergUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import jakarta.enterprise.context.Dependent;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -37,8 +36,7 @@ public BaseTaskWriter<Record> create(Table icebergTable) {
GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);
OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
// equality Field Ids
List<Integer> equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds());

Set<Integer> equalityFieldIds = icebergTable.schema().identifierFieldIds();
BaseTaskWriter<Record> writer;

// 1. TABLE DONT HAVE identifierFieldIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
Expand All @@ -29,7 +29,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
FileIO io,
long targetFileSize,
Schema schema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.debezium.server.iceberg.tableoperator;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -21,7 +21,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
FileIO io,
long targetFileSize,
Schema schema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert,
boolean upsertKeepDeletes) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public TestConfigSource() {
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("%postgresql.debezium.source.replica.identity.autoset.values", "inventory.*:FULL");

config.put("quarkus.devservices.enabled", "false");
config.put("quarkus.log.level", "WARN");
config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN");
config.put("quarkus.log.category.\"org.apache.hadoop\".level", "ERROR");
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<version.assembly.plugin>3.7.1</version.assembly.plugin>
<!-- Use same version as iceberg https://github.com/apache/iceberg/blob/main/gradle/libs.versions.toml#L53-->
<version.jackson>2.14.2</version.jackson>
<version.iceberg>1.6.0</version.iceberg>
<version.iceberg>1.6.1</version.iceberg>
<!-- Following two properties defines which version of iceberg-spark-runtime is used -->
<!-- Example https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.4_2.13/1.4.3 -->
<version.spark.major>4.0</version.spark.major>
Expand Down