diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java index da990415..3101d58a 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java @@ -29,6 +29,7 @@ import java.util.*; import static io.debezium.server.iceberg.IcebergChangeConsumer.*; +import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.cdcOpField; /** * Converts iceberg json event to Iceberg GenericRecord. Extracts event schema and key fields. Converts event schema to Iceberg Schema. @@ -41,7 +42,6 @@ public class RecordConverter { protected static final Logger LOGGER = LoggerFactory.getLogger(RecordConverter.class); public static final List TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms"); static final boolean eventsAreUnwrapped = IcebergUtil.configIncludesUnwrapSmt(); - protected static final String cdcOpField = "__op"; protected final String destination; protected final byte[] valueData; protected final byte[] keyData; diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index bef6f2f7..01f81d04 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -41,7 +41,7 @@ public class IcebergTableOperator { static final ImmutableMap CDC_OPERATION_PRIORITY = ImmutableMap.of(Operation.INSERT, 1, Operation.READ, 2, Operation.UPDATE, 3, Operation.DELETE, 4); private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class); - protected static final String cdcOpField = "__op"; + public static final String cdcOpField = "__op"; @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") String cdcSourceTsMsField; @ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")