kgo: purge missing-from-meta topics while regex consuming
While regex consuming, if a topic is deleted, we previously would
forever continue to try fetching it. Now, if a topic is missing from a
metadata response, we internally purge it.

We purge immediately on the assumption that once a topic has been
discovered, there is enough broker consensus that it will not
accidentally go missing from the next metadata update -- i.e., if a
topic is missing, it has definitely been deleted.

We only do this for regex consuming; for non-regex consuming, the
assumption is that if you specify topics to consume, you want them. If
they are missing from a metadata response, we continue trying to update
the metadata (and continue trying to fetch) even if the topic was
deleted.

Closes #434
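
For orientation, here is a hedged sketch of the kind of client this change affects: a regex consumer built with ConsumeTopics plus ConsumeRegex. The broker address and topic pattern below are placeholders for illustration and are not part of this commit.

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker and pattern, for illustration only.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("^events[.].*"), // treated as a regex because of ConsumeRegex below
		kgo.ConsumeRegex(),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		fs := cl.PollFetches(context.Background())
		if fs.IsClientClosed() {
			return
		}
		fs.EachRecord(func(r *kgo.Record) {
			fmt.Println(r.Topic, string(r.Value))
		})
		// Before this commit, a matched topic that was later deleted would
		// be fetched forever; with this commit, the client purges it once
		// it goes missing from a metadata response.
	}
}
```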
twmb committed Apr 30, 2023
1 parent 5379fb0 commit bb66f24
Showing 5 changed files with 118 additions and 9 deletions.
21 changes: 19 additions & 2 deletions pkg/kgo/config.go
@@ -141,8 +141,9 @@ type cfg struct {
rack string
preferLagFn PreferLagFn

maxConcurrentFetches int
disableFetchSessions bool
maxConcurrentFetches int
disableFetchSessions bool
keepFetchRetryableErrors bool

topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions
partitions map[string]map[int32]Offset // partitions to directly consume from
@@ -1342,6 +1343,22 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
}

// KeepFetchRetryableErrors switches the client to always return any retryable
// broker error when fetching, rather than stripping them. By default, the
// client strips retryable errors from fetch responses; these are usually
// signals that a client needs to update its metadata to learn of where a
// partition has moved to (from one broker to another), or they are signals
// that one broker is temporarily unhealthy (broker not available). You can opt
// into keeping these errors if you want to specifically react to certain
// events. For example, if you want to react to you yourself deleting a topic,
// you can watch for either UNKNOWN_TOPIC_OR_PARTITION or UNKNOWN_TOPIC_ID
// errors being returned in fetches (and ignore the other errors).
//
// TODO not exported / usable yet
func keepFetchRetryableErrors() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.keepFetchRetryableErrors = true }}
}

//////////////////////////////////
// CONSUMER GROUP CONFIGURATION //
//////////////////////////////////
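
The KeepFetchRetryableErrors doc comment above describes reacting to specific retryable errors yourself. Below is a hedged sketch of what that reaction could look like, assuming the option is eventually exported (this commit leaves it unexported); watchForDeletedTopics is a hypothetical helper, while the kerr sentinels are franz-go's existing error values.

```go
package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

// watchForDeletedTopics is a hypothetical helper: it polls once and logs any
// fetch errors that suggest a topic was deleted. These errors only reach the
// caller if the client keeps retryable errors rather than stripping them,
// which is what the (currently unexported) option above enables.
func watchForDeletedTopics(ctx context.Context, cl *kgo.Client) {
	fs := cl.PollFetches(ctx)
	for _, fe := range fs.Errors() {
		if errors.Is(fe.Err, kerr.UnknownTopicOrPartition) || errors.Is(fe.Err, kerr.UnknownTopicID) {
			fmt.Printf("topic %q partition %d looks deleted: %v\n", fe.Topic, fe.Partition, fe.Err)
		}
	}
}
```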
67 changes: 67 additions & 0 deletions pkg/kgo/consumer_direct_test.go
@@ -2,10 +2,13 @@ package kgo

import (
"context"
"fmt"
"testing"
"time"
)

// Allow adding a topic to consume after the client is initialized with nothing
// to consume.
func TestIssue325(t *testing.T) {
t.Parallel()

@@ -17,6 +20,8 @@ func TestIssue325(t *testing.T) {
DefaultProduceTopic(topic),
UnknownTopicRetries(-1),
)
defer cl.Close()

if err := cl.ProduceSync(context.Background(), StringRecord("foo")).FirstErr(); err != nil {
t.Fatal(err)
}
@@ -30,6 +35,7 @@
}
}

// Ensure we only consume one partition if we only ask for one partition.
func TestIssue337(t *testing.T) {
t.Parallel()

@@ -45,6 +51,7 @@ func TestIssue337(t *testing.T) {
topic: {0: NewOffset().At(0)},
}),
)
defer cl.Close()

if err := cl.ProduceSync(context.Background(),
&Record{Partition: 0, Value: []byte("foo")},
@@ -91,6 +98,7 @@ func TestDirectPartitionPurge(t *testing.T) {
topic: {0: NewOffset().At(0)},
}),
)
defer cl.Close()

if err := cl.ProduceSync(context.Background(),
&Record{Partition: 0, Value: []byte("foo")},
@@ -132,3 +140,62 @@ func TestDirectPartitionPurge(t *testing.T) {
t.Errorf("did not see expected values %v", exp)
}
}

// Ensure a deleted topic while regex consuming is no longer fetched.
func TestIssue434(t *testing.T) {
t.Parallel()

var (
t1, cleanup1 = tmpTopicPartitions(t, 1)
t2, cleanup2 = tmpTopicPartitions(t, 1)
)
defer cleanup1()
defer cleanup2()

cl, _ := NewClient(
getSeedBrokers(),
UnknownTopicRetries(-1),
ConsumeTopics(fmt.Sprintf("(%s|%s)", t1, t2)),
ConsumeRegex(),
FetchMaxWait(100*time.Millisecond),
keepFetchRetryableErrors(),
)
defer cl.Close()

if err := cl.ProduceSync(context.Background(),
&Record{Topic: t1, Value: []byte("t1")},
&Record{Topic: t2, Value: []byte("t2")},
).FirstErr(); err != nil {
t.Fatal(err)
}
cleanup2()

// This test is a slight heuristic check test. We are keeping retryable
// errors, so if the purge is successful, then we expect no response
// and we expect the fetch to just contain context.DeadlineExceeded.
//
// We can get the topic in the response for a little bit if our fetch
// is fast enough, so we ignore any errors (UNKNOWN_TOPIC_ID) at the
// start. We want to ensure the topic is just outright missing from
// the response because that will mean it is internally purged.
start := time.Now()
var missingTopic int
for missingTopic < 2 {
if time.Since(start) > 2*time.Second {
t.Fatal("still seeing topic after 2s")
}

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
fs := cl.PollFetches(ctx)
cancel()
var foundTopic bool
fs.EachTopic(func(ft FetchTopic) {
if ft.Topic == t2 {
foundTopic = true
}
})
if !foundTopic {
missingTopic++
}
}
}
6 changes: 6 additions & 0 deletions pkg/kgo/helpers_test.go
@@ -156,9 +156,15 @@ issue:
tb.Fatalf("unable to create topic %q: %v", topic, err)
}

var cleaned bool
return topic, func() {
tb.Helper()

if cleaned {
return
}
cleaned = true

if tb.Failed() {
tb.Logf("FAILED TESTING -- NOT DELETING TOPIC %s", topic)
return
Expand Down
17 changes: 17 additions & 0 deletions pkg/kgo/metadata.go
@@ -350,6 +350,23 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
}
tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
defer tpsConsumer.storeData(tpsConsumerLoad)

// For regex consuming, if a topic is not returned in the
// response that we were previously consuming, we assume the
// topic has been deleted and purge it.
var purgeTopics []string
for topic := range tpsConsumerLoad {
if _, ok := latest[topic]; !ok {
purgeTopics = append(purgeTopics, topic)
}
}
if len(purgeTopics) > 0 {
// We have to `go` because Purge issues a blocking
// metadata fn; this will wait for our current
// execution to finish then purge.
cl.cfg.logger.Log(LogLevelInfo, "regex consumer purging topics that were previously consumed because they are missing in a metadata response, we are assuming they are deleted", "topics", purgeTopics)
go cl.PurgeTopicsFromClient(purgeTopics...)
}
}

// Migrating a cursor requires stopping any consumer session. If we
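
The purge decision in the metadata hunk above is a set difference: any topic the regex consumer is tracking that the latest metadata response did not return gets purged. A minimal standalone sketch of that check, with plain string-keyed maps standing in for the client's internal structures:

```go
package example

// topicsToPurge returns the tracked topics that the latest metadata response
// no longer mentions. The map values here are stand-ins; the real client
// stores richer per-topic data.
func topicsToPurge(tracking, latest map[string]struct{}) []string {
	var purge []string
	for topic := range tracking {
		if _, ok := latest[topic]; !ok {
			purge = append(purge, topic)
		}
	}
	return purge
}
```

The real code then hands the result to go cl.PurgeTopicsFromClient(purgeTopics...) in a new goroutine because, per the comment in the hunk, Purge issues a blocking metadata fn and must wait for the in-flight update to finish.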
16 changes: 9 additions & 7 deletions pkg/kgo/source.go
@@ -861,7 +861,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
var keep bool
switch fp.Err {
default:
if kerr.IsRetriable(fp.Err) {
if kerr.IsRetriable(fp.Err) && !s.cl.cfg.keepFetchRetryableErrors {
// UnknownLeaderEpoch: our meta is newer than the broker we fetched from
// OffsetNotAvailable: fetched from out of sync replica or a behind in-sync one (KIP-392 case 1 and case 2)
// UnknownTopicID: kafka has not synced the state on all brokers
@@ -887,15 +887,17 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// consume the topic again anymore. This is an error
// worth bubbling up.
//
// FUN FACT: Kafka will actually return this error
// for a brief window immediately after creating a
// topic for the first time, meaning the controller
// has not yet propagated to the leader that it is
// the leader of a new partition. We need to ignore
// this error for a _litttttlllleee bit_.
// Kafka will actually return this error for a brief
// window immediately after creating a topic for the
// first time, meaning the controller has not yet
// propagated to the leader that it is now the leader
// of a new partition. We need to ignore this error
// for a little bit.
if fails := partOffset.from.unknownIDFails.Add(1); fails > 5 {
partOffset.from.unknownIDFails.Add(-1)
keep = true
} else if s.cl.cfg.keepFetchRetryableErrors {
keep = true
} else {
numErrsStripped++
}
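
The UNKNOWN_TOPIC_ID branch above tolerates the error a few times before bubbling it up, because Kafka can briefly return it right after topic creation. A minimal sketch of that tolerance pattern, assuming a standard atomic counter in place of the cursor's unknownIDFails field and reusing the same threshold of 5:

```go
package example

import "sync/atomic"

// surfaceUnknownID reports whether an UNKNOWN_TOPIC_ID failure should be
// surfaced to the caller (true) or swallowed as transient (false). Mirroring
// the hunk above: count consecutive failures, and once past the threshold,
// undo the increment and surface the error.
func surfaceUnknownID(fails *atomic.Int32, threshold int32) bool {
	if n := fails.Add(1); n > threshold {
		fails.Add(-1)
		return true
	}
	return false
}
```

In this sketch, a caller would reset the counter once a fetch for the partition succeeds, so only consecutive failures count toward the threshold.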
