diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 51052583..7c40f1bd 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -141,8 +141,9 @@ type cfg struct {
 	rack        string
 	preferLagFn PreferLagFn
 
-	maxConcurrentFetches  int
-	disableFetchSessions bool
+	maxConcurrentFetches     int
+	disableFetchSessions     bool
+	keepFetchRetryableErrors bool
 
 	topics     map[string]*regexp.Regexp   // topics to consume; if regex is true, values are compiled regular expressions
 	partitions map[string]map[int32]Offset // partitions to directly consume from
@@ -1342,6 +1343,22 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
 }
 
+// KeepFetchRetryableErrors switches the client to always return any retryable
+// broker error when fetching, rather than stripping them. By default, the
+// client strips retryable errors from fetch responses; these are usually
+// signals that a client needs to update its metadata to learn of where a
+// partition has moved to (from one broker to another), or they are signals
+// that one broker is temporarily unhealthy (broker not available). You can opt
+// into keeping these errors if you want to specifically react to certain
+// events. For example, if you want to react to a topic you yourself deleted,
+// you can watch for either UNKNOWN_TOPIC_OR_PARTITION or UNKNOWN_TOPIC_ID
+// errors being returned in fetches (and ignore the other errors).
+//
+// TODO not exported / usable yet
+func keepFetchRetryableErrors() ConsumerOpt {
+	return consumerOpt{func(cfg *cfg) { cfg.keepFetchRetryableErrors = true }}
+}
+
 //////////////////////////////////
 // CONSUMER GROUP CONFIGURATION //
 //////////////////////////////////
diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go
index 3a2d01ef..72f78625 100644
--- a/pkg/kgo/consumer_direct_test.go
+++ b/pkg/kgo/consumer_direct_test.go
@@ -2,10 +2,13 @@ package kgo
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 )
 
+// Allow adding a topic to consume after the client is initialized with nothing
+// to consume.
 func TestIssue325(t *testing.T) {
 	t.Parallel()
 
@@ -17,6 +20,8 @@ func TestIssue325(t *testing.T) {
 		DefaultProduceTopic(topic),
 		UnknownTopicRetries(-1),
 	)
+	defer cl.Close()
+
 	if err := cl.ProduceSync(context.Background(), StringRecord("foo")).FirstErr(); err != nil {
 		t.Fatal(err)
 	}
@@ -30,6 +35,7 @@ func TestIssue325(t *testing.T) {
 	}
 }
 
+// Ensure we only consume one partition if we only ask for one partition.
 func TestIssue337(t *testing.T) {
 	t.Parallel()
 
@@ -45,6 +51,7 @@ func TestIssue337(t *testing.T) {
 			topic: {0: NewOffset().At(0)},
 		}),
 	)
+	defer cl.Close()
 
 	if err := cl.ProduceSync(context.Background(),
 		&Record{Partition: 0, Value: []byte("foo")},
@@ -91,6 +98,7 @@ func TestDirectPartitionPurge(t *testing.T) {
 			topic: {0: NewOffset().At(0)},
 		}),
 	)
+	defer cl.Close()
 
 	if err := cl.ProduceSync(context.Background(),
 		&Record{Partition: 0, Value: []byte("foo")},
@@ -132,3 +140,62 @@ func TestDirectPartitionPurge(t *testing.T) {
 		t.Errorf("did not see expected values %v", exp)
 	}
 }
+
+// Ensure that a topic deleted while regex consuming is no longer fetched.
+func TestIssue434(t *testing.T) {
+	t.Parallel()
+
+	var (
+		t1, cleanup1 = tmpTopicPartitions(t, 1)
+		t2, cleanup2 = tmpTopicPartitions(t, 1)
+	)
+	defer cleanup1()
+	defer cleanup2()
+
+	cl, _ := NewClient(
+		getSeedBrokers(),
+		UnknownTopicRetries(-1),
+		ConsumeTopics(fmt.Sprintf("(%s|%s)", t1, t2)),
+		ConsumeRegex(),
+		FetchMaxWait(100*time.Millisecond),
+		keepFetchRetryableErrors(),
+	)
+	defer cl.Close()
+
+	if err := cl.ProduceSync(context.Background(),
+		&Record{Topic: t1, Value: []byte("t1")},
+		&Record{Topic: t2, Value: []byte("t2")},
+	).FirstErr(); err != nil {
+		t.Fatal(err)
+	}
+	cleanup2()
+
+	// This test is a slight heuristic check. We are keeping retryable
+	// errors, so if the purge is successful, then we expect no response
+	// and we expect the fetch to just contain context.DeadlineExceeded.
+	//
+	// We can get the topic in the response for a little bit if our fetch
+	// is fast enough, so we ignore any errors (UNKNOWN_TOPIC_ID) at the
+	// start. We want to ensure the topic is just outright missing from
+	// the response because that will mean it is internally purged.
+	start := time.Now()
+	var missingTopic int
+	for missingTopic < 2 {
+		if time.Since(start) > 2*time.Second {
+			t.Fatal("still seeing topic after 2s")
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+		fs := cl.PollFetches(ctx)
+		cancel()
+		var foundTopic bool
+		fs.EachTopic(func(ft FetchTopic) {
+			if ft.Topic == t2 {
+				foundTopic = true
+			}
+		})
+		if !foundTopic {
+			missingTopic++
+		}
+	}
+}
diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go
index 2b669d79..4e8bc2fd 100644
--- a/pkg/kgo/helpers_test.go
+++ b/pkg/kgo/helpers_test.go
@@ -156,9 +156,15 @@ issue:
 		tb.Fatalf("unable to create topic %q: %v", topic, err)
 	}
 
+	var cleaned bool
 	return topic, func() {
 		tb.Helper()
 
+		if cleaned {
+			return
+		}
+		cleaned = true
+
 		if tb.Failed() {
 			tb.Logf("FAILED TESTING -- NOT DELETING TOPIC %s", topic)
 			return
diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go
index b30989e0..29042eb1 100644
--- a/pkg/kgo/metadata.go
+++ b/pkg/kgo/metadata.go
@@ -350,6 +350,23 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 		}
 		tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
 		defer tpsConsumer.storeData(tpsConsumerLoad)
+
+		// For regex consuming, if a topic we were previously consuming
+		// is not returned in the metadata response, we assume the
+		// topic has been deleted and purge it.
+		var purgeTopics []string
+		for topic := range tpsConsumerLoad {
+			if _, ok := latest[topic]; !ok {
+				purgeTopics = append(purgeTopics, topic)
+			}
+		}
+		if len(purgeTopics) > 0 {
+			// We have to `go` because Purge issues a blocking
+			// metadata fn; this will wait for our current
+			// execution to finish and then purge.
+			cl.cfg.logger.Log(LogLevelInfo, "regex consumer purging topics that were previously consumed because they are missing in a metadata response, we are assuming they are deleted", "topics", purgeTopics)
+			go cl.PurgeTopicsFromClient(purgeTopics...)
+		}
 	}
 
 	// Migrating a cursor requires stopping any consumer session. If we
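For reference, a minimal standalone sketch of the check the metadata.go hunk above performs when regex consuming: collect every currently-consumed topic that is absent from the latest metadata response and treat it as deleted. The names findPurgeable, consuming, and latest below are hypothetical stand-ins for the client's internal tpsConsumerLoad and latest maps; this is illustration only, not the client's code.

package main

import "fmt"

// findPurgeable returns every topic we are consuming that no longer appears
// in the latest metadata response; those topics are assumed deleted.
func findPurgeable(consuming, latest map[string]struct{}) []string {
	var purge []string
	for topic := range consuming {
		if _, ok := latest[topic]; !ok {
			purge = append(purge, topic)
		}
	}
	return purge
}

func main() {
	consuming := map[string]struct{}{"t1": {}, "t2": {}}
	latest := map[string]struct{}{"t1": {}} // t2 was deleted out from under us
	fmt.Println(findPurgeable(consuming, latest)) // [t2]
}

The real hunk then calls PurgeTopicsFromClient in a new goroutine, since purging itself issues a blocking metadata update that must wait for the in-flight update to finish.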
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 72f9f825..ed5ded6d 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -861,7 +861,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 			var keep bool
 			switch fp.Err {
 			default:
-				if kerr.IsRetriable(fp.Err) {
+				if kerr.IsRetriable(fp.Err) && !s.cl.cfg.keepFetchRetryableErrors {
 					// UnknownLeaderEpoch: our meta is newer than the broker we fetched from
 					// OffsetNotAvailable: fetched from out of sync replica or a behind in-sync one (KIP-392 case 1 and case 2)
 					// UnknownTopicID: kafka has not synced the state on all brokers
@@ -887,15 +887,17 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				// consume the topic again anymore. This is an error
 				// worth bubbling up.
 				//
-				// FUN FACT: Kafka will actually return this error
-				// for a brief window immediately after creating a
-				// topic for the first time, meaning the controller
-				// has not yet propagated to the leader that it is
-				// the leader of a new partition. We need to ignore
-				// this error for a _litttttlllleee bit_.
+				// Kafka will actually return this error for a brief
+				// window immediately after creating a topic for the
+				// first time, meaning the controller has not yet
+				// propagated to the leader that it is now the leader
+				// of a new partition. We need to ignore this error
+				// for a little bit.
 				if fails := partOffset.from.unknownIDFails.Add(1); fails > 5 {
 					partOffset.from.unknownIDFails.Add(-1)
 					keep = true
+				} else if s.cl.cfg.keepFetchRetryableErrors {
+					keep = true
 				} else {
 					numErrsStripped++
 				}
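keepFetchRetryableErrors is intentionally unexported for now (see the TODO in config.go), so there is no public way to enable this behavior yet. Purely as an illustration of the intended use, here is a hedged sketch of how a consumer might watch for deletion-related fetch errors once some exported form of the option exists; the seed address, topic regex, and the commented-out option placement are placeholders, not real API.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder seed address
		kgo.ConsumeTopics("foo[.].*"),     // placeholder topic regex
		kgo.ConsumeRegex(),
		// Hypothetical: enable keeping retryable fetch errors here once the
		// option in this patch is exported under some public name.
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		fs := cl.PollFetches(context.Background())

		// With retryable errors kept, deleting a topic surfaces as
		// UNKNOWN_TOPIC_OR_PARTITION or UNKNOWN_TOPIC_ID fetch errors
		// rather than being silently stripped.
		for _, fe := range fs.Errors() {
			if errors.Is(fe.Err, kerr.UnknownTopicOrPartition) || errors.Is(fe.Err, kerr.UnknownTopicID) {
				fmt.Printf("topic %q looks deleted: %v\n", fe.Topic, fe.Err)
			}
		}

		fs.EachRecord(func(r *kgo.Record) {
			fmt.Println(string(r.Value))
		})
	}
}

Without the option, the same deletion is only observable indirectly, through the regex-consumer metadata purge added in metadata.go above.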