metadata: minor changes, & wait a bit when looping for now triggers
Previously, if an immediate trigger failed, either due to request failure
or to inner partition errors, we would immediately loop and re-load
metadata. This retry could happen twice, resulting in three total
metadata loads that were nearly instantaneous. This immediate retry is
not very beneficial: a failure should imply *some* backoff. If a user has
auto topic creation enabled, a newly created partition will not have a
leader right away, so our immediate triggers would fail on the first
produce and the client would stall for 10s until the delayed trigger
fired and re-loaded metadata.
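
As a rough illustration of the old stall (a sketch, not code from this
commit: the broker address and topic name are placeholders, and the broker
is assumed to have auto.create.topics.enable=true):

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	// The first produce to a to-be-auto-created topic loads metadata
	// before the new partition has a leader. Previously, the immediate
	// retries fired back-to-back and failed, and the produce then
	// stalled ~10s until the delayed trigger re-loaded metadata.
	res := cl.ProduceSync(context.Background(), &kgo.Record{
		Topic: "freshly-created-topic",
		Value: []byte("hello"),
	})
	if err := res.FirstErr(); err != nil {
		log.Fatal(err)
	}
}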

We split the retry into two cases:

- On error, we do not retry. The metadata request itself already retries 3x
on request failure, so we should not retry further once we know the
request failed three times. We just fall through to the delayed-update
case.

- On non-error but inner-partition-error, we sleep 250ms and try again,
up to 8x, meaning we retry across 2s. This gives Kafka a chance to
harmonize its internal issues and lets us be less immediately spammy. It
does mean we may end up retrying a few more times over a slightly longer
window, but we would have to retry eventually anyway, so this is a minor
wash. (A standalone sketch of both cases follows this list.)
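
A minimal standalone sketch of the two cases above (a fragment assuming
import "time"; loadMetadata and cfgMinAge are hypothetical stand-ins for
the client's real updateMetadata and configured metadataMinAge, whose
actual logic is in the updateMetadataLoop diff below):

// Sketch only. loadMetadata returns (retryable, err): err means the
// request itself failed (and already retried 3x internally); retryable
// means a topic or partition inside the response had a load error.
func retryNowTrigger(cfgMinAge time.Duration, loadMetadata func() (bool, error)) {
	wait := 250 * time.Millisecond
	if cfgMinAge < wait {
		wait = cfgMinAge // never wait longer than the configured min age
	}
	for tries := 0; tries < 8; tries++ {
		retryable, err := loadMetadata()
		if err != nil {
			break // case 1: the request failed; do not retry further
		}
		if !retryable {
			return // success: no inner errors
		}
		time.Sleep(wait) // case 2: wait 250ms, try again, up to 8x (~2s)
	}
	// Fall through to the slower delayed update (metadataMinAge cadence).
}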

This commit also drops the default min metadata reload interval from 10s
to 5s. This speeds up recovery in the stray cases where an immediate
trigger continues to fail. Hopefully, this does not result in unnecessary
metadata load for users.

Lastly, this removes the internal min metadata refresh trigger, instead
globally deferring to the user's configured MetadataMinAge. The internal
minimum was originally 1s for not much reason, then bumped to 2.5s with
similarly little reasoning. Thinking about it more, it does not make sense
to allow waitmeta to trigger a metadata update that will not immediately
run: the purpose of waitmeta is to wait for a metadata update, so we should
either always return a cached value, or always immediately trigger and
then run.
twmb committed Dec 7, 2021
1 parent b8b7bd1 commit 8325ba7
Showing 3 changed files with 34 additions and 23 deletions.
pkg/kgo/client.go (2 changes: 1 addition & 1 deletion)
@@ -399,7 +399,7 @@ func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest,
 	// number more accurate, we should *never* retry here, but this is
 	// pretty intolerant of immediately-temporary network issues. Rather,
 	// we use a small count of 3 retries, which with the default backoff,
-	// will be <500ms of retrying. This is still intolerant of temporary
+	// will be <2s of retrying. This is still intolerant of temporary
 	// failures, but it does allow recovery from a dns issue / bad path.
 	if limitRetries {
 		r.limitRetries = 3
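
As a sanity check on that figure (assuming the client's default jittery
exponential backoff of roughly 250ms doubling per attempt; this is an
assumption, not stated in the diff): three retries wait about
250ms + 500ms + 1s ≈ 1.75s, hence the <2s bound.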
pkg/kgo/config.go (16 changes: 8 additions & 8 deletions)
@@ -436,7 +436,7 @@ func defaultCfg() cfg {
 		maxBrokerReadBytes: 100 << 20,
 
 		metadataMaxAge: 5 * time.Minute,
-		metadataMinAge: 10 * time.Second,
+		metadataMinAge: 5 * time.Second,
 
 		//////////////
 		// producer //
@@ -738,13 +738,13 @@ func MetadataMaxAge(age time.Duration) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.metadataMaxAge = age }}
 }
 
-// MetadataMinAge sets the minimum time between metadata queries,
-// overriding the default 10s. You may want to raise or lower this to reduce
-// the number of metadata queries the client will make. Notably, if metadata
-// detects an error in any topic or partition, it triggers itself to update as
-// soon as allowed. Additionally, any connection failures causing backoff while
-// producing or consuming trigger metadata updates, because the client must
-// assume that maybe the connection died due to a broker dying.
+// MetadataMinAge sets the minimum time between metadata queries, overriding
+// the default 5s. You may want to raise or lower this to reduce the number of
+// metadata queries the client will make. Notably, if metadata detects an error
+// in any topic or partition, it triggers itself to update as soon as allowed.
+// Additionally, any connection failures causing backoff while producing or
+// consuming trigger metadata updates, because the client must assume that
+// maybe the connection died due to a broker dying.
 func MetadataMinAge(age time.Duration) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.metadataMinAge = age }}
 }
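
A usage sketch for the option (a fragment; the broker address is a
placeholder, and the usual kgo, log, and time imports are assumed), for
example restoring the old 10s minimum on a client that prefers fewer
metadata requests:

cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.MetadataMinAge(10*time.Second), // restore the pre-change default
)
if err != nil {
	log.Fatal(err)
}
defer cl.Close()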
pkg/kgo/metadata.go (39 changes: 25 additions & 14 deletions)
@@ -12,16 +12,6 @@ import (
 	"github.com/twmb/franz-go/pkg/kerr"
 )
 
-// This corresponds to the amount of time we cache stale metadata before
-// forcing a refresh when calling `waitmeta` or `triggerUpdateMetadata`. If
-// either function is called within the refresh window, then we just return
-// immediately / avoid triggering a refresh.
-//
-// This is similar to the client configurable metadata min age, the difference
-// being is that that limit kicks in when a trigger actually does happen and
-// causes a sleep before proceeding into a metadata request.
-const minRefreshTrigger = 5 * time.Second / 2
-
 type metawait struct {
 	mu sync.Mutex
 	c  *sync.Cond
@@ -42,7 +32,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string)
 	now := time.Now()
 
 	cl.metawait.mu.Lock()
-	if now.Sub(cl.metawait.lastUpdate) < minRefreshTrigger {
+	if now.Sub(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
 		cl.metawait.mu.Unlock()
 		return
 	}
@@ -61,7 +51,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string)
 	defer cl.metawait.mu.Unlock()
 
 	for !quit {
-		if now.Sub(cl.metawait.lastUpdate) < minRefreshTrigger {
+		if now.Sub(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
 			return
 		}
 		cl.metawait.c.Wait()
@@ -86,7 +76,7 @@ func (cl *Client) triggerUpdateMetadata(must bool, why string) bool {
 	if !must {
 		cl.metawait.mu.Lock()
 		defer cl.metawait.mu.Unlock()
-		if time.Since(cl.metawait.lastUpdate) < minRefreshTrigger {
+		if time.Since(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
 			return false
 		}
 	}
@@ -163,7 +153,28 @@ func (cl *Client) updateMetadataLoop() {
 
 		again, err, why := cl.updateMetadata()
 		if again || err != nil {
-			if now && nowTries < 3 {
+			// If err is non-nil, the metadata request failed itself
+			// and already retried 3x; we do not loop more.
+			//
+			// If err is nil, a topic or partition had a load error
+			// and is perhaps still being created. We retry a few
+			// more times to give Kafka a chance to figure things
+			// out. By default this will put us at 2s of
+			// looping+waiting (250ms per wait, 8x), and if things
+			// still fail we will fall into the slower update below,
+			// which waits (default) 5s between tries.
+			if now && err == nil && nowTries < 8 {
+				wait := 250 * time.Millisecond
+				if cl.cfg.metadataMinAge < wait {
+					wait = cl.cfg.metadataMinAge
+				}
+				timer := time.NewTimer(wait)
+				select {
+				case <-cl.ctx.Done():
+					timer.Stop()
+					return
+				case <-timer.C:
+				}
 				goto start
 			}
 			if err != nil {
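
Net effect with the defaults above: an inner topic/partition error on an
immediate trigger now retries every 250ms up to 8 times, i.e.
8 × 250ms = 2s of quick retries; if those all fail, the loop falls back to
the delayed update, which now waits the 5s metadataMinAge (previously 10s)
between attempts.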
