From 3f4c8761655b15ad1b9c4c4160cb6d4298e7d1a4 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 8 Sep 2024 10:40:55 +0200 Subject: [PATCH] Use UUID as operation id for OutputFileFactory --- .../debezium/server/iceberg/IcebergUtil.java | 62 ++++++++++++------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index b3a2e44b..9042bfd8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -8,32 +8,35 @@ package io.debezium.server.iceberg; -import io.debezium.DebeziumException; - -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; - import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.primitives.Ints; +import io.debezium.DebeziumException; import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.literal.NamedLiteral; -import org.apache.iceberg.*; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.OutputFileFactory; -import com.google.common.primitives.Ints; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.ConfigValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.*; + import static org.apache.iceberg.TableProperties.*; /** @@ -67,7 +70,7 @@ public static boolean configIncludesUnwrapSmt() { static boolean configIncludesUnwrapSmt(Config config) { // first lets find the config value for debezium statements ConfigValue stms = config.getConfigValue("debezium.transforms"); - if (stms == null || stms.getValue() == null || stms.getValue().isEmpty() || stms.getValue().isBlank()){ + if (stms == null || stms.getValue() == null || stms.getValue().isEmpty() || stms.getValue().isBlank()) { return false; } @@ -75,8 +78,8 @@ static boolean configIncludesUnwrapSmt(Config config) { final String regexVal = "^io\\.debezium\\..*transforms\\.ExtractNew.*State$"; // we have debezium statements configured! let's check if we have event flattening config is set. for (String stmName : stmsList) { - ConfigValue stmVal = config.getConfigValue("debezium.transforms."+stmName+".type"); - if (stmVal != null && stmVal.getValue() != null && !stmVal.getValue().isEmpty() && !stmVal.getValue().isBlank() && stmVal.getValue().matches(regexVal)){ + ConfigValue stmVal = config.getConfigValue("debezium.transforms." + stmName + ".type"); + if (stmVal != null && stmVal.getValue() != null && !stmVal.getValue().isEmpty() && !stmVal.getValue().isBlank() && stmVal.getValue().matches(regexVal)) { return true; } } @@ -150,18 +153,33 @@ public static FileFormat getTableFileFormat(Table icebergTable) { } public static GenericAppenderFactory getTableAppender(Table icebergTable) { - return new GenericAppenderFactory( - icebergTable.schema(), - icebergTable.spec(), - Ints.toArray(icebergTable.schema().identifierFieldIds()), - icebergTable.schema(), - null); + final Set identifierFieldIds = icebergTable.schema().identifierFieldIds(); + if (identifierFieldIds == null || identifierFieldIds.isEmpty()) { + return new GenericAppenderFactory( + icebergTable.schema(), + icebergTable.spec(), + null, + null, + null) + .setAll(icebergTable.properties()); + } else { + return new GenericAppenderFactory( + icebergTable.schema(), + icebergTable.spec(), + Ints.toArray(identifierFieldIds), + icebergTable.schema(), + null) + .setAll(icebergTable.properties()); + } } public static OutputFileFactory getTableOutputFileFactory(Table icebergTable, FileFormat format) { return OutputFileFactory.builderFor(icebergTable, IcebergUtil.partitionId(), 1L) - .defaultSpec(icebergTable.spec()).format(format).build(); + .defaultSpec(icebergTable.spec()) + .operationId(java.util.UUID.randomUUID().toString()) + .format(format) + .build(); } public static int partitionId() {