consuming: bugfix for consuming compacted topics
See KAFKA-5443: compaction can remove later records from within a
batch, but the batch's last offset delta is preserved. We need to use
the last offset delta of a batch rather than relying only on the offsets
of consumed records. If we do not, we could ask for 144, get a batch
whose last remaining record is 143 while its last offset (first offset
plus last offset delta) is 145, and never advance, since we discard 143
and do nothing about 145.
twmb committed Feb 14, 2021
1 parent c517009 commit 4979b52
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/kgo/source.go
@@ -861,6 +861,15 @@ func (o *cursorOffsetNext) processRecordBatch(
			return
		}
	}

	lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta)
	if lastOffset < o.offset {
		// If the last offset in this batch is less than what we asked
		// for, we got a batch that we entirely do not need. We can
		// avoid all work (although we should not get this batch).
		return
	}

	krecords, err := kmsg.ReadRecords(int(batch.NumRecords), rawRecords)
	if err != nil {
		fp.Err = fmt.Errorf("invalid record batch: %v", err)
@@ -880,6 +889,15 @@ func (o *cursorOffsetNext) processRecordBatch(
		o.maybeKeepRecord(fp, record, abortBatch)
	}

	nextAskOffset := lastOffset + 1
	if o.offset < nextAskOffset {
		// KAFKA-5443: compacted topics preserve the last offset in a
		// batch, even if the last record is removed, meaning that
		// using offsets from records alone may not get us to the next
		// offset we need to ask for.
		o.offset = nextAskOffset
	}

	if abortBatch && lastRecord != nil && lastRecord.Attrs.IsControl() {
		aborter.trackAbortedPID(batch.ProducerID)
	}
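
The stall described in the commit message can be reproduced in isolation. The following is a minimal sketch, not the library's code: the `batch` type, its fields, and `nextOffset` are hypothetical stand-ins that model only the offset arithmetic, assuming a batch that starts at offset 140 with a last offset delta of 5 and whose records at 144 and 145 were removed by compaction.

```go
package main

import "fmt"

// batch is a hypothetical, stripped-down stand-in for a Kafka record batch.
// It is not the kmsg type used in the diff above.
type batch struct {
	firstOffset     int64
	lastOffsetDelta int32
	recordOffsets   []int64 // offsets of the records that survived compaction
}

// nextOffset returns the offset to ask for after processing b, given that we
// asked for `asked`. With useLastOffsetDelta false, it advances only past
// records it actually kept (the behavior before the fix).
func nextOffset(asked int64, b batch, useLastOffsetDelta bool) int64 {
	lastOffset := b.firstOffset + int64(b.lastOffsetDelta)
	if lastOffset < asked {
		// The whole batch is before what we asked for; skip all work.
		return asked
	}

	next := asked
	for _, off := range b.recordOffsets {
		if off < next {
			continue // record is before the requested offset; discard it
		}
		next = off + 1
	}

	if useLastOffsetDelta {
		// Advance past the end of the batch even if its trailing
		// records were compacted away.
		if n := lastOffset + 1; next < n {
			next = n
		}
	}
	return next
}

func main() {
	// Batch covers offsets 140..145, but only 140..143 remain.
	b := batch{firstOffset: 140, lastOffsetDelta: 5, recordOffsets: []int64{140, 141, 142, 143}}

	fmt.Println(nextOffset(144, b, false)) // prints 144: no progress
	fmt.Println(nextOffset(144, b, true))  // prints 146: past the batch
}
```

Without the last offset delta, the consumer in this sketch would request 144 again and receive the same batch indefinitely; with it, the next request starts at 146.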