Skip to content

Commit

Permalink
breaking: change AllowedConcurrentFetches to MaxConcurrentFetches
Browse files Browse the repository at this point in the history
The name is clearer with the docs that immediately follow ("...the
maximum number of fetch requests") and is more consistent with all other
Max config options.
  • Loading branch information
twmb committed Aug 25, 2021
1 parent d5e80b3 commit e185676
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
14 changes: 7 additions & 7 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type cfg struct {
keepControl bool
rack string

allowedConcurrentFetches int
maxConcurrentFetches int

topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions
partitions map[string]map[int32]Offset // partitions to directly consume from
Expand Down Expand Up @@ -260,7 +260,7 @@ func (cfg *cfg) validate() error {
{v: int64(cfg.maxBrokerReadBytes), allowed: int64(cfg.maxBytes), badcmp: i64lt, fmt: "max broker read bytes %v is erroneously less than max fetch bytes %v"},

// 0 <= allowed concurrency
{name: "allowed concurrency", v: int64(cfg.allowedConcurrentFetches), allowed: 0, badcmp: i64lt},
{name: "max concurrent fetches", v: int64(cfg.maxConcurrentFetches), allowed: 0, badcmp: i64lt},

// 1s <= conn timeout overhead <= 15m
{name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},
Expand Down Expand Up @@ -432,7 +432,7 @@ func defaultCfg() cfg {
resetOffset: NewOffset().AtStart(),
isolationLevel: 0,

allowedConcurrentFetches: 0, // unbounded default
maxConcurrentFetches: 0, // unbounded default

///////////
// group //
Expand Down Expand Up @@ -1015,8 +1015,8 @@ func FetchMaxPartitionBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = b }}
}

// AllowedConcurrentFetches sets the maximum number of fetch requests to allow
// in flight or buffered at once, overriding the unbounded (i.e. number of
// MaxConcurrentFetches sets the maximum number of fetch requests to allow in
// flight or buffered at once, overriding the unbounded (i.e. number of
// brokers) default.
//
// This setting, paired with FetchMaxBytes, can upper bound the maximum amount
Expand All @@ -1039,8 +1039,8 @@ func FetchMaxPartitionBytes(b int32) ConsumerOpt {
//
// A value of 0 implies the allowed concurrency is unbounded and will be
// limited only by the number of brokers in the cluster.
func AllowedConcurrentFetches(n int) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.allowedConcurrentFetches = n }}
func MaxConcurrentFetches(n int) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxConcurrentFetches = n }}
}

// ConsumeResetOffset sets the offset to restart consuming from when a
Expand Down
10 changes: 5 additions & 5 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ type consumerSession struct {
// they send back when they are done. Thus, three level chan.
desireFetchCh chan chan chan struct{}
cancelFetchCh chan chan chan struct{}
allowedConcurrency int
allowedFetches int
fetchManagerStarted uint32 // atomic, once 1, we start the fetch manager

// Workers signify the number of fetch and list / epoch goroutines that
Expand Down Expand Up @@ -824,9 +824,9 @@ func (c *consumer) newConsumerSession(tps *topicsPartitions) *consumerSession {

tps: tps,

desireFetchCh: make(chan chan chan struct{}, 8),
cancelFetchCh: make(chan chan chan struct{}, 4),
allowedConcurrency: c.cl.cfg.allowedConcurrentFetches,
desireFetchCh: make(chan chan chan struct{}, 8),
cancelFetchCh: make(chan chan chan struct{}, 4),
allowedFetches: c.cl.cfg.maxConcurrentFetches,
}
session.workersCond = sync.NewCond(&session.workersMu)
return session
Expand Down Expand Up @@ -875,7 +875,7 @@ func (c *consumerSession) manageFetchConcurrency() {
ctxCh = nil
}

if len(wantFetch) > 0 && (activeFetches < c.allowedConcurrency || c.allowedConcurrency == 0) { // 0 means unbounded
if len(wantFetch) > 0 && (activeFetches < c.allowedFetches || c.allowedFetches == 0) { // 0 means unbounded
wantFetch[0] <- doneFetch
wantFetch = wantFetch[1:]
activeFetches++
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ var (

// A temporary error returned when a broker chosen for a request is
// stopped due to a concurrent metadata response.
errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this requesthas died--either the broker id is migrating or no longer exists")
errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this request has died--either the broker id is migrating or no longer exists")

errProducerIDLoadFail = errors.New("unable to initialize a producer ID due to request failures")

Expand Down

0 comments on commit e185676

Please sign in to comment.