From e185676761dd5bb4d60f37c195dbdb970687f379 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 25 Aug 2021 10:32:14 -0600 Subject: [PATCH] breaking: change AllowedConcurrentFetches to MaxConcurrentFetches The name is clearer with the docs that immediately follow ("...the maximum number of fetch requests") and is more consistent with all other Max config options. --- pkg/kgo/config.go | 14 +++++++------- pkg/kgo/consumer.go | 10 +++++----- pkg/kgo/errors.go | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 81c928fb..fda8225a 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -133,7 +133,7 @@ type cfg struct { keepControl bool rack string - allowedConcurrentFetches int + maxConcurrentFetches int topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions partitions map[string]map[int32]Offset // partitions to directly consume from @@ -260,7 +260,7 @@ func (cfg *cfg) validate() error { {v: int64(cfg.maxBrokerReadBytes), allowed: int64(cfg.maxBytes), badcmp: i64lt, fmt: "max broker read bytes %v is erroneously less than max fetch bytes %v"}, // 0 <= allowed concurrency - {name: "allowed concurrency", v: int64(cfg.allowedConcurrentFetches), allowed: 0, badcmp: i64lt}, + {name: "max concurrent fetches", v: int64(cfg.maxConcurrentFetches), allowed: 0, badcmp: i64lt}, // 1s <= conn timeout overhead <= 15m {name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true}, @@ -432,7 +432,7 @@ func defaultCfg() cfg { resetOffset: NewOffset().AtStart(), isolationLevel: 0, - allowedConcurrentFetches: 0, // unbounded default + maxConcurrentFetches: 0, // unbounded default /////////// // group // @@ -1015,8 +1015,8 @@ func FetchMaxPartitionBytes(b int32) ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = b }} } -// AllowedConcurrentFetches sets the maximum number of fetch requests to allow -// in flight or buffered at once, overriding the unbounded (i.e. number of +// MaxConcurrentFetches sets the maximum number of fetch requests to allow in +// flight or buffered at once, overriding the unbounded (i.e. number of // brokers) default. // // This setting, paired with FetchMaxBytes, can upper bound the maximum amount @@ -1039,8 +1039,8 @@ func FetchMaxPartitionBytes(b int32) ConsumerOpt { // // A value of 0 implies the allowed concurrency is unbounded and will be // limited only by the number of brokers in the cluster. -func AllowedConcurrentFetches(n int) ConsumerOpt { - return consumerOpt{func(cfg *cfg) { cfg.allowedConcurrentFetches = n }} +func MaxConcurrentFetches(n int) ConsumerOpt { + return consumerOpt{func(cfg *cfg) { cfg.maxConcurrentFetches = n }} } // ConsumeResetOffset sets the offset to restart consuming from when a diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 573d7a3d..feb3d1e4 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -795,7 +795,7 @@ type consumerSession struct { // they send back when they are done. Thus, three level chan. desireFetchCh chan chan chan struct{} cancelFetchCh chan chan chan struct{} - allowedConcurrency int + allowedFetches int fetchManagerStarted uint32 // atomic, once 1, we start the fetch manager // Workers signify the number of fetch and list / epoch goroutines that @@ -824,9 +824,9 @@ func (c *consumer) newConsumerSession(tps *topicsPartitions) *consumerSession { tps: tps, - desireFetchCh: make(chan chan chan struct{}, 8), - cancelFetchCh: make(chan chan chan struct{}, 4), - allowedConcurrency: c.cl.cfg.allowedConcurrentFetches, + desireFetchCh: make(chan chan chan struct{}, 8), + cancelFetchCh: make(chan chan chan struct{}, 4), + allowedFetches: c.cl.cfg.maxConcurrentFetches, } session.workersCond = sync.NewCond(&session.workersMu) return session @@ -875,7 +875,7 @@ func (c *consumerSession) manageFetchConcurrency() { ctxCh = nil } - if len(wantFetch) > 0 && (activeFetches < c.allowedConcurrency || c.allowedConcurrency == 0) { // 0 means unbounded + if len(wantFetch) > 0 && (activeFetches < c.allowedFetches || c.allowedFetches == 0) { // 0 means unbounded wantFetch[0] <- doneFetch wantFetch = wantFetch[1:] activeFetches++ diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 70259d6b..e4995bfe 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -93,7 +93,7 @@ var ( // A temporary error returned when a broker chosen for a request is // stopped due to a concurrent metadata response. - errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this requesthas died--either the broker id is migrating or no longer exists") + errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this request has died--either the broker id is migrating or no longer exists") errProducerIDLoadFail = errors.New("unable to initialize a producer ID due to request failures")