Skip to content

Commit

Permalink
kgo consumer: do not use the partition epoch when assigning offsets
Browse files Browse the repository at this point in the history
See comment; this is a little bit confusing to explain though.
  • Loading branch information
twmb committed Apr 4, 2023
1 parent c5d9afb commit 055b349
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,13 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
}

// If an exact offset is specified and we have loaded
// the partition, we use it. Without an epoch, if it is
// out of bounds, we just reset appropriately.
// the partition, we use it. We have to use epoch -1
// rather than the latest loaded epoch on the partition
// because the offset being requested to use could be
// from an epoch after OUR loaded epoch. Otherwise, we
// could update the metadata, see the later epoch,
// request the end offset for our prior epoch, and then
// think data loss occurred.
//
// If an offset is unspecified or we have not loaded
// the partition, we list offsets to find out what to
Expand All @@ -1012,7 +1017,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
cursor := part.cursor
cursor.setOffset(cursorOffset{
offset: offset.at,
lastConsumedEpoch: part.leaderEpoch,
lastConsumedEpoch: -1,
})
cursor.allowUsable()
c.usingCursors.use(cursor)
Expand Down

0 comments on commit 055b349

Please sign in to comment.