Skip to content

Commit

Permalink
consumer: allow disabling fetch sessions with a config opt
Browse files Browse the repository at this point in the history
Sometimes users may not want these.
  • Loading branch information
twmb committed Nov 12, 2021
1 parent 7cd959c commit b6759bc
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
30 changes: 30 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type cfg struct {
rack string

maxConcurrentFetches int
disableFetchSessions bool

topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions
partitions map[string]map[int32]Offset // partitions to directly consume from
Expand Down Expand Up @@ -1193,6 +1194,35 @@ func ConsumeRegex() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.regex = true }}
}

// DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+).
//
// A "fetch session" is is a way to reduce bandwidth for fetch requests &
// responses, and to potentially reduce the amount of work that brokers have to
// do to handle fetch requests. A fetch session opts in to the broker tracking
// some state of what the client is interested in. For example, say that you
// are interested in thousands of topics, and most of these topics are
// receiving data only rarely. A fetch session allows the client to register
// that it is interested in those thousands of topics on the first request. On
// future requests, if the offsets for these topics have not changed, those
// topics will be elided from the request. The broker knows to reply with the
// extra topics if any new data is available, otherwise the topics are also
// elided from the response. This massively reduces the amount of information
// that needs to be included in requests or responses.
//
// Using fetch sessions means more state is stored on brokers. Maintaining this
// state eats some memory. If you have thousands of consumers, you may not want
// fetch sessions to be used for everything. Brokers intelligently handle this
// by not creating sessions if they are at their configured limit, but you may
// consider disabling sessions if they are generally not useful to you. Brokers
// have metrics for the number of fetch sessions active, so you can monitor
// that to determine whether enabling or disabling sessions is beneficial or
// not.
//
// For more details on fetch sessions, see KIP-227.
func DisableFetchSessions() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.disableFetchSessions = true }}
}

//////////////////////////////////
// CONSUMER GROUP CONFIGURATION //
//////////////////////////////////
Expand Down
3 changes: 3 additions & 0 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (cl *Client) newSource(nodeID int32) *source {
nodeID: nodeID,
sem: make(chan struct{}),
}
if cl.cfg.disableFetchSessions {
s.session.kill()
}
close(s.sem)
return s
}
Expand Down

0 comments on commit b6759bc

Please sign in to comment.