kgo: fix new niche problem against Kafka 3.5
Kafka 3.5 seems to have a problem of
(a) creating a topic
(b) replying to a client with the current topic information
(c) migrating partition leadership immediately to another broker
(d) having that other broker reply with unknown partition for a brief
window

For regex consuming, this meant the client was purging the topic because
it would first see the topic, then not see the topic.

We now allow a 15s window of retries for regex consuming in which a
topic can be missing before we purge it. A previous attempt at this
patch used 3 retries, but that also did not seem to work.

Updates #434.
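For context, "regex consuming" here means the client discovers the topics to consume by matching metadata against patterns rather than using a fixed topic list, so a topic that briefly disappears from metadata used to be purged. A minimal consumer exercising that path might look like the sketch below (a sketch only, assuming franz-go's current option names; the seed broker address and topic pattern are placeholders):

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// ConsumeRegex tells the client to treat the topics passed to
	// ConsumeTopics as regular expressions, so the set of consumed topics
	// is driven entirely by what metadata responses return.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("^events[.].*"), // placeholder pattern
		kgo.ConsumeRegex(),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Polling follows the client's metadata view; if metadata briefly
	// omits a known topic, the topic used to be purged immediately,
	// which is what this commit guards against.
	fetches := cl.PollFetches(context.Background())
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Println(r.Topic, string(r.Value))
	})
}
```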
twmb committed Jul 8, 2023
1 parent acb925c commit 0df3ec0
Showing 3 changed files with 55 additions and 13 deletions.
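Before the diffs, the shape of the fix distilled into a standalone sketch: record when a topic's metadata was first stored, and only purge a topic missing from the latest metadata once it has been absent for longer than a grace window, retrying in the meantime. The names shouldPurge and firstSeen below are illustrative, not franz-go API; the real change stores a unix "when" timestamp on the client's per-topic data and consults it before purging.

```go
package main

import (
	"fmt"
	"time"
)

// maxMissTime mirrors the 15s grace window this commit introduces: a topic
// the client has already seen is only purged once it has been missing from
// metadata responses for longer than this window.
const maxMissTime = 15 * time.Second

// shouldPurge reports whether a previously seen topic that is absent from
// the latest metadata response should be purged now or merely retried.
// firstSeen is when the topic's metadata was first stored; a zero value
// means the topic never fully loaded, so we keep retrying.
func shouldPurge(firstSeen, now time.Time) bool {
	if firstSeen.IsZero() {
		return false
	}
	return now.Sub(firstSeen) > maxMissTime
}

func main() {
	now := time.Now()
	fmt.Println(shouldPurge(now.Add(-3*time.Second), now))  // false: still inside the window, retry
	fmt.Println(shouldPurge(now.Add(-20*time.Second), now)) // true: missing past the window, purge
}
```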
4 changes: 2 additions & 2 deletions pkg/kgo/consumer_direct_test.go
@@ -181,8 +181,8 @@ func TestIssue434(t *testing.T) {
 	start := time.Now()
 	var missingTopic int
 	for missingTopic < 2 {
-		if time.Since(start) > 2*time.Second {
-			t.Fatal("still seeing topic after 2s")
+		if time.Since(start) > 30*time.Second {
+			t.Fatal("still seeing topic after 30s")
 		}
 
 		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
62 changes: 51 additions & 11 deletions pkg/kgo/metadata.go
@@ -287,6 +287,8 @@ loop:
 	}
 }
 
+var errMissingTopic = errors.New("topic_missing")
+
 // Updates all producer and consumer partition data, returning whether a new
 // update needs scheduling or if an error occurred.
 //
@@ -339,6 +341,8 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 	}
 	groupExternal.updateLatest(latest)
 
+	const maxMissTime = 15 * time.Second
+
 	// If we are consuming with regex and fetched all topics, the metadata
 	// may have returned topics the consumer is not yet tracking. We ensure
 	// that we will store the topics at the end of our metadata update.
@@ -352,12 +356,20 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 		defer tpsConsumer.storeData(tpsConsumerLoad)
 
 		// For regex consuming, if a topic is not returned in the
-		// response that we were previously consuming, we assume the
-		// topic has been deleted and purge it.
+		// response and for at least maxMissTime from when we first
+		// discovered it, we assume the topic has been deleted and
+		// purge it. We allow for maxMissTime because (in testing
+		// locally) Kafka can originally broadcast a newly created
+		// topic exists and then fail to broadcast that info again for
+		// a while.
 		var purgeTopics []string
-		for topic := range tpsConsumerLoad {
+		for topic, tps := range tpsConsumerLoad {
 			if _, ok := latest[topic]; !ok {
-				purgeTopics = append(purgeTopics, topic)
+				if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > maxMissTime {
+					purgeTopics = append(purgeTopics, td.topic)
+				} else {
+					retryWhy.add(topic, -1, errMissingTopic)
+				}
 			}
 		}
 		if len(purgeTopics) > 0 {
@@ -395,7 +407,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 		}
 	}()
 
-	var missingProduceTopics []string
+	var missingProduceTopics []*topicPartitions
 	for _, m := range []struct {
 		priors    map[string]*topicPartitions
 		isProduce bool
@@ -407,7 +419,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 			newParts, exists := latest[topic]
 			if !exists {
 				if m.isProduce {
-					missingProduceTopics = append(missingProduceTopics, topic)
+					missingProduceTopics = append(missingProduceTopics, priorParts)
 				}
 				continue
 			}
@@ -422,12 +434,33 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 			)
 		}
 	}
+
+	// For all produce topics that were missing, we want to bump their
+	// retries to note that a failure happened. However, if we are regex
+	// consuming, then it is possible in a rare scenario for the broker to
+	// not return a topic that actually does exist and that we previously
+	// received a metadata response for. This is handled above for
+	// consuming; we now handle it the same way for producing.
 	if len(missingProduceTopics) > 0 {
-		cl.bumpMetadataFailForTopics(
-			tpsProducerLoad,
-			errors.New("metadata request did not return this topic"),
-			missingProduceTopics...,
-		)
+		var bumpFail []string
+		for _, tps := range missingProduceTopics {
+			if all {
+				if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > maxMissTime {
+					bumpFail = append(bumpFail, td.topic)
+				} else {
+					retryWhy.add(td.topic, -1, errMissingTopic)
+				}
+			} else {
+				bumpFail = append(bumpFail, tps.load().topic)
+			}
+		}
+		if len(bumpFail) > 0 {
+			cl.bumpMetadataFailForTopics(
+				tpsProducerLoad,
+				fmt.Errorf("metadata request did not return topics: %v", bumpFail),
+				bumpFail...,
+			)
+		}
 	}
 
 	return retryWhy, nil
@@ -444,6 +477,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 type metadataTopic struct {
 	loadErr    error
 	isInternal bool
+	topic      string
 	partitions []metadataPartition
 }
 
@@ -454,6 +488,8 @@ func (mt *metadataTopic) newPartitions(cl *Client, isProduce bool) *topicPartitions {
 		isInternal:         mt.isInternal,
 		partitions:         make([]*topicPartition, 0, n),
 		writablePartitions: make([]*topicPartition, 0, n),
+		topic:              mt.topic,
+		when:               time.Now().Unix(),
 	}
 	for i := range mt.partitions {
 		p := mt.partitions[i].newPartition(cl, isProduce)
@@ -539,6 +575,7 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*metadataTopic, error) {
 		mt := &metadataTopic{
 			loadErr:    kerr.ErrorForCode(topicMeta.ErrorCode),
 			isInternal: topicMeta.IsInternal,
+			topic:      topic,
 			partitions: make([]metadataPartition, 0, len(topicMeta.Partitions)),
 		}
 
@@ -646,6 +683,9 @@ func (cl *Client) mergeTopicPartitions(
 
 	lv.loadErr = r.loadErr
 	lv.isInternal = r.isInternal
+	if lv.when == 0 {
+		lv.when = r.when
+	}
 
 	// If the load had an error for the entire topic, we set the load error
 	// but keep our stale partition information. For anything being
2 changes: 2 additions & 0 deletions pkg/kgo/topics_and_partitions.go
@@ -457,6 +457,8 @@ type topicPartitionsData struct {
 	isInternal         bool
 	partitions         []*topicPartition // partition num => partition
 	writablePartitions []*topicPartition // subset of above
+	topic              string
+	when               int64
 }
 
 // topicPartition contains all information from Kafka for a topic's partition,
