Skip to content

Commit

Permalink
Add log line for committing/retrieving watermarks in streaming (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-moseley authored and phet committed Dec 3, 2022
1 parent 07a7303 commit 3102ff1
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void commitWatermarks(Iterable<CheckpointableWatermark> watermarks)
for (CheckpointableWatermark watermark: watermarks) {
String tableName = watermark.getSource();
_stateStore.put(_storeName, tableName, new CheckpointableWatermarkState(watermark, GSON));
log.info("Committed watermark {} for table {}", watermark.getWatermark().toString(), tableName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<Kafk
longWatermarkMap.put(topicPartition, new LongWatermark(0L));
}
}
for (Map.Entry<KafkaPartition, LongWatermark> entry : longWatermarkMap.entrySet()) {
log.info("Retrieved watermark {} for partition {}", entry.getValue().toString(), entry.getKey().toString());
}
return longWatermarkMap;
}

Expand Down

0 comments on commit 3102ff1

Please sign in to comment.