Skip to content

Commit

Permalink
kgo.EpochOffset: export Less, add docs
Browse files Browse the repository at this point in the history
Closes #192.
  • Loading branch information
twmb committed Aug 22, 2022
1 parent 2b38ec5 commit 5dd3321
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,11 +1637,23 @@ type uncommit struct {
// EpochOffset combines a record offset with the leader epoch the broker
// was at when the record was written.
type EpochOffset struct {
Epoch int32
// Epoch is the leader epoch of the record being committed. Truncation
// detection relies on the epoch of the CURRENT record. For truncation
// detection, the client asks "what is the the end of this epoch?",
// which returns one after the end offset (see the next field, and
// check the docs on kmsg.OffsetForLeaderEpochRequest).
Epoch int32

// Offset is the offset of a record. If committing, this should be one
// AFTER a record's offset. Clients start consuming at the offset that
// is committed.
Offset int64
}

func (e EpochOffset) less(o EpochOffset) bool {
// Less returns whether the this EpochOffset is less than another. This is less
// than the other if this one's epoch is less, or the epoch's are equal and
// this one's offset is less.
func (e EpochOffset) Less(o EpochOffset) bool {
return e.Epoch < o.Epoch || e.Epoch == o.Epoch && e.Offset < o.Offset
}

Expand Down Expand Up @@ -1876,7 +1888,7 @@ func (g *groupConsumer) updateCommitted(
//
// w.r.t. leader epoch's, we document that modifying
// leader epoch's is not recommended.
if uncommit.head.less(set) {
if uncommit.head.Less(set) {
uncommit.head = set
}

Expand Down Expand Up @@ -2205,7 +2217,7 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
if newHead := (EpochOffset{
r.LeaderEpoch,
r.Offset + 1,
}); current.head.less(newHead) {
}); current.head.Less(newHead) {
curPartitions[r.Partition] = uncommit{
dirty: current.dirty,
committed: current.committed,
Expand Down

0 comments on commit 5dd3321

Please sign in to comment.