Skip to content

Commit

Permalink
client: lower some default timeouts
Browse files Browse the repository at this point in the history
* RequestTimeoutOverhead (ConnTimeoutOverhead) has been lowered from 20s to
10s. This allows quicker detection of hung writes. For most purposes, this
will be a fine timeout, and requests will retry if necessary. Users with
extremely bad connections may need to raise it.

* RetryTimeout has been lowered from a default of 1m to 30s, with
JoinGroup, SyncGroup, and Heartbeat requests instead defaulting to the
session timeout. The previous 5m special case for EndTxn requests is
removed. For requests, this allows quicker detection of hung brokers, and
requests will not be retried for as long. Internally in the client, this
*should* result in just extra logging in most cases. For groups, this
avoids trying to stay in a group longer than the group would be valid.

For #92.
  • Loading branch information
twmb committed Oct 16, 2021
1 parent a67edb5 commit 2d6c1a8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
12 changes: 12 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ func NewClient(opts ...Opt) (*Client, error) {
opt.apply(&cfg)
}

if cfg.retryTimeout == nil {
cfg.retryTimeout = func(key int16) time.Duration {
switch key {
case ((*kmsg.JoinGroupRequest)(nil)).Key(),
((*kmsg.SyncGroupRequest)(nil)).Key(),
((*kmsg.HeartbeatRequest)(nil)).Key():
return cfg.sessionTimeout
}
return 30 * time.Second
}
}

if err := cfg.validate(); err != nil {
return nil, err
}
Expand Down
25 changes: 14 additions & 11 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func defaultCfg() cfg {
id: &defaultID,
dialFn: defaultDialer.DialContext,

requestTimeoutOverhead: 20 * time.Second,
requestTimeoutOverhead: 10 * time.Second,
connIdleTimeout: 20 * time.Second,

softwareName: "kgo",
Expand Down Expand Up @@ -430,12 +430,6 @@ func defaultCfg() cfg {
}
}(),
retries: 20,
retryTimeout: func(key int16) time.Duration {
if key == 26 { // EndTxn key
return 5 * time.Minute
}
return time.Minute
},

maxBrokerWriteBytes: 100 << 20, // Kafka socket.request.max.bytes default is 100<<20
maxBrokerReadBytes: 100 << 20,
Expand Down Expand Up @@ -522,7 +516,7 @@ func WithLogger(l Logger) Opt {
}

// RequestTimeoutOverhead uses the given time as overhead while deadlining
// requests, overriding the default overhead of 20s.
// requests, overriding the default overhead of 10s.
//
// For most requests, the overhead will simply be this timeout. However, for
// any request with a TimeoutMillis field, the overhead is added on top of the
Expand Down Expand Up @@ -662,7 +656,12 @@ func RequestRetries(n int) Opt {
}

// RetryTimeout sets the upper limit on how long we allow requests to retry,
// overriding the default of 5m for EndTxn requests, 1m for all others.
// overriding the default of:
//
// JoinGroup: cfg.SessionTimeout (default 45s)
// SyncGroup: cfg.SessionTimeout (default 45s)
// Heartbeat: cfg.SessionTimeout (default 45s)
// others: 30s
//
// This timeout applies to any request issued through a client's Request
// function. It does not apply to fetches nor produces.
Expand All @@ -677,8 +676,12 @@ func RetryTimeout(t time.Duration) Opt {
}

// RetryTimeoutFn sets the per-request upper limit on how long we allow
// requests to retry, overriding the default of 5m for EndTxn requests, 1m for
// all others.
// requests to retry, overriding the default of:
//
// JoinGroup: cfg.SessionTimeout (default 45s)
// SyncGroup: cfg.SessionTimeout (default 45s)
// Heartbeat: cfg.SessionTimeout (default 45s)
// others: 30s
//
// This timeout applies to any request issued through a client's Request
// function. It does not apply to fetches nor produces.
Expand Down

0 comments on commit 2d6c1a8

Please sign in to comment.