diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java index d2e5409f..6caaa6a8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java @@ -68,6 +68,14 @@ public JsonNode value() { return value; } + public Long cdcSourceTsMsValue(String cdcSourceTsMsField) { + return value().get(cdcSourceTsMsField).asLong(0); + } + + public String cdcOpValue(String cdcOpField) { + return value().get(cdcOpField).asText("c"); + } + public SchemaConverter schemaConverter() { try { return new SchemaConverter(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema")); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index b6ba62cc..e45f95e5 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -64,7 +64,7 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) { // deduplicate using key(PK) deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> { - if (this.compareByTsThenOp(oldValue.value(), newValue.value()) <= 0) { + if (this.compareByTsThenOp(oldValue, newValue) <= 0) { return newValue; } else { return oldValue; @@ -89,15 +89,15 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) { * @param rhs * @return */ - private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) { + private int compareByTsThenOp(RecordConverter lhs, RecordConverter rhs) { - int result = Long.compare(lhs.get(cdcSourceTsMsField).asLong(0), rhs.get(cdcSourceTsMsField).asLong(0)); + int result = Long.compare(lhs.cdcSourceTsMsValue(cdcSourceTsMsField), rhs.cdcSourceTsMsValue(cdcSourceTsMsField)); if (result == 0) { // return (x < y) ? -1 : ((x == y) ? 0 : 1); - result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.get(cdcOpField).asText("c"), -1) + result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(cdcOpField), -1) .compareTo( - CDC_OPERATION_PRIORITY.getOrDefault(rhs.get(cdcOpField).asText("c"), -1) + CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(cdcOpField), -1) ); }