From 0df3ec06e2af90bd46f117f3fcf1fbf396fd60e2 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Fri, 7 Jul 2023 21:53:16 -0600
Subject: [PATCH] kgo: fix new niche problem against Kafka 3.5

Kafka 3.5 seems to have a problem where it (a) creates a topic, (b)
replies to a client with the current topic information, (c) immediately
migrates partition leadership to another broker, and (d) has that other
broker reply with unknown partition for a brief window.

For regex consuming, this meant the client was purging the topic: it
would first see the topic, then not see it. We now allow a 15s window
of retries for regex consuming during which a topic can be missing
before it is purged. A previous attempt at this patch allowed 3
retries, but that also did not appear to be enough.

Updates #434.
---
 pkg/kgo/consumer_direct_test.go  |  4 +--
 pkg/kgo/metadata.go              | 62 ++++++++++++++++++++++++++------
 pkg/kgo/topics_and_partitions.go |  2 ++
 3 files changed, 55 insertions(+), 13 deletions(-)

diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go
index 72f78625..2940b1a1 100644
--- a/pkg/kgo/consumer_direct_test.go
+++ b/pkg/kgo/consumer_direct_test.go
@@ -181,8 +181,8 @@ func TestIssue434(t *testing.T) {
 	start := time.Now()
 	var missingTopic int
 	for missingTopic < 2 {
-		if time.Since(start) > 2*time.Second {
-			t.Fatal("still seeing topic after 2s")
+		if time.Since(start) > 30*time.Second {
+			t.Fatal("still seeing topic after 30s")
 		}
 
 		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go
index 29042eb1..6007b097 100644
--- a/pkg/kgo/metadata.go
+++ b/pkg/kgo/metadata.go
@@ -287,6 +287,8 @@ loop:
 	}
 }
 
+var errMissingTopic = errors.New("topic_missing")
+
 // Updates all producer and consumer partition data, returning whether a new
 // update needs scheduling or if an error occurred.
 //
@@ -339,6 +341,8 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 	}
 	groupExternal.updateLatest(latest)
 
+	const maxMissTime = 15 * time.Second
+
 	// If we are consuming with regex and fetched all topics, the metadata
 	// may have returned topics the consumer is not yet tracking. We ensure
 	// that we will store the topics at the end of our metadata update.
@@ -352,12 +356,20 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 		defer tpsConsumer.storeData(tpsConsumerLoad)
 
 		// For regex consuming, if a topic is not returned in the
-		// response that we were previously consuming, we assume the
-		// topic has been deleted and purge it.
+		// response and for at least maxMissTime from when we first
+		// discovered it, we assume the topic has been deleted and
+		// purge it. We allow maxMissTime because (in testing
+		// locally) Kafka can initially broadcast that a newly
+		// created topic exists and then fail to broadcast that
+		// info again for a while.
 		var purgeTopics []string
-		for topic := range tpsConsumerLoad {
+		for topic, tps := range tpsConsumerLoad {
 			if _, ok := latest[topic]; !ok {
-				purgeTopics = append(purgeTopics, topic)
+				if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > maxMissTime {
+					purgeTopics = append(purgeTopics, td.topic)
+				} else {
+					retryWhy.add(topic, -1, errMissingTopic)
+				}
 			}
 		}
 		if len(purgeTopics) > 0 {
@@ -395,7 +407,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 		}
 	}()
 
-	var missingProduceTopics []string
+	var missingProduceTopics []*topicPartitions
 	for _, m := range []struct {
 		priors    map[string]*topicPartitions
 		isProduce bool
@@ -407,7 +419,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 			newParts, exists := latest[topic]
 			if !exists {
 				if m.isProduce {
-					missingProduceTopics = append(missingProduceTopics, topic)
+					missingProduceTopics = append(missingProduceTopics, priorParts)
 				}
 				continue
 			}
@@ -422,12 +434,33 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 			)
 		}
 	}
+
+	// For all produce topics that were missing, we want to bump their
+	// retries to note that a failure happened. However, if we are regex
+	// consuming, then it is possible in a rare scenario for the broker to
+	// not return a topic that actually does exist and that we previously
+	// received a metadata response for. This is handled above for
+	// consuming; we now handle it the same way for producing.
 	if len(missingProduceTopics) > 0 {
-		cl.bumpMetadataFailForTopics(
-			tpsProducerLoad,
-			errors.New("metadata request did not return this topic"),
-			missingProduceTopics...,
-		)
+		var bumpFail []string
+		for _, tps := range missingProduceTopics {
+			if all {
+				if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > maxMissTime {
+					bumpFail = append(bumpFail, td.topic)
+				} else {
+					retryWhy.add(td.topic, -1, errMissingTopic)
+				}
+			} else {
+				bumpFail = append(bumpFail, tps.load().topic)
+			}
+		}
+		if len(bumpFail) > 0 {
+			cl.bumpMetadataFailForTopics(
+				tpsProducerLoad,
+				fmt.Errorf("metadata request did not return topics: %v", bumpFail),
+				bumpFail...,
+			)
+		}
 	}
 
 	return retryWhy, nil
@@ -444,6 +477,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 type metadataTopic struct {
 	loadErr    error
 	isInternal bool
+	topic      string
 	partitions []metadataPartition
 }
 
@@ -454,6 +488,8 @@ func (mt *metadataTopic) newPartitions(cl *Client, isProduce bool) *topicPartiti
 		isInternal:         mt.isInternal,
 		partitions:         make([]*topicPartition, 0, n),
 		writablePartitions: make([]*topicPartition, 0, n),
+		topic:              mt.topic,
+		when:               time.Now().Unix(),
 	}
 	for i := range mt.partitions {
 		p := mt.partitions[i].newPartition(cl, isProduce)
@@ -539,6 +575,7 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
 		mt := &metadataTopic{
 			loadErr:    kerr.ErrorForCode(topicMeta.ErrorCode),
 			isInternal: topicMeta.IsInternal,
+			topic:      topic,
 			partitions: make([]metadataPartition, 0, len(topicMeta.Partitions)),
 		}
 
@@ -646,6 +683,9 @@ func (cl *Client) mergeTopicPartitions(
 
 	lv.loadErr = r.loadErr
 	lv.isInternal = r.isInternal
+	if lv.when == 0 {
+		lv.when = r.when
+	}
 
 	// If the load had an error for the entire topic, we set the load error
 	// but keep our stale partition information. For anything being
diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go
index 9164e62c..5a52edac 100644
--- a/pkg/kgo/topics_and_partitions.go
+++ b/pkg/kgo/topics_and_partitions.go
@@ -457,6 +457,8 @@ type topicPartitionsData struct {
 	isInternal         bool
 	partitions         []*topicPartition // partition num => partition
 	writablePartitions []*topicPartition // subset of above
+	topic              string
+	when               int64
 }
 
 // topicPartition contains all information from Kafka for a topic's partition,
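
The core idea of the patch is to remember when a topic was first discovered and
to purge it only after it has been missing from metadata responses for longer
than a grace window. A minimal standalone Go sketch of that decision follows;
missTracker, observe, and shouldPurge are hypothetical names used only for
illustration (they are not kgo APIs), while maxMissTime mirrors the 15s window
the patch adds.

package main

import (
	"fmt"
	"time"
)

// maxMissTime mirrors the 15s grace window the patch adds in metadata.go.
const maxMissTime = 15 * time.Second

// missTracker is a hypothetical helper that remembers when each topic was
// first discovered, standing in for the `when` field the patch adds to
// topicPartitionsData.
type missTracker struct {
	firstSeen map[string]time.Time
}

func newMissTracker() *missTracker {
	return &missTracker{firstSeen: make(map[string]time.Time)}
}

// observe records the first time a topic shows up in a metadata response.
func (m *missTracker) observe(topic string, now time.Time) {
	if _, ok := m.firstSeen[topic]; !ok {
		m.firstSeen[topic] = now
	}
}

// shouldPurge reports whether a topic that is absent from the latest metadata
// response should be purged (true) or merely retried (false). The window is
// measured from first discovery, matching the patch's
// `td.when != 0 && time.Since(time.Unix(td.when, 0)) > maxMissTime` check.
func (m *missTracker) shouldPurge(topic string, now time.Time) bool {
	seen, ok := m.firstSeen[topic]
	if !ok {
		return false // never discovered; nothing to purge
	}
	return now.Sub(seen) > maxMissTime
}

func main() {
	t := newMissTracker()
	start := time.Now()
	t.observe("foo", start)

	// Shortly after creation, Kafka 3.5 may briefly stop reporting the
	// topic; within the grace window we retry instead of purging.
	fmt.Println(t.shouldPurge("foo", start.Add(2*time.Second))) // false

	// Once the topic has existed for longer than the window and is still
	// missing, we assume it really was deleted and purge it.
	fmt.Println(t.shouldPurge("foo", start.Add(20*time.Second))) // true
}

In the patch itself, the first-seen time is the `when` field added to
topicPartitionsData: it is set in metadataTopic.newPartitions and carried
forward in mergeTopicPartitions.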