Skip to content

Commit

Permalink
Improve deduplication code (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek authored Sep 5, 2024
1 parent a7f541c commit 2a40003
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public JsonNode value() {
return value;
}

public Long cdcSourceTsMsValue(String cdcSourceTsMsField) {
return value().get(cdcSourceTsMsField).asLong(0);
}

public String cdcOpValue(String cdcOpField) {
return value().get(cdcOpField).asText("c");
}

public SchemaConverter schemaConverter() {
try {
return new SchemaConverter(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {

// deduplicate using key(PK)
deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> {
if (this.compareByTsThenOp(oldValue.value(), newValue.value()) <= 0) {
if (this.compareByTsThenOp(oldValue, newValue) <= 0) {
return newValue;
} else {
return oldValue;
Expand All @@ -89,15 +89,15 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {
* @param rhs
* @return
*/
private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {
private int compareByTsThenOp(RecordConverter lhs, RecordConverter rhs) {

int result = Long.compare(lhs.get(cdcSourceTsMsField).asLong(0), rhs.get(cdcSourceTsMsField).asLong(0));
int result = Long.compare(lhs.cdcSourceTsMsValue(cdcSourceTsMsField), rhs.cdcSourceTsMsValue(cdcSourceTsMsField));

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.get(cdcOpField).asText("c"), -1)
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(cdcOpField), -1)
.compareTo(
CDC_OPERATION_PRIORITY.getOrDefault(rhs.get(cdcOpField).asText("c"), -1)
CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(cdcOpField), -1)
);
}

Expand Down

0 comments on commit 2a40003

Please sign in to comment.