Skip to content

Commit

Permalink
consumer(ticdc): partition resolved ts fallback ignore caused by read…
Browse files Browse the repository at this point in the history
… offset message (#11412)

close #11413
  • Loading branch information
3AceShowHand authored Jul 30, 2024
1 parent 2fcbb31 commit 65e6dca
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,17 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool

watermark := atomic.LoadUint64(&progress.watermark)
if ts < watermark {
log.Panic("partition resolved ts fallback, skip it",
if message.TopicPartition.Offset > progress.watermarkOffset {
log.Panic("partition resolved ts fallback, skip it",
zap.Uint64("ts", ts), zap.Any("offset", message.TopicPartition.Offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.Int32("partition", partition))
}
log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message",
zap.Uint64("ts", ts), zap.Any("offset", message.TopicPartition.Offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.Int32("partition", partition))
continue
}

for tableID, group := range eventGroup {
Expand Down

0 comments on commit 65e6dca

Please sign in to comment.