diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 4b8a3df8..7a01a6a2 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -102,6 +102,8 @@ type cfg struct {
 	isolationLevel int8
 	keepControl    bool
 	rack           string
+
+	allowedConcurrentFetches int
 }
 
 func (cfg *cfg) validate() error {
@@ -172,6 +174,9 @@ func (cfg *cfg) validate() error {
 		{v: int64(cfg.maxBrokerWriteBytes), allowed: int64(cfg.maxRecordBatchBytes), badcmp: i64lt, fmt: "max broker write bytes %v is erroneously less than max record batch bytes %v"},
 		{v: int64(cfg.maxBrokerReadBytes), allowed: int64(cfg.maxBytes), badcmp: i64lt, fmt: "max broker read bytes %v is erroneously less than max fetch bytes %v"},
 
+		// 0 <= allowed concurrency
+		{name: "allowed concurrency", v: int64(cfg.allowedConcurrentFetches), allowed: 0, badcmp: i64lt},
+
 		// 1s <= conn timeout overhead <= 15m
 		{name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},
 		{name: "conn timeout min overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(time.Second), badcmp: i64lt, durs: true},
@@ -289,6 +294,8 @@ func defaultCfg() cfg {
 		maxPartBytes:   10 << 20,
 		resetOffset:    NewOffset().AtStart(),
 		isolationLevel: 0,
+
+		allowedConcurrentFetches: 0, // unbounded default
 	}
 }
 
@@ -774,6 +781,34 @@ func FetchMaxPartitionBytes(b int32) ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = b }}
 }
 
+// AllowedConcurrentFetches sets the maximum number of fetch requests to allow
+// in flight or buffered at once, overriding the unbounded (i.e. number of
+// brokers) default.
+//
+// This setting, paired with FetchMaxBytes, can upper bound the maximum amount
+// of memory that the client can use for consuming.
+//
+// Requests are issued to brokers in a FIFO order: once the client is ready to
+// issue a request to a broker, it registers that request and issues it in
+// order with other registrations.
+//
+// If Kafka replies with any data, the client does not track the fetch as
+// completed until the user has polled the buffered fetch. Thus, a concurrent
+// fetch is not considered complete until all data from it is done being
+// processed and out of the client itself.
+//
+// Note that brokers are allowed to hang for up to FetchMaxWait before replying
+// to a request, so if this option is too constrained and you are consuming a
+// low throughput topic, the client may take a long time before requesting a
+// broker that has new data. For high throughput topics, or if the allowed
+// concurrent fetches is large enough, this should not be a concern.
+//
+// A value of 0 implies the allowed concurrency is unbounded and will be
+// limited only by the number of brokers in the cluster.
+func AllowedConcurrentFetches(n int) ConsumerOpt {
+	return consumerOpt{func(cfg *cfg) { cfg.allowedConcurrentFetches = n }}
+}
+
 // ConsumeResetOffset sets the offset to restart consuming from when a
 // partition has no commits (for groups) or when a fetch sees an
 // OffsetOutOfRange error, overriding the default ConsumeStartOffset.
diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index 7453c721..e03a479a 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -592,6 +592,15 @@ type consumerSession struct {
 	ctx    context.Context
 	cancel func()
 
+	// desireFetchCh is a buffered channel over which sources register
+	// their desire to fetch.
+	//
+	// We receive desires from sources, we reply when they can fetch, and
+	// they send back when they are done. Hence the three-level channel.
+	desireFetchCh       chan chan chan<- struct{}
+	allowedConcurrency  int
+	fetchManagerStarted uint32 // atomic; set to 1 once the fetch manager has been started
+
 	// Workers signify the number of fetch and list / epoch goroutines that
 	// are currently running within the context of this consumer session.
 	// Stopping a session only returns once workers hits zero.
@@ -610,6 +619,9 @@ func (c *consumer) newConsumerSession() *consumerSession {
 	session := &consumerSession{
 		c: c,
 
+		desireFetchCh:      make(chan chan chan<- struct{}, 8),
+		allowedConcurrency: c.cl.cfg.allowedConcurrentFetches,
+
 		ctx:    ctx,
 		cancel: cancel,
 	}
@@ -617,6 +629,43 @@
 	return session
 }
 
+func (c *consumerSession) desireFetch() chan<- chan chan<- struct{} {
+	if atomic.SwapUint32(&c.fetchManagerStarted, 1) == 0 {
+		go c.manageFetchConcurrency()
+	}
+	return c.desireFetchCh
+}
+
+func (c *consumerSession) manageFetchConcurrency() {
+	var (
+		activeFetches int
+		doneFetch     = make(chan struct{}, 20)
+		wantFetch     []chan chan<- struct{}
+	)
+	for {
+		select {
+		case register := <-c.desireFetchCh:
+			wantFetch = append(wantFetch, register)
+		case <-doneFetch:
+			activeFetches--
+
+			if activeFetches == 0 {
+				select {
+				case <-c.ctx.Done():
+					return // we are dead
+				default:
+				}
+			}
+		}
+
+		if len(wantFetch) > 0 && (activeFetches < c.allowedConcurrency || c.allowedConcurrency == 0) { // 0 means unbounded
+			wantFetch[0] <- doneFetch
+			wantFetch = wantFetch[1:]
+			activeFetches++
+		}
+	}
+}
+
 func (c *consumerSession) incWorker() {
 	c.workersMu.Lock()
 	defer c.workersMu.Unlock()
@@ -698,7 +747,9 @@ func (c *consumer) stopSession() listOrEpochLoads {
 	}
 	session.workersMu.Unlock()
 
-	// At this point, all fetches, lists, and loads are dead.
+	// At this point, all fetches, lists, and loads are dead. We can close
+	// our num-fetches manager without worrying about a source trying to
+	// register itself.
 
 	c.cl.sinksAndSourcesMu.Lock()
 	for _, sns := range c.cl.sinksAndSources {
@@ -707,7 +758,7 @@
 	c.cl.sinksAndSourcesMu.Unlock()
 
 	// At this point, if we begin fetching anew, then the sources will not
-	// be using stale sessions.
+	// be using stale fetch sessions.
 	c.sourcesReadyMu.Lock()
 	defer c.sourcesReadyMu.Unlock()
 
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 2f4c9a01..0bcec929 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -286,25 +286,30 @@ func (os usedOffsets) finishUsingAll() {
 
 type bufferedFetch struct {
 	fetch       Fetch
-	usedOffsets usedOffsets // what the offsets will be next if this fetch is used
+	doneFetch   chan<- struct{} // when unbuffered, we send down this
+	usedOffsets usedOffsets     // what the offsets will be next if this fetch is used
 }
 
 // takeBuffered drains a buffered fetch and updates offsets.
 func (s *source) takeBuffered() Fetch {
-	r := s.buffered
-	s.buffered = bufferedFetch{}
-	r.usedOffsets.finishUsingAllWith(func(o *cursorOffsetNext) {
-		o.from.setOffset(o.cursorOffset)
+	return s.takeBufferedFn(func(usedOffsets usedOffsets) {
+		usedOffsets.finishUsingAllWith(func(o *cursorOffsetNext) {
+			o.from.setOffset(o.cursorOffset)
+		})
 	})
-	close(s.sem)
-	return r.fetch
 }
 
 func (s *source) discardBuffered() {
+	s.takeBufferedFn(usedOffsets.finishUsingAll)
+}
+
+func (s *source) takeBufferedFn(offsetFn func(usedOffsets)) Fetch {
 	r := s.buffered
 	s.buffered = bufferedFetch{}
-	r.usedOffsets.finishUsingAll()
+	offsetFn(r.usedOffsets)
+	r.doneFetch <- struct{}{}
 	close(s.sem)
+	return r.fetch
 }
 
 // createReq actually creates a fetch request.
@@ -375,6 +380,10 @@ func (s *source) loopFetch() {
 	default:
 	}
 
+	// We receive on canFetch when we are allowed to fetch, and we send on
+	// the received channel when we are done fetching.
+	canFetch := make(chan chan<- struct{}, 1)
+
 	again := true
 	for again {
 		select {
@@ -383,7 +392,21 @@
 			return
 		case <-s.sem:
 		}
-		again = s.fetchState.maybeFinish(s.fetch(session))
+
+		select {
+		case <-session.ctx.Done():
+			s.fetchState.hardFinish()
+			return
+		case session.desireFetch() <- canFetch:
+		}
+
+		select {
+		case <-session.ctx.Done():
+			s.fetchState.hardFinish()
+			return
+		case doneFetch := <-canFetch:
+			again = s.fetchState.maybeFinish(s.fetch(session, doneFetch))
+		}
 	}
 }
 
@@ -405,8 +428,23 @@
 // *even if* the source needs to be stopped. The knowledge of which preferred
 // replica to use would not be out of date even if the consumer session is
 // changing.
-func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
+func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct{}) (fetched bool) {
 	req := s.createReq()
+
+	// For all returns, if we do not buffer our fetch, then we want to
+	// ensure our used offsets are usable again.
+	var alreadySentToDoneFetch bool
+	defer func() {
+		if len(s.buffered.fetch.Topics) == 0 {
+			if req.numOffsets > 0 {
+				req.usedOffsets.finishUsingAll()
+			}
+			if !alreadySentToDoneFetch {
+				doneFetch <- struct{}{}
+			}
+		}
+	}()
+
 	if req.numOffsets == 0 { // cursors could have been set unusable
 		return
 	}
@@ -433,8 +471,6 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 	case <-requested:
 		fetched = true
 	case <-ctx.Done():
-		s.session.reset()
-		req.usedOffsets.finishUsingAll()
 		return
 	}
 
@@ -442,6 +478,13 @@
 	// but that is fine; we may just re-request too early and fall into
 	// another backoff.
 	if err != nil {
+		// We preemptively allow more fetches (since we are not buffering
+		// anything) and reset our session: Kafka may have processed the
+		// request even though the client failed to receive the response.
+		doneFetch <- struct{}{}
+		alreadySentToDoneFetch = true
+		s.session.reset()
+
 		s.cl.triggerUpdateMetadata()
 		s.consecutiveFailures++
 		after := time.NewTimer(s.cl.cfg.retryBackoff(s.consecutiveFailures))
@@ -450,8 +493,6 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 		case <-after.C:
 		case <-ctx.Done():
 		}
-		s.session.reset()
-		req.usedOffsets.finishUsingAll()
 		return
 	}
 	s.consecutiveFailures = 0
@@ -479,8 +520,6 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 	select {
 	case <-handled:
 	case <-ctx.Done():
-		req.usedOffsets.finishUsingAll()
-		s.session.reset()
 		return
 	}
 
@@ -528,12 +567,10 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 			s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session")
 			s.session.reset()
 		}
-		req.usedOffsets.finishUsingAll()
 		return
 	case kerr.InvalidFetchSessionEpoch:
 		s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "err", err)
 		s.session.reset()
-		req.usedOffsets.finishUsingAll()
 		return
 	}
 
@@ -543,7 +580,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 
 	// If we moved any partitions to preferred replicas, we reset the
 	// session. We do this after bumping the epoch just to ensure that we
-	// have truly reset the session.
+	// have truly reset the session. (TODO switch to usingForgottenTopics)
 	if len(preferreds) > 0 {
 		s.session.reset()
 	}
@@ -557,12 +594,11 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 	if len(fetch.Topics) > 0 {
 		s.buffered = bufferedFetch{
 			fetch:       fetch,
+			doneFetch:   doneFetch,
 			usedOffsets: req.usedOffsets,
 		}
 		s.sem = make(chan struct{})
 		s.cl.consumer.addSourceReadyForDraining(s)
-	} else {
-		req.usedOffsets.finishUsingAll()
 	}
 	return
 }
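
Example usage, as a minimal sketch of pairing the new option with FetchMaxBytes to bound consumer memory (at most roughly AllowedConcurrentFetches * FetchMaxBytes of fetch data in flight or buffered). Only AllowedConcurrentFetches comes from this patch; the surrounding calls (NewClient, SeedBrokers, ConsumeTopics, FetchMaxBytes, PollFetches, EachRecord) are assumed from the package's public API, and the broker address and topic name are placeholders.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// At most 4 fetch requests in flight or buffered, each capped at
	// 50 MiB, so buffered fetch data stays around 4 * 50 MiB.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("events"),       // placeholder topic
		kgo.AllowedConcurrentFetches(4),
		kgo.FetchMaxBytes(50<<20),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		// A fetch only frees its concurrency slot once its data has
		// been polled out of the client.
		fetches := cl.PollFetches(context.Background())
		fetches.EachRecord(func(r *kgo.Record) {
			fmt.Println(string(r.Value))
		})
	}
}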