pkg/kgo: bugfix ConsumePreferringLagFn
adjustPreferringLag adds back topics and partitions that the user did
not specify. For partitions specifically, we reuse a partition map for
all topics rather than create potentially dozens of partition maps.

Previously, the code did not clear out the partition map entirely before
moving on to the next topic. If a second topic had fewer partitions than
a first topic, then we would add back fake partitions into our fetch,
and when we went to build a fetch request, we would panic because the
consumer offset pointer to use for the fake partition didn't exist.

We now properly clear the map entirely between topics.
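
To illustrate the bug pattern in isolation, here is a minimal standalone
Go sketch (made-up topic names and data, not the actual
adjustPreferringLag code). The scratch map pused is reused across topics,
and the add-back loop must drain it as it goes:

package main

import "fmt"

func main() {
	// Topic "b" has fewer partitions than topic "a".
	pall := map[string][]int32{"a": {0, 1, 2}, "b": {0}}
	// The user's returned order kept only one partition of "a".
	porder := map[string][]int32{"a": {2}, "b": {0}}

	pused := make(map[int32]struct{}) // scratch map reused across topics
	for _, t := range []string{"a", "b"} {
		for _, p := range pall[t] {
			pused[p] = struct{}{}
		}
		order := porder[t]
		for _, p := range order {
			delete(pused, p) // drop partitions the user kept
		}
		for p := range pused {
			order = append(order, p) // add back unreturned partitions
			delete(pused, p)         // without this delete, partitions 0 and 1
			// from "a" linger and become fake partitions of "b" next iteration
		}
		fmt.Println(t, order)
	}
}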

Additionally, this fixes a bit of the previous behavior w.r.t. ordering:
we documented that any unchanged ordering was preserved -- i.e. if you
only wanted one partition at the front, the prior ordering for the
remaining partitions was unchanged. The implementation did not honor
this -- we iterated over maps to add back topics & partitions the user
did not specify, and map iteration is unordered. We now iterate over the
original input, check whether each element still exists in the map, and
if so, add it back. This makes adjusting slower, but _hopefully_ the
impact is not noticeably worse.
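
The order-preserving add-back reduces to iterating the original slice and
keeping only elements still present in the "not returned" set, rather than
ranging over the set itself. A sketch of the approach (a hypothetical
helper, not the library's code):

// orderedAddBack appends the elements of original that remain in missing,
// preserving original's order. Ranging over the missing map instead would
// append them in a random order, which is what the old code did.
func orderedAddBack(order, original []string, missing map[string]struct{}) []string {
	for _, t := range original {
		if _, ok := missing[t]; ok {
			order = append(order, t)
			delete(missing, t)
		}
	}
	return order
}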

The integration tests are improved slightly, but I've been unable to
trigger this bug through them -- instead, I recreated it locally:

* One broker
* Create two topics, one with one partition, the other with two
* Produce 5mil messages to each topic
* Group consume to the end so there is no lag
* Seek the one-partition topic's partition to before the end; seek one
  partition of the two-partition topic to the beginning and the other to
  the end
* Recreate the consumer

The above will panic on the last step without this commit, and will not
panic with it.
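
For reference, a sketch of the client setup such a reproduction would
use -- the broker address, topic, and group names here are hypothetical,
but the options are kgo's real ones:

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("repro-group"),
		kgo.ConsumeTopics("one-part-topic", "two-part-topic"),
		// Order partitions lagging by at least 1 first, largest lag first.
		kgo.ConsumePreferringLagFn(kgo.PreferLagAt(1)),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Polling builds fetch requests, which is where the panic occurred.
	fetches := cl.PollFetches(context.Background())
	fetches.EachRecord(func(r *kgo.Record) { _ = r })
}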

Closes #310.
twmb committed Jan 18, 2023
1 parent bfcfa08 commit 38f2ec6
Showing 4 changed files with 48 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/kgo/group_test.go
@@ -124,7 +124,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
 		ConsumeTopics(c.consumeFrom),
 		Balancers(c.balancer),
 		MaxBufferedRecords(10000),
-		ConsumePreferringLagFn(PreferLagAt(10)),
+		ConsumePreferringLagFn(PreferLagAt(1)),
 
 		// Even with autocommitting, autocommitting does not commit
 		// *the latest* when being revoked. We always want to commit
10 changes: 9 additions & 1 deletion pkg/kgo/helpers_test.go
@@ -12,6 +12,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -34,6 +35,12 @@ var (
 	// EndTxn to return successfully before beginning a new transaction. We
 	// cannot use EndAndBeginTransaction with EndBeginTxnUnsafe.
 	allowUnsafe = false
+
+	// We create topics with a different number of partitions to exercise
+	// a few extra code paths; we index into npartitions with npartitionsAt,
+	// an atomic that we modulo after load.
+	npartitions   = []int{7, 11, 31}
+	npartitionsAt int64
 )
 
 func init() {
@@ -110,10 +117,11 @@ func tmpTopic(tb testing.TB) (string, func()) {
 
 	topic := randsha()
 
+	partitions := npartitions[int(atomic.AddInt64(&npartitionsAt, 1))%len(npartitions)]
 	req := kmsg.NewPtrCreateTopicsRequest()
 	reqTopic := kmsg.NewCreateTopicsRequestTopic()
 	reqTopic.Topic = topic
-	reqTopic.NumPartitions = 20
+	reqTopic.NumPartitions = int32(partitions)
 	reqTopic.ReplicationFactor = int16(testrf)
 	req.Topics = append(req.Topics, reqTopic)
 
4 changes: 2 additions & 2 deletions pkg/kgo/sink.go
@@ -536,9 +536,9 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
 		isRetriableBrokerErr(err):
 		updateMeta := !isRetriableBrokerErr(err)
 		if updateMeta {
-			s.cl.cfg.logger.Log(LogLevelInfo, "produce request failed triggering metadata update", "broker", logID(s.nodeID), "err", err)
+			s.cl.cfg.logger.Log(LogLevelInfo, "produce request failed, triggering metadata update", "broker", logID(s.nodeID), "err", err)
 		}
-		s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false, "failed produce request triggering metadata update")
+		s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false, "failed produce request triggered metadata update")
 
 	case errors.Is(err, ErrClientClosed):
 		s.cl.failBufferedRecords(ErrClientClosed)
57 changes: 36 additions & 21 deletions pkg/kgo/source.go
@@ -1582,13 +1582,17 @@ func (f *fetchRequest) addCursor(c *cursor) {
 // order, and the previously determined per-topic partition order, and returns
 // a new topic and per-topic partition order.
 //
-// Most use cases will not need to look at the prior orders, but they exist
-// if you want to get fancy.
+// Most use cases will not need to look at the prior orders, but they exist if
+// you want to get fancy.
 //
 // You can return partial results: if you only return topics, partitions within
 // each topic keep their prior ordering. If you only return some topics but not
 // all, the topics you do not return / the partitions you do not return will
 // retain their original ordering *after* your given ordering.
+//
+// NOTE: torderPrior and porderPrior must not be modified. To avoid a bit of
+// unnecessary allocations, these arguments are views into data that is used to
+// build a fetch request.
 type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32)
 
 // PreferLagAt is a simple PreferLagFn that orders the largest lag first, for
@@ -1704,6 +1708,9 @@ func (f *fetchRequest) adjustPreferringLag() {
 			if c.offset <= 0 {
 				lag = hwm
 			}
+			if lag < 0 {
+				lag = 0
+			}
 			plag[p] = lag
 		}
 	}
@@ -1712,29 +1719,34 @@ func (f *fetchRequest) adjustPreferringLag() {
 	if torder == nil && porder == nil {
 		return
 	}
-	if torder == nil {
-		torder = f.torder
-	}
-	if porder == nil {
-		porder = f.porder
-	}
 	defer func() { f.torder, f.porder = torder, porder }()
 
-	// Remove any extra topics the user returned that we were not
-	// consuming, and add all topics they did not give back.
-	for i := 0; i < len(torder); i++ {
-		t := torder[i]
-		if _, exists := tall[t]; !exists {
-			torder = append(torder[:i], torder[i+1:]...) // user gave topic we were not fetching
-			i--
-		}
-		delete(tall, t)
+	if len(torder) == 0 {
+		torder = f.torder // user did not modify topic order, keep old order
+	} else {
+		// Remove any extra topics the user returned that we were not
+		// consuming, and add all topics they did not give back.
+		for i := 0; i < len(torder); i++ {
+			t := torder[i]
+			if _, exists := tall[t]; !exists {
+				torder = append(torder[:i], torder[i+1:]...) // user gave topic we were not fetching
+				i--
+			}
+			delete(tall, t)
+		}
+		for _, t := range f.torder {
+			if _, exists := tall[t]; exists {
+				torder = append(torder, t) // user did not return topic we were fetching
+				delete(tall, t)
+			}
+		}
 	}
-	for t := range tall {
-		torder = append(torder, t) // user did not return topic we were fetching
+	if len(porder) == 0 {
+		porder = f.porder // user did not modify partition order, keep old order
+		return
 	}
 
 	// Now, same thing for partitions.
 	pused := make(map[int32]struct{})
 	for t, ps := range pall {
 		order, exists := porder[t]
@@ -1753,8 +1765,11 @@
 			}
 			delete(pused, p)
 		}
-		for p := range pused {
-			order = append(order, p)
+		for _, p := range f.porder[t] {
+			if _, exists := pused[p]; exists {
+				order = append(order, p)
+				delete(pused, p)
+			}
 		}
 		porder[t] = order
 	}
Expand Down
