Skip to content

Commit

Permalink
Fix, Create GenericAppenderFactory respecting to identifier fields. A…
Browse files Browse the repository at this point in the history
…nd create it with table properties (#413)
  • Loading branch information
ismailsimsek authored Sep 8, 2024
1 parent e3b3097 commit 836f319
Showing 1 changed file with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,35 @@

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Ints;
import io.debezium.DebeziumException;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import org.apache.iceberg.*;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.OutputFileFactory;
import com.google.common.primitives.Ints;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;

import static org.apache.iceberg.TableProperties.*;

/**
Expand Down Expand Up @@ -67,16 +70,16 @@ public static boolean configIncludesUnwrapSmt() {
static boolean configIncludesUnwrapSmt(Config config) {
// first lets find the config value for debezium statements
ConfigValue stms = config.getConfigValue("debezium.transforms");
if (stms == null || stms.getValue() == null || stms.getValue().isEmpty() || stms.getValue().isBlank()){
if (stms == null || stms.getValue() == null || stms.getValue().isEmpty() || stms.getValue().isBlank()) {
return false;
}

String[] stmsList = stms.getValue().split(",");
final String regexVal = "^io\\.debezium\\..*transforms\\.ExtractNew.*State$";
// we have debezium statements configured! let's check if we have event flattening config is set.
for (String stmName : stmsList) {
ConfigValue stmVal = config.getConfigValue("debezium.transforms."+stmName+".type");
if (stmVal != null && stmVal.getValue() != null && !stmVal.getValue().isEmpty() && !stmVal.getValue().isBlank() && stmVal.getValue().matches(regexVal)){
ConfigValue stmVal = config.getConfigValue("debezium.transforms." + stmName + ".type");
if (stmVal != null && stmVal.getValue() != null && !stmVal.getValue().isEmpty() && !stmVal.getValue().isBlank() && stmVal.getValue().matches(regexVal)) {
return true;
}
}
Expand Down Expand Up @@ -150,12 +153,24 @@ public static FileFormat getTableFileFormat(Table icebergTable) {
}

public static GenericAppenderFactory getTableAppender(Table icebergTable) {
return new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(icebergTable.schema().identifierFieldIds()),
icebergTable.schema(),
null);
final Set<Integer> identifierFieldIds = icebergTable.schema().identifierFieldIds();
if (identifierFieldIds == null || identifierFieldIds.isEmpty()) {
return new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
null,
null,
null)
.setAll(icebergTable.properties());
} else {
return new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(identifierFieldIds),
icebergTable.schema(),
null)
.setAll(icebergTable.properties());
}
}

public static OutputFileFactory getTableOutputFileFactory(Table icebergTable, FileFormat format) {
Expand Down

0 comments on commit 836f319

Please sign in to comment.