From d581e71178e907a6f91eda4b09295e608e23975c Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 5 Oct 2024 13:57:11 +0200 Subject: [PATCH] Use RecordWrapper --- .../java/io/debezium/server/iceberg/RecordConverter.java | 4 ++-- .../server/iceberg/tableoperator/IcebergTableOperator.java | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java index 3101d58a..0dca1d46 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java @@ -29,7 +29,7 @@ import java.util.*; import static io.debezium.server.iceberg.IcebergChangeConsumer.*; -import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.cdcOpField; +import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.CDC_OPERATION_FIELD_NAME; /** * Converts iceberg json event to Iceberg GenericRecord. Extracts event schema and key fields. Converts event schema to Iceberg Schema. @@ -112,7 +112,7 @@ public Operation cdcOpValue(String cdcOpField) { public RecordWrapper convert(Schema schema) { GenericRecord row = convert(schema.asStruct(), value()); - Operation op = cdcOpValue(cdcOpField); + Operation op = cdcOpValue(CDC_OPERATION_FIELD_NAME); return new RecordWrapper(row, op); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 01f81d04..688eebb5 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -15,7 +15,6 @@ import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.iceberg.*; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.BaseTaskWriter; import org.apache.iceberg.io.WriteResult; @@ -41,7 +40,7 @@ public class IcebergTableOperator { static final ImmutableMap CDC_OPERATION_PRIORITY = ImmutableMap.of(Operation.INSERT, 1, Operation.READ, 2, Operation.UPDATE, 3, Operation.DELETE, 4); private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class); - public static final String cdcOpField = "__op"; + public static final String CDC_OPERATION_FIELD_NAME = "__op"; @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") String cdcSourceTsMsField; @ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true") @@ -96,9 +95,9 @@ private int compareByTsThenOp(RecordConverter lhs, RecordConverter rhs) { if (result == 0) { // return (x < y) ? -1 : ((x == y) ? 0 : 1); - result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(cdcOpField), -1) + result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(CDC_OPERATION_FIELD_NAME), -1) .compareTo( - CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(cdcOpField), -1) + CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(CDC_OPERATION_FIELD_NAME), -1) ); }