diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index f1046e49..98273a58 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -138,6 +138,7 @@ type cfg struct {
 	isolationLevel int8
 	keepControl    bool
 	rack           string
+	preferLagFn    PreferLagFn
 
 	maxConcurrentFetches int
 	disableFetchSessions bool
@@ -1306,6 +1307,27 @@ func DisableFetchSessions() ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.disableFetchSessions = true }}
 }
 
+// ConsumePreferringLagFn allows you to re-order partitions before they are
+// fetched, given each partition's current lag.
+//
+// By default, the client rotates partitions fetched by one after every fetch
+// request. Kafka answers fetch requests in the order that partitions are
+// requested, filling the fetch response until FetchMaxBytes and
+// FetchMaxPartitionBytes are hit. All partitions eventually rotate to the
+// front, ensuring no partition is starved.
+//
+// With this option, you can return topic order and per-topic partition
+// ordering. These orders will sort to the front (first by topic, then by
+// partition). Any topic or partitions that you do not return are added to the
+// end, preserving their original ordering.
+//
+// For a simple lag preference that sorts the laggiest topics and partitions
+// first, use `kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50))` (or some other
+// similar lag number).
+func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
+	return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
+}
+
 //////////////////////////////////
 // CONSUMER GROUP CONFIGURATION //
 //////////////////////////////////
diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go
index 39069dce..19c6ca86 100644
--- a/pkg/kgo/group_test.go
+++ b/pkg/kgo/group_test.go
@@ -125,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
 			ConsumeTopics(c.consumeFrom),
 			Balancers(c.balancer),
 			MaxBufferedRecords(10000),
+			ConsumePreferringLagFn(PreferLagAt(10)),
 
 			// Even with autocommitting, autocommitting does not commit
 			// *the latest* when being revoked. We always want to commit
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 8cf68128..c3cf6648 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -5,6 +5,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"hash/crc32"
+	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -149,6 +150,10 @@ type cursorOffset struct {
 	// See kmsg.OffsetForLeaderEpochResponseTopicPartition for more
 	// details.
 	lastConsumedEpoch int32
+
+	// The current high watermark of the partition. Uninitialized (0) means
+	// we do not know the HWM, or there is no lag.
+	hwm int64
 }
 
 // use, for fetch requests, freezes a view of the cursorOffset.
@@ -172,6 +177,7 @@ func (c *cursor) unset() {
 	c.setOffset(cursorOffset{
 		offset:            -1,
 		lastConsumedEpoch: -1,
+		hwm:               0,
 	})
 }
 
@@ -395,6 +401,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
 			pCursor.from.setOffset(cursorOffset{
 				offset:            lastReturnedRecord.Offset + 1,
 				lastConsumedEpoch: lastReturnedRecord.LeaderEpoch,
+				hwm:               p.HighWatermark,
 			})
 		}
 
@@ -433,6 +440,7 @@ func (s *source) createReq() *fetchRequest {
 		maxPartBytes:   s.cl.cfg.maxPartBytes,
 		rack:           s.cl.cfg.rack,
 		isolationLevel: s.cl.cfg.isolationLevel,
+		preferLagFn:    s.cl.cfg.preferLagFn,
 
 		// We copy a view of the session for the request, which allows
 		// modify source while the request may be reading its copy.
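Below is a minimal usage sketch of the option added above (not part of the diff; the broker address and topic name are assumptions). It opts a consumer into lag-preferred fetch ordering with the stock PreferLagAt helper:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	// Prefer fetching topics/partitions lagging by 50 or more records;
	// everything else keeps the client's default rotation order.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		kgo.ConsumeTopics("events"),       // assumed topic
		kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50)),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	// Poll as usual; the reordering happens internally when fetch requests
	// are built (see source.go below).
}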
@@ -958,6 +966,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
 		LastStableOffset: rp.LastStableOffset,
 		LogStartOffset:   rp.LogStartOffset,
 	}
+	if rp.ErrorCode == 0 {
+		o.hwm = rp.HighWatermark
+	}
 
 	aborter := buildAborter(rp)
 
@@ -1530,6 +1541,7 @@ type fetchRequest struct {
 	rack           string
 	isolationLevel int8
+	preferLagFn    PreferLagFn
 
 	numOffsets  int
 	usedOffsets usedOffsets
@@ -1566,19 +1578,185 @@ func (f *fetchRequest) addCursor(c *cursor) {
 	f.numOffsets++
 }
 
-// If the end user prefers to consume lag, we
+// PreferLagFn accepts topic and partition lag, the previously determined topic
+// order, and the previously determined per-topic partition order, and returns
+// a new topic and per-topic partition order.
+//
+// Most use cases will not need to look at the prior orders, but they exist
+// if you want to get fancy.
+//
+// You can return partial results: if you only return topics, partitions within
+// each topic keep their prior ordering. If you only return some topics but not
+// all, the topics you do not return / the partitions you do not return will
+// retain their original ordering *after* your given ordering.
+type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32)
+
+// PreferLagAt is a simple PreferLagFn that orders the largest lag first, for
+// any topic that is collectively lagging by at least preferLagAt, and for any
+// partition that is lagging by at least preferLagAt.
+//
+// The function does not prescribe any ordering for topics that have the same
+// lag. It is recommended to use a number more than 0 or 1: if you use 0, you
+// may just always undo client ordering when there is no actual lag.
+func PreferLagAt(preferLagAt int64) PreferLagFn {
+	if preferLagAt < 0 {
+		return nil
+	}
+	return func(lag map[string]map[int32]int64, _ []string, _ map[string][]int32) ([]string, map[string][]int32) {
+		type plag struct {
+			p   int32
+			lag int64
+		}
+		type tlag struct {
+			t   string
+			lag int64
+			ps  []plag
+		}
+
+		// First, collect all partition lag into per-topic lag.
+		tlags := make(map[string]tlag, len(lag))
+		for t, ps := range lag {
+			for p, lag := range ps {
+				prior := tlags[t]
+				tlags[t] = tlag{
+					t:   t,
+					lag: prior.lag + lag,
+					ps:  append(prior.ps, plag{p, lag}),
+				}
+			}
+		}
+
+		// We now remove topics and partitions that are not lagging
+		// enough. Collectively, the topic could be lagging too much,
+		// but individually, no partition is lagging that much: we will
+		// sort the topic first and keep the old partition ordering.
+		for t, tlag := range tlags {
+			if tlag.lag < preferLagAt {
+				delete(tlags, t)
+				continue
+			}
+			for i := 0; i < len(tlag.ps); i++ {
+				plag := tlag.ps[i]
+				if plag.lag < preferLagAt {
+					tlag.ps[i] = tlag.ps[len(tlag.ps)-1]
+					tlag.ps = tlag.ps[:len(tlag.ps)-1]
+					i--
+				}
+			}
+			tlags[t] = tlag // tlag is a copy; store the filtered partitions back
+		}
+		if len(tlags) == 0 {
+			return nil, nil
+		}
+
+		var sortedLags []tlag
+		for _, tlag := range tlags {
+			sort.Slice(tlag.ps, func(i, j int) bool { return tlag.ps[i].lag > tlag.ps[j].lag })
+			sortedLags = append(sortedLags, tlag)
+		}
+		sort.Slice(sortedLags, func(i, j int) bool { return sortedLags[i].lag > sortedLags[j].lag })
+
+		// We now return our laggy topics and partitions, and let the
+		// caller add back any missing topics / partitions in their
+		// prior order.
+		torder := make([]string, 0, len(sortedLags))
+		for _, t := range sortedLags {
+			torder = append(torder, t.t)
+		}
+		porder := make(map[string][]int32, len(sortedLags))
+		for _, tlag := range sortedLags {
+			ps := make([]int32, 0, len(tlag.ps))
+			for _, p := range tlag.ps {
+				ps = append(ps, p.p)
+			}
+			porder[tlag.t] = ps
+		}
+		return torder, porder
+	}
+}
+
+// If the end user prefers to consume lag, we reorder our previously ordered
+// partitions, preferring first the laggiest topics, and then within those, the
+// laggiest partitions.
 func (f *fetchRequest) adjustPreferringLag() {
-	if f.preferLagAt < 0 {
+	if f.preferLagFn == nil {
 		return
 	}
+
+	tall := make(map[string]struct{}, len(f.torder))
+	for _, t := range f.torder {
+		tall[t] = struct{}{}
+	}
+	pall := make(map[string][]int32, len(f.porder))
+	for t, ps := range f.porder {
+		pall[t] = append([]int32(nil), ps...)
+	}
+
+	lag := make(map[string]map[int32]int64, len(f.torder))
 	for t, ps := range f.usedOffsets {
+		plag := make(map[int32]int64, len(ps))
+		lag[t] = plag
 		for p, c := range ps {
-			lag := c.hwm - c.offset
-			if lag < f.preferLagAt {
-				continue
+			hwm := c.hwm
+			if c.hwm < 0 {
+				hwm = 0
+			}
+			lag := hwm - c.offset
+			if c.offset <= 0 {
+				lag = hwm
+			}
+			if lag < 0 {
+				lag = 0 // the hwm can be unknown (0) or stale; do not report negative lag
+			}
+			plag[p] = lag
+		}
+	}
+
+	torder, porder := f.preferLagFn(lag, f.torder, f.porder)
+	if torder == nil && porder == nil {
+		return
+	}
+	if torder == nil {
+		torder = f.torder
+	}
+	if porder == nil {
+		porder = f.porder
+	}
+	defer func() { f.torder, f.porder = torder, porder }()
+
+	// Remove any extra topics the user returned that we were not
+	// consuming, and add all topics they did not give back.
+	for i := 0; i < len(torder); i++ {
+		t := torder[i]
+		if _, exists := tall[t]; !exists {
+			torder = append(torder[:i], torder[i+1:]...) // user gave topic we were not fetching
+			i--
+		}
+		delete(tall, t)
+	}
+	for t := range tall {
+		torder = append(torder, t) // user did not return topic we were fetching
+	}
+
+	// Now, the same thing for partitions: drop any returned partitions we
+	// are not fetching, and append any partitions the user did not return.
+	for t, ps := range pall {
+		order, exists := porder[t]
+		if !exists {
+			porder[t] = ps // shortcut: user did not return this topic's partition order, keep the old order
+			continue
+		}
+		pused := make(map[int32]struct{}, len(ps)) // per topic, so entries cannot leak across topics
+		for _, p := range ps {
+			pused[p] = struct{}{}
+		}
+		for i := 0; i < len(order); i++ {
+			p := order[i]
+			if _, exists := pused[p]; !exists {
+				order = append(order[:i], order[i+1:]...)
+				i--
 			}
-			_, _, _ = t, p, c
+			delete(pused, p)
+		}
+		for p := range pused {
+			order = append(order, p)
 		}
+		porder[t] = order
 	}
 }
 
@@ -1607,6 +1785,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
 		sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets))
 	}
 
+	f.adjustPreferringLag()
+
 	for _, topic := range f.torder {
 		partitions := f.usedOffsets[topic]
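The PreferLagFn type above permits partial results. As a sketch (hypothetical, not part of this change), a custom function could reorder only topics by their summed lag and return a nil partition order, leaving per-topic partition ordering to the client; it would be wired in with kgo.ConsumePreferringLagFn(prioritizeTopicLag) and needs the standard library sort import:

// prioritizeTopicLag is a hypothetical PreferLagFn: it sorts topics by summed
// partition lag, laggiest first, and keeps the prior partition order within
// each topic by returning a nil partition order.
func prioritizeTopicLag(
	lag map[string]map[int32]int64,
	_ []string,
	_ map[string][]int32,
) ([]string, map[string][]int32) {
	type topicLag struct {
		topic string
		lag   int64
	}
	totals := make([]topicLag, 0, len(lag))
	for t, ps := range lag {
		var sum int64
		for _, l := range ps {
			sum += l
		}
		totals = append(totals, topicLag{t, sum})
	}
	sort.Slice(totals, func(i, j int) bool { return totals[i].lag > totals[j].lag })
	order := make([]string, 0, len(totals))
	for _, tl := range totals {
		order = append(order, tl.topic)
	}
	return order, nil // nil porder: keep each topic's prior partition order
}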
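And a small sketch exercising PreferLagAt directly with a hand-built lag map (topic names and numbers are made up), mirroring what adjustPreferringLag computes: lag is hwm minus offset, or the full hwm when the cursor offset is not yet resolved. With the implementation above, the laggy topic and partition sort to the front, and everything below the threshold is left for the client to re-append:

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	fn := kgo.PreferLagAt(10)

	lag := map[string]map[int32]int64{
		"logs":    {0: 3, 1: 55}, // topic lag 58; only partition 1 meets the threshold
		"metrics": {0: 2, 1: 4},  // topic lag 6; below 10, dropped entirely
	}

	torder, porder := fn(lag, nil, nil)
	fmt.Println(torder) // [logs]
	fmt.Println(porder) // map[logs:[1]]
	// adjustPreferringLag then appends anything not returned ("metrics",
	// and partition 0 of "logs") after these preferred entries.
}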