From 8325ba7c59b1f9b3b25de5e01242194fa73d44e9 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Mon, 6 Dec 2021 21:12:22 -0700
Subject: [PATCH] metadata: minor changes, & wait a bit when looping for now
 triggers

Previously, if an immediate trigger failed either due to request failure
or inner partition errors, we would immediately loop and re-load
metadata. This retry could happen twice, resulting in three total
metadata loads that were nearly instantaneous.

This immediate retry is not too beneficial: a failure should imply *some*
backoff. If a user has auto topic creation enabled, a partition will not
have a leader right away, so our immediate triggers would fail on the
first produce and the client would stall 10s until the delayed trigger
kicked in and re-loaded metadata.

We split the retry into two cases:

- On error, we do not retry. Fetching metadata itself already retries 3x
  on request failure, so we should not retry further when we know the
  request itself failed 3 times. We just fall through to the delayed
  update case.

- On non-error but with inner partition errors, we sleep 250ms and try
  again, up to 8x, meaning we retry across 2s. This gives Kafka a chance
  to harmonize its internal issues, and allows us to be less immediately
  spammy. This does mean, however, that we could end up retrying a bit
  more, and for a bit longer, than before. We would need to retry
  eventually anyway, so this is a minor wash.

This commit also drops the default min metadata reload interval from 10s
to 5s. This speeds up some cases where an immediate trigger continues to
fail. Hopefully, this does not result in unnecessary metadata load for
users.

Lastly, this removes the internal hardcoded min metadata refresh trigger,
instead deferring globally to the user's configured min age. The internal
minimum was originally 1s for not much reason, then bumped to 2.5s with
similarly little reasoning. Thinking about it further, it does not make
sense to allow waitmeta to trigger a metadata update that will not
immediately run: the purpose of waitmeta is to wait for a metadata
update. We should either always return a cached value, or always
immediately trigger and then run.
---
 pkg/kgo/client.go   |  2 +-
 pkg/kgo/config.go   | 16 ++++++++--------
 pkg/kgo/metadata.go | 39 +++++++++++++++++++++++++--------------
 3 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 8b6d1de0..fc375311 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -399,7 +399,7 @@ func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest,
 	// number more accurate, we should *never* retry here, but this is
 	// pretty intolerant of immediately-temporary network issues. Rather,
 	// we use a small count of 3 retries, which with the default backoff,
-	// will be <500ms of retrying. This is still intolerant of temporary
+	// will be <2s of retrying. This is still intolerant of temporary
 	// failures, but it does allow recovery from a dns issue / bad path.
 	if limitRetries {
 		r.limitRetries = 3
diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 49c72570..dd733ec2 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -436,7 +436,7 @@ func defaultCfg() cfg {
 		maxBrokerReadBytes: 100 << 20,
 
 		metadataMaxAge: 5 * time.Minute,
-		metadataMinAge: 10 * time.Second,
+		metadataMinAge: 5 * time.Second,
 
 		//////////////
 		// producer //
@@ -738,13 +738,13 @@ func MetadataMaxAge(age time.Duration) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.metadataMaxAge = age }}
 }
 
-// MetadataMinAge sets the minimum time between metadata queries,
-// overriding the default 10s. You may want to raise or lower this to reduce
-// the number of metadata queries the client will make. Notably, if metadata
-// detects an error in any topic or partition, it triggers itself to update as
-// soon as allowed. Additionally, any connection failures causing backoff while
-// producing or consuming trigger metadata updates, because the client must
-// assume that maybe the connection died due to a broker dying.
+// MetadataMinAge sets the minimum time between metadata queries, overriding
+// the default 5s. You may want to raise or lower this to reduce the number of
+// metadata queries the client will make. Notably, if metadata detects an error
+// in any topic or partition, it triggers itself to update as soon as allowed.
+// Additionally, any connection failures causing backoff while producing or
+// consuming trigger metadata updates, because the client must assume that
+// maybe the connection died due to a broker dying.
 func MetadataMinAge(age time.Duration) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.metadataMinAge = age }}
 }
diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go
index 55b7008b..c80a2007 100644
--- a/pkg/kgo/metadata.go
+++ b/pkg/kgo/metadata.go
@@ -12,16 +12,6 @@ import (
 	"github.com/twmb/franz-go/pkg/kerr"
 )
 
-// This corresponds to the amount of time we cache stale metadata before
-// forcing a refresh when calling `waitmeta` or `triggerUpdateMetadata`. If
-// either function is called within the refresh window, then we just return
-// immediately / avoid triggering a refresh.
-//
-// This is similar to the client configurable metadata min age, the difference
-// being is that that limit kicks in when a trigger actually does happen and
-// causes a sleep before proceeding into a metadata request.
-const minRefreshTrigger = 5 * time.Second / 2
-
 type metawait struct {
 	mu sync.Mutex
 	c  *sync.Cond
@@ -42,7 +32,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string)
 	now := time.Now()
 
 	cl.metawait.mu.Lock()
-	if now.Sub(cl.metawait.lastUpdate) < minRefreshTrigger {
+	if now.Sub(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
 		cl.metawait.mu.Unlock()
 		return
 	}
@@ -61,7 +51,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string)
 	defer cl.metawait.mu.Unlock()
 
 	for !quit {
-		if now.Sub(cl.metawait.lastUpdate) < minRefreshTrigger {
+		if now.Sub(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
 			return
 		}
 		cl.metawait.c.Wait()
@@ -86,7 +76,7 @@ func (cl *Client) triggerUpdateMetadata(must bool, why string) bool {
 	if !must {
 		cl.metawait.mu.Lock()
 		defer cl.metawait.mu.Unlock()
-		if time.Since(cl.metawait.lastUpdate) < minRefreshTrigger {
+		if time.Since(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
 			return false
 		}
 	}
@@ -163,7 +153,28 @@ func (cl *Client) updateMetadataLoop() {
 
 		again, err, why := cl.updateMetadata()
 		if again || err != nil {
-			if now && nowTries < 3 {
+			// If err is non-nil, the metadata request failed
+			// itself and already retried 3x; we do not loop more.
+			//
+			// If err is nil, a topic or partition had a load
+			// error and is perhaps still being created. We retry a
+			// few more times to give Kafka a chance to figure
+			// things out. By default this will put us at 2s of
+			// looping+waiting (250ms per wait, 8x), and if things
+			// still fail we will fall into the slower update below
+			// which waits (default) 5s between tries.
+			if now && err == nil && nowTries < 8 {
+				wait := 250 * time.Millisecond
+				if cl.cfg.metadataMinAge < wait {
+					wait = cl.cfg.metadataMinAge
+				}
+				timer := time.NewTimer(wait)
+				select {
+				case <-cl.ctx.Done():
+					timer.Stop()
+					return
+				case <-timer.C:
+				}
 				goto start
 			}
 			if err != nil {