From cf392a3b94ca6200eef7e495947ae2a08b87265d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 14 Nov 2022 09:01:52 -0700 Subject: [PATCH 1/2] kgo: bump FetchRequest to v13, add test to ensure we always track latest We were encoding fetch requests as v12, meaning we avoided topic UUID's. We now ensure we are always encoding the latest version and add a test for it. --- pkg/kgo/client_test.go | 11 +++++++++++ pkg/kgo/source.go | 18 +++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/client_test.go b/pkg/kgo/client_test.go index c1c43516..13c229cf 100644 --- a/pkg/kgo/client_test.go +++ b/pkg/kgo/client_test.go @@ -2,8 +2,19 @@ package kgo import ( "testing" + + "github.com/twmb/franz-go/pkg/kmsg" ) +func TestMaxVersions(t *testing.T) { + if ours, main := new(fetchRequest).MaxVersion(), new(kmsg.FetchRequest).MaxVersion(); ours != main { + t.Errorf("our fetch request max version %d != kmsg's %d", ours, main) + } + if ours, main := new(produceRequest).MaxVersion(), new(kmsg.ProduceRequest).MaxVersion(); ours != main { + t.Errorf("our produce request max version %d != kmsg's %d", ours, main) + } +} + func TestParseBrokerAddr(t *testing.T) { tests := []struct { name string diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 020753bc..8cf68128 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1566,8 +1566,24 @@ func (f *fetchRequest) addCursor(c *cursor) { f.numOffsets++ } +// If the end user prefers to consume lag, we +func (f *fetchRequest) adjustPreferringLag() { + if f.preferLagAt < 0 { + return + } + for t, ps := range f.usedOffsets { + for p, c := range ps { + lag := c.hwm - c.offset + if lag < f.preferLagAt { + continue + } + _, _, _ = t, p, c + } + } +} + func (*fetchRequest) Key() int16 { return 1 } -func (*fetchRequest) MaxVersion() int16 { return 12 } +func (*fetchRequest) MaxVersion() int16 { return 13 } func (f *fetchRequest) SetVersion(v int16) { f.version = v } func (f *fetchRequest) GetVersion() int16 { return f.version } func (f *fetchRequest) IsFlexible() bool { return f.version >= 12 } // version 12+ is flexible From 76430a8512f27bf2c30a32004da0f2aec1b34e60 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 14 Nov 2022 13:58:33 -0700 Subject: [PATCH 2/2] kgo: add option to consume preferring laggy partitions This adds three new APIs: func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32) func PreferLagAt(preferLagAt int64) PreferLagFn These functions allow an end user to adjust the order of partitions that are being fetched. Ideally, an end user will only need: kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50)) But, PreferLagFn exists to allow for more advanced use cases. Closes #222 --- pkg/kgo/config.go | 22 +++++ pkg/kgo/group_test.go | 1 + pkg/kgo/source.go | 192 ++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 209 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index f1046e49..98273a58 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -138,6 +138,7 @@ type cfg struct { isolationLevel int8 keepControl bool rack string + preferLagFn PreferLagFn maxConcurrentFetches int disableFetchSessions bool @@ -1306,6 +1307,27 @@ func DisableFetchSessions() ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.disableFetchSessions = true }} } +// ConsumePreferringLagFn allows you to re-order partitions before they are +// fetched, given each partition's current lag. +// +// By default, the client rotates partitions fetched by one after every fetch +// request. Kafka answers fetch requests in the order that partitions are +// requested, filling the fetch response until FetchMaxBytes and +// FetchMaxPartitionBytes are hit. All partitions eventually rotate to the +// front, ensuring no partition is starved. +// +// With this option, you can return topic order and per-topic partition +// ordering. These orders will sort to the front (first by topic, then by +// partition). Any topic or partitions that you do not return are added to the +// end, preserving their original ordering. +// +// For a simple lag preference that sorts the laggiest topics and partitions +// first, use `kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50))` (or some other +// similar lag number). +func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt { + return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }} +} + ////////////////////////////////// // CONSUMER GROUP CONFIGURATION // ////////////////////////////////// diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 39069dce..19c6ca86 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -125,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { ConsumeTopics(c.consumeFrom), Balancers(c.balancer), MaxBufferedRecords(10000), + ConsumePreferringLagFn(PreferLagAt(10)), // Even with autocommitting, autocommitting does not commit // *the latest* when being revoked. We always want to commit diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 8cf68128..c3cf6648 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "hash/crc32" + "sort" "sync" "sync/atomic" "time" @@ -149,6 +150,10 @@ type cursorOffset struct { // See kmsg.OffsetForLeaderEpochResponseTopicPartition for more // details. lastConsumedEpoch int32 + + // The current high watermark of the partition. Uninitialized (0) means + // we do not know the HWM, or there is no lag. + hwm int64 } // use, for fetch requests, freezes a view of the cursorOffset. @@ -172,6 +177,7 @@ func (c *cursor) unset() { c.setOffset(cursorOffset{ offset: -1, lastConsumedEpoch: -1, + hwm: 0, }) } @@ -395,6 +401,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) { pCursor.from.setOffset(cursorOffset{ offset: lastReturnedRecord.Offset + 1, lastConsumedEpoch: lastReturnedRecord.LeaderEpoch, + hwm: p.HighWatermark, }) } @@ -433,6 +440,7 @@ func (s *source) createReq() *fetchRequest { maxPartBytes: s.cl.cfg.maxPartBytes, rack: s.cl.cfg.rack, isolationLevel: s.cl.cfg.isolationLevel, + preferLagFn: s.cl.cfg.preferLagFn, // We copy a view of the session for the request, which allows // modify source while the request may be reading its copy. @@ -958,6 +966,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon LastStableOffset: rp.LastStableOffset, LogStartOffset: rp.LogStartOffset, } + if rp.ErrorCode == 0 { + o.hwm = rp.HighWatermark + } aborter := buildAborter(rp) @@ -1530,6 +1541,7 @@ type fetchRequest struct { rack string isolationLevel int8 + preferLagFn PreferLagFn numOffsets int usedOffsets usedOffsets @@ -1566,19 +1578,185 @@ func (f *fetchRequest) addCursor(c *cursor) { f.numOffsets++ } -// If the end user prefers to consume lag, we +// PreferLagFn accepts topic and partition lag, the previously determined topic +// order, and the previously determined per-topic partition order, and returns +// a new topic and per-topic partition order. +// +// Most use cases will not need to look at the prior orders, but they exist +// if you want to get fancy. +// +// You can return partial results: if you only return topics, partitions within +// each topic keep their prior ordering. If you only return some topics but not +// all, the topics you do not return / the partitions you do not return will +// retain their original ordering *after* your given ordering. +type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32) + +// PreferLagAt is a simple PreferLagFn that orders the largest lag first, for +// any topic that is collectively lagging more than preferLagAt, and for any +// partition that is lagging more than preferLagAt. +// +// The function does not prescribe any ordering for topics that have the same +// lag. It is recommended to use a number more than 0 or 1: if you use 0, you +// may just always undo client ordering when there is no actual lag. +func PreferLagAt(preferLagAt int64) PreferLagFn { + if preferLagAt < 0 { + return nil + } + return func(lag map[string]map[int32]int64, _ []string, _ map[string][]int32) ([]string, map[string][]int32) { + type plag struct { + p int32 + lag int64 + } + type tlag struct { + t string + lag int64 + ps []plag + } + + // First, collect all partition lag into per-topic lag. + tlags := make(map[string]tlag, len(lag)) + for t, ps := range lag { + for p, lag := range ps { + prior := tlags[t] + tlags[t] = tlag{ + t: t, + lag: prior.lag + lag, + ps: append(prior.ps, plag{p, lag}), + } + } + } + + // We now remove topics and partitions that are not lagging + // enough. Collectively, the topic could be lagging too much, + // but individually, no partition is lagging that much: we will + // sort the topic first and keep the old partition ordering. + for t, tlag := range tlags { + if tlag.lag < preferLagAt { + delete(tlags, t) + continue + } + for i := 0; i < len(tlag.ps); i++ { + plag := tlag.ps[i] + if plag.lag < preferLagAt { + tlag.ps[i] = tlag.ps[len(tlag.ps)-1] + tlag.ps = tlag.ps[:len(tlag.ps)-1] + i-- + } + } + } + if len(tlags) == 0 { + return nil, nil + } + + var sortedLags []tlag + for _, tlag := range tlags { + sort.Slice(tlag.ps, func(i, j int) bool { return tlag.ps[i].lag > tlag.ps[j].lag }) + sortedLags = append(sortedLags, tlag) + } + sort.Slice(sortedLags, func(i, j int) bool { return sortedLags[i].lag > sortedLags[j].lag }) + + // We now return our laggy topics and partitions, and let the + // caller add back any missing topics / partitions in their + // prior order. + torder := make([]string, 0, len(sortedLags)) + for _, t := range sortedLags { + torder = append(torder, t.t) + } + porder := make(map[string][]int32, len(sortedLags)) + for _, tlag := range sortedLags { + ps := make([]int32, 0, len(tlag.ps)) + for _, p := range tlag.ps { + ps = append(ps, p.p) + } + porder[tlag.t] = ps + } + return torder, porder + } +} + +// If the end user prefers to consume lag, we reorder our previously ordered +// partitions, preferring first the laggiest topics, and then within those, the +// laggiest partitions. func (f *fetchRequest) adjustPreferringLag() { - if f.preferLagAt < 0 { + if f.preferLagFn == nil { return } + + tall := make(map[string]struct{}, len(f.torder)) + for _, t := range f.torder { + tall[t] = struct{}{} + } + pall := make(map[string][]int32, len(f.porder)) + for t, ps := range f.porder { + pall[t] = append([]int32(nil), ps...) + } + + lag := make(map[string]map[int32]int64, len(f.torder)) for t, ps := range f.usedOffsets { + plag := make(map[int32]int64, len(ps)) + lag[t] = plag for p, c := range ps { - lag := c.hwm - c.offset - if lag < f.preferLagAt { - continue + hwm := c.hwm + if c.hwm < 0 { + hwm = 0 + } + lag := hwm - c.offset + if c.offset <= 0 { + lag = hwm + } + plag[p] = lag + } + } + + torder, porder := f.preferLagFn(lag, f.torder, f.porder) + if torder == nil && porder == nil { + return + } + if torder == nil { + torder = f.torder + } + if porder == nil { + porder = f.porder + } + defer func() { f.torder, f.porder = torder, porder }() + + // Remove any extra topics the user returned that we were not + // consuming, and add all topics they did not give back. + for i := 0; i < len(torder); i++ { + t := torder[i] + if _, exists := tall[t]; !exists { + torder = append(torder[:i], torder[i+1:]...) // user gave topic we were not fetching + i-- + } + delete(tall, t) + } + for t := range tall { + torder = append(torder, t) // user did not return topic we were fetching + } + + // Now, same thing for partitions. + pused := make(map[int32]struct{}) + for t, ps := range pall { + order, exists := porder[t] + if !exists { + porder[t] = ps // shortcut: user did not define this partition's oorder, keep old order + continue + } + for _, p := range ps { + pused[p] = struct{}{} + } + for i := 0; i < len(order); i++ { + p := order[i] + if _, exists := pused[p]; !exists { + order = append(order[:i], order[i+1:]...) + i-- } - _, _, _ = t, p, c + delete(pused, p) + } + for p := range pused { + order = append(order, p) } + porder[t] = order } } @@ -1607,6 +1785,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets)) } + f.adjustPreferringLag() + for _, topic := range f.torder { partitions := f.usedOffsets[topic]