kgo: add option to consume preferring laggy partitions
This adds three new APIs:

func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt
type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32)
func PreferLagAt(preferLagAt int64) PreferLagFn

These functions allow an end user to adjust the order of partitions that
are being fetched. Ideally, an end user will only need:

    kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50))

But, PreferLagFn exists to allow for more advanced use cases.
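
For context, a minimal sketch of wiring this option into a client (the
broker address and topic name are placeholders, not part of this commit):

    cl, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"),
        kgo.ConsumeTopics("events"),
        // Prefer fetching topics/partitions lagging by 50 or more.
        kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50)),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()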

Closes #222
twmb committed Nov 14, 2022
1 parent cf392a3 commit 76430a8
Showing 3 changed files with 209 additions and 6 deletions.
22 changes: 22 additions & 0 deletions pkg/kgo/config.go
@@ -138,6 +138,7 @@ type cfg struct {
isolationLevel int8
keepControl bool
rack string
preferLagFn PreferLagFn

maxConcurrentFetches int
disableFetchSessions bool
@@ -1306,6 +1307,27 @@ func DisableFetchSessions() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.disableFetchSessions = true }}
}

// ConsumePreferringLagFn allows you to re-order partitions before they are
// fetched, given each partition's current lag.
//
// By default, the client rotates the order of fetched partitions by one
// after every fetch request. Kafka answers fetch requests in the order that
// partitions are requested, filling the fetch response until FetchMaxBytes
// and FetchMaxPartitionBytes are hit. All partitions eventually rotate to
// the front, ensuring no partition is starved.
//
// With this option, you can return topic order and per-topic partition
// ordering. These orders will sort to the front (first by topic, then by
// partition). Any topic or partitions that you do not return are added to the
// end, preserving their original ordering.
//
// For a simple lag preference that sorts the laggiest topics and partitions
// first, use `kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50))` (or some other
// similar lag number).
func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
}
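
// To illustrate the default vs. preferred ordering (example numbers, not
// from this commit): with one topic's partitions [p0 p1 p2], the default
// rotation fetches [p0 p1 p2], then [p1 p2 p0], and so on. With
// ConsumePreferringLagFn(PreferLagAt(50)), any partition lagging by at
// least 50 sorts to the front instead, until its lag falls back under 50.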

//////////////////////////////////
// CONSUMER GROUP CONFIGURATION //
//////////////////////////////////
1 change: 1 addition & 0 deletions pkg/kgo/group_test.go
@@ -125,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
ConsumeTopics(c.consumeFrom),
Balancers(c.balancer),
MaxBufferedRecords(10000),
ConsumePreferringLagFn(PreferLagAt(10)),

// Even with autocommitting, autocommitting does not commit
// *the latest* when being revoked. We always want to commit
192 changes: 186 additions & 6 deletions pkg/kgo/source.go
@@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"hash/crc32"
"sort"
"sync"
"sync/atomic"
"time"
@@ -149,6 +150,10 @@ type cursorOffset struct {
// See kmsg.OffsetForLeaderEpochResponseTopicPartition for more
// details.
lastConsumedEpoch int32

// The current high watermark of the partition. Uninitialized (0) means
// we do not know the HWM, or there is no lag.
hwm int64
}

// use, for fetch requests, freezes a view of the cursorOffset.
@@ -172,6 +177,7 @@ func (c *cursor) unset() {
c.setOffset(cursorOffset{
offset: -1,
lastConsumedEpoch: -1,
hwm: 0,
})
}

@@ -395,6 +401,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
pCursor.from.setOffset(cursorOffset{
offset: lastReturnedRecord.Offset + 1,
lastConsumedEpoch: lastReturnedRecord.LeaderEpoch,
hwm: p.HighWatermark,
})
}

@@ -433,6 +440,7 @@ func (s *source) createReq() *fetchRequest {
maxPartBytes: s.cl.cfg.maxPartBytes,
rack: s.cl.cfg.rack,
isolationLevel: s.cl.cfg.isolationLevel,
preferLagFn: s.cl.cfg.preferLagFn,

// We copy a view of the session for the request, which allows
// modifying the source while the request may be reading its copy.
@@ -958,6 +966,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
LastStableOffset: rp.LastStableOffset,
LogStartOffset: rp.LogStartOffset,
}
if rp.ErrorCode == 0 {
o.hwm = rp.HighWatermark
}

aborter := buildAborter(rp)

@@ -1530,6 +1541,7 @@ type fetchRequest struct {
rack string

isolationLevel int8
preferLagFn PreferLagFn

numOffsets int
usedOffsets usedOffsets
@@ -1566,19 +1578,185 @@ func (f *fetchRequest) addCursor(c *cursor) {
f.numOffsets++
}

// PreferLagFn accepts topic and partition lag, the previously determined topic
// order, and the previously determined per-topic partition order, and returns
// a new topic order and per-topic partition order.
//
// Most use cases will not need to look at the prior orders, but they exist
// if you want to get fancy.
//
// You can return partial results: if you only return topics, partitions within
// each topic keep their prior ordering. If you only return some topics but not
// all, the topics you do not return / the partitions you do not return will
// retain their original ordering *after* your given ordering.
type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32)

// PreferLagAt is a simple PreferLagFn that orders the largest lag first: any
// topic that is collectively lagging by at least preferLagAt, and any
// partition that is lagging by at least preferLagAt.
//
// The function does not prescribe any ordering for topics or partitions that
// have the same lag. It is recommended to use a number larger than 0 or 1:
// if you use 0, you may just always undo the client's default ordering even
// when there is no actual lag.
func PreferLagAt(preferLagAt int64) PreferLagFn {
if preferLagAt < 0 {
return nil
}
return func(lag map[string]map[int32]int64, _ []string, _ map[string][]int32) ([]string, map[string][]int32) {
type plag struct {
p int32
lag int64
}
type tlag struct {
t string
lag int64
ps []plag
}

// First, collect all partition lag into per-topic lag.
tlags := make(map[string]tlag, len(lag))
for t, ps := range lag {
for p, lag := range ps {
prior := tlags[t]
tlags[t] = tlag{
t: t,
lag: prior.lag + lag,
ps: append(prior.ps, plag{p, lag}),
}
}
}

// We now remove topics and partitions that are not lagging
// enough. A topic could be lagging enough collectively even
// though no individual partition is lagging that much: we
// sort that topic first and keep its old partition ordering.
for t, tlag := range tlags {
if tlag.lag < preferLagAt {
delete(tlags, t)
continue
}
for i := 0; i < len(tlag.ps); i++ {
plag := tlag.ps[i]
if plag.lag < preferLagAt {
tlag.ps[i] = tlag.ps[len(tlag.ps)-1]
tlag.ps = tlag.ps[:len(tlag.ps)-1]
i--
}
}
}
if len(tlags) == 0 {
return nil, nil
}

var sortedLags []tlag
for _, tlag := range tlags {
sort.Slice(tlag.ps, func(i, j int) bool { return tlag.ps[i].lag > tlag.ps[j].lag })
sortedLags = append(sortedLags, tlag)
}
sort.Slice(sortedLags, func(i, j int) bool { return sortedLags[i].lag > sortedLags[j].lag })

// We now return our laggy topics and partitions, and let the
// caller add back any missing topics / partitions in their
// prior order.
torder := make([]string, 0, len(sortedLags))
for _, t := range sortedLags {
torder = append(torder, t.t)
}
porder := make(map[string][]int32, len(sortedLags))
for _, tlag := range sortedLags {
ps := make([]int32, 0, len(tlag.ps))
for _, p := range tlag.ps {
ps = append(ps, p.p)
}
porder[tlag.t] = ps
}
return torder, porder
}
}
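
// An illustrative custom PreferLagFn, not part of this commit: it always
// fetches the hypothetical "audit" topic first whenever it has any lag,
// relying on partial returns (topics and partitions not returned keep their
// prior order, after the returned ones).
func preferAuditFirst(lag map[string]map[int32]int64, _ []string, _ map[string][]int32) ([]string, map[string][]int32) {
	var total int64
	for _, l := range lag["audit"] {
		total += l
	}
	if total <= 0 {
		return nil, nil // no preference; keep the client's prior ordering
	}
	return []string{"audit"}, nil // "audit" sorts first; partition order is unchanged
}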

// If the end user prefers to consume lag, we reorder our previously ordered
// partitions, preferring first the laggiest topics, and then within those, the
// laggiest partitions.
func (f *fetchRequest) adjustPreferringLag() {
if f.preferLagFn == nil {
return
}

tall := make(map[string]struct{}, len(f.torder))
for _, t := range f.torder {
tall[t] = struct{}{}
}
pall := make(map[string][]int32, len(f.porder))
for t, ps := range f.porder {
pall[t] = append([]int32(nil), ps...)
}

lag := make(map[string]map[int32]int64, len(f.torder))
for t, ps := range f.usedOffsets {
plag := make(map[int32]int64, len(ps))
lag[t] = plag
for p, c := range ps {
hwm := c.hwm
if c.hwm < 0 {
hwm = 0 // clamp an unknown (negative) hwm so lag is never negative
}
lag := hwm - c.offset
if c.offset <= 0 {
lag = hwm // nothing consumed yet: the full hwm counts as lag
}
plag[p] = lag
}
}

torder, porder := f.preferLagFn(lag, f.torder, f.porder)
if torder == nil && porder == nil {
return
}
if torder == nil {
torder = f.torder
}
if porder == nil {
porder = f.porder
}
defer func() { f.torder, f.porder = torder, porder }()

// Remove any extra topics the user returned that we were not
// consuming, and add all topics they did not give back.
for i := 0; i < len(torder); i++ {
t := torder[i]
if _, exists := tall[t]; !exists {
torder = append(torder[:i], torder[i+1:]...) // user gave topic we were not fetching
i--
}
delete(tall, t)
}
for t := range tall {
torder = append(torder, t) // user did not return topic we were fetching
}

// Now, same thing for partitions.
pused := make(map[int32]struct{})
for t, ps := range pall {
order, exists := porder[t]
if !exists {
porder[t] = ps // shortcut: user did not return this topic's partition order, keep old order
continue
}
for _, p := range ps {
pused[p] = struct{}{}
}
for i := 0; i < len(order); i++ {
p := order[i]
if _, exists := pused[p]; !exists {
order = append(order[:i], order[i+1:]...)
i--
}
delete(pused, p)
}
for p := range pused {
order = append(order, p)
}
porder[t] = order
}
}
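
// Worked example of the reconciliation above (illustrative): if we are
// fetching topics [a b c] and the user's fn returns torder [c x], x is
// dropped (we are not fetching it) and the unreturned topics a and b are
// added back after c. The same reconciliation then runs per topic over the
// partition order.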

@@ -1607,6 +1785,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets))
}

f.adjustPreferringLag()

for _, topic := range f.torder {
partitions := f.usedOffsets[topic]
