From 2d6c1a803900f3a4d1f1e0010448763191b93afd Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 15 Oct 2021 18:10:28 -0600 Subject: [PATCH] client: lower some default timeouts * RequestTimeoutOverhead has been lowered from 20s to 10s. This will allow quicker detection of hung writes. For most purposes, this will be a fine timeout, and requests will retry if necessary. For extremely bad connections, users may need to raise this. * RetryTimeout has been lowered from a default of 1m to 30s, with JoinGroup, SyncGroup, and Heartbeat requests instead defaulting to the session timeout. The previous 5m default for EndTxn requests is removed, so EndTxn now also defaults to 30s. For requests, this will allow quicker detection of hung brokers and will not retry as long. Internally in the client, this *should* result in just extra logging in most cases. For groups, this will avoid trying to stay in a group longer than the group would be valid. For #92. --- pkg/kgo/client.go | 12 ++++++++++++ pkg/kgo/config.go | 25 ++++++++++++++----------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 4f370214..b8658ca3 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -117,6 +117,18 @@ func NewClient(opts ...Opt) (*Client, error) { opt.apply(&cfg) } + if cfg.retryTimeout == nil { + cfg.retryTimeout = func(key int16) time.Duration { + switch key { + case ((*kmsg.JoinGroupRequest)(nil)).Key(), + ((*kmsg.SyncGroupRequest)(nil)).Key(), + ((*kmsg.HeartbeatRequest)(nil)).Key(): + return cfg.sessionTimeout + } + return 30 * time.Second + } + } + if err := cfg.validate(); err != nil { return nil, err } diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 0490b605..3c68ca96 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -389,7 +389,7 @@ func defaultCfg() cfg { id: &defaultID, dialFn: defaultDialer.DialContext, - requestTimeoutOverhead: 20 * time.Second, + requestTimeoutOverhead: 10 * time.Second, connIdleTimeout: 20 * time.Second, softwareName: "kgo", @@ -430,12 +430,6 @@ func defaultCfg() cfg { } 
}(), retries: 20, - retryTimeout: func(key int16) time.Duration { - if key == 26 { // EndTxn key - return 5 * time.Minute - } - return time.Minute - }, maxBrokerWriteBytes: 100 << 20, // Kafka socket.request.max.bytes default is 100<<20 maxBrokerReadBytes: 100 << 20, @@ -522,7 +516,7 @@ func WithLogger(l Logger) Opt { } // RequestTimeoutOverhead uses the given time as overhead while deadlining -// requests, overriding the default overhead of 20s. +// requests, overriding the default overhead of 10s. // // For most requests, the overhead will simply be this timeout. However, for // any request with a TimeoutMillis field, the overhead is added on top of the @@ -662,7 +656,12 @@ func RequestRetries(n int) Opt { } // RetryTimeout sets the upper limit on how long we allow requests to retry, -// overriding the default of 5m for EndTxn requests, 1m for all others. +// overriding the default of: +// +// JoinGroup: cfg.SessionTimeout (default 45s) +// SyncGroup: cfg.SessionTimeout (default 45s) +// Heartbeat: cfg.SessionTimeout (default 45s) +// others: 30s // // This timeout applies to any request issued through a client's Request // function. It does not apply to fetches nor produces. @@ -677,8 +676,12 @@ func RetryTimeout(t time.Duration) Opt { } // RetryTimeoutFn sets the per-request upper limit on how long we allow -// requests to retry, overriding the default of 5m for EndTxn requests, 1m for -// all others. +// requests to retry, overriding the default of: +// +// JoinGroup: cfg.SessionTimeout (default 45s) +// SyncGroup: cfg.SessionTimeout (default 45s) +// Heartbeat: cfg.SessionTimeout (default 45s) +// others: 30s // // This timeout applies to any request issued through a client's Request // function. It does not apply to fetches nor produces.