kgo: add AllowedConcurrentFetches option
See doc on the option; this can be used to bound the max memory used as
a consumer.
twmb committed Feb 10, 2021
1 parent 6f7c002 commit 4509d41
Showing 3 changed files with 145 additions and 23 deletions.
35 changes: 35 additions & 0 deletions pkg/kgo/config.go
@@ -102,6 +102,8 @@ type cfg struct {
isolationLevel int8
keepControl bool
rack string

allowedConcurrentFetches int
}

func (cfg *cfg) validate() error {
@@ -172,6 +174,9 @@ func (cfg *cfg) validate() error {
{v: int64(cfg.maxBrokerWriteBytes), allowed: int64(cfg.maxRecordBatchBytes), badcmp: i64lt, fmt: "max broker write bytes %v is erroneously less than max record batch bytes %v"},
{v: int64(cfg.maxBrokerReadBytes), allowed: int64(cfg.maxBytes), badcmp: i64lt, fmt: "max broker read bytes %v is erroneously less than max fetch bytes %v"},

// 0 <= allowed concurrency
{name: "allowed concurrency", v: int64(cfg.allowedConcurrentFetches), allowed: 0, badcmp: i64lt},

// 1s <= conn timeout overhead <= 15m
{name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},
{name: "conn timeout min overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(time.Second), badcmp: i64lt, durs: true},
@@ -289,6 +294,8 @@ func defaultCfg() cfg {
maxPartBytes: 10 << 20,
resetOffset: NewOffset().AtStart(),
isolationLevel: 0,

allowedConcurrentFetches: 0, // unbounded default
}
}

@@ -774,6 +781,34 @@ func FetchMaxPartitionBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = b }}
}

// AllowedConcurrentFetches sets the maximum number of fetch requests to allow
// in flight or buffered at once, overriding the unbounded (i.e. number of
// brokers) default.
//
// This setting, paired with FetchMaxBytes, can upper bound the maximum amount
// of memory that the client can use for consuming.
//
// Requests are issued to brokers in a FIFO order: once the client is ready to
// issue a request to a broker, it registers that request and issues it in
// order with other registrations.
//
// If Kafka replies with any data, the client does not track the fetch as
// completed until the user has polled the buffered fetch. Thus, a concurrent
// fetch is not considered complete until all data from it is done being
// processed and out of the client itself.
//
// Note that brokers are allowed to hang for up to FetchMaxWait before replying
// to a request, so if this option is too constrained and you are consuming a
// low throughput topic, the client may take a long time before requesting a
// broker that has new data. For high throughput topics, or if the allowed
// concurrent fetches is large enough, this should not be a concern.
//
// A value of 0 implies the allowed concurrency is unbounded and will be
// limited only by the number of brokers in the cluster.
func AllowedConcurrentFetches(n int) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.allowedConcurrentFetches = n }}
}
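
For illustration, a minimal sketch of how this option might be paired with FetchMaxBytes to bound consumer memory (not part of this commit; the broker address and sizes are placeholder values). With these settings, buffered fetch data is capped at roughly 4 * 50 MiB, ignoring decompression and per-record overhead:

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker address
		kgo.FetchMaxBytes(50<<20),         // each fetch response capped at ~50 MiB
		kgo.AllowedConcurrentFetches(4),   // at most 4 fetches in flight or buffered
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()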

// ConsumeResetOffset sets the offset to restart consuming from when a
// partition has no commits (for groups) or when a fetch sees an
// OffsetOutOfRange error, overriding the default ConsumeStartOffset.
55 changes: 53 additions & 2 deletions pkg/kgo/consumer.go
@@ -592,6 +592,15 @@ type consumerSession struct {
ctx context.Context
cancel func()

// desireFetchCh is sized to the number of concurrent fetches we are
// configured to be able to send.
//
// We receive desires from sources, we reply when they can fetch, and
// they send back when they are done. Thus, three level chan.
desireFetchCh chan chan chan<- struct{}
allowedConcurrency int
fetchManagerStarted uint32 // atomic, once 1, we start the fetch manager

// Workers signify the number of fetch and list / epoch goroutines that
// are currently running within the context of this consumer session.
// Stopping a session only returns once workers hits zero.
@@ -610,13 +619,53 @@ func (c *consumer) newConsumerSession() *consumerSession {
session := &consumerSession{
c: c,

desireFetchCh: make(chan chan chan<- struct{}, 8),
allowedConcurrency: c.cl.cfg.allowedConcurrentFetches,

ctx: ctx,
cancel: cancel,
}
session.workersCond = sync.NewCond(&session.workersMu)
return session
}

func (c *consumerSession) desireFetch() chan<- chan chan<- struct{} {
if atomic.SwapUint32(&c.fetchManagerStarted, 1) == 0 {
go c.manageFetchConcurrency()
}
return c.desireFetchCh
}

func (c *consumerSession) manageFetchConcurrency() {
var (
activeFetches int
doneFetch = make(chan struct{}, 20)
wantFetch []chan chan<- struct{}
)
for {
select {
case register := <-c.desireFetchCh:
wantFetch = append(wantFetch, register)
case <-doneFetch:
activeFetches--

if activeFetches == 0 {
select {
case <-c.ctx.Done():
return // we are dead
default:
}
}
}

if len(wantFetch) > 0 && (activeFetches < c.allowedConcurrency || c.allowedConcurrency == 0) { // 0 means unbounded
wantFetch[0] <- doneFetch
wantFetch = wantFetch[1:]
activeFetches++
}
}
}
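
To make the three-level channel easier to follow, here is a stripped-down, self-contained illustration of the same desire/grant/done handshake (not the client's actual code) with an allowed concurrency of one: a source registers a grant channel, the manager hands back the shared done channel once concurrency allows, and the source signals on that channel when its fetch is fully drained.

	// Illustration only: FIFO granting of fetch permission, one fetch at a time.
	package main

	import "fmt"

	func main() {
		desire := make(chan chan chan<- struct{}, 8) // registrations from sources
		done := make(chan struct{}, 1)               // sources signal here once drained

		go func() { // the "fetch manager"
			for register := range desire {
				register <- done // grant: hand the done channel to the source
				<-done           // wait until the granted fetch is fully drained
			}
		}()

		grant := make(chan chan<- struct{}, 1) // a single "source"
		desire <- grant                        // register the desire to fetch
		doneCh := <-grant                      // wait for the grant
		fmt.Println("fetching and draining the buffered fetch")
		doneCh <- struct{}{} // report the fetch as fully consumed
	}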

func (c *consumerSession) incWorker() {
c.workersMu.Lock()
defer c.workersMu.Unlock()
@@ -698,7 +747,9 @@ func (c *consumer) stopSession() listOrEpochLoads {
}
session.workersMu.Unlock()

// At this point, all fetches, lists, and loads are dead.
// At this point, all fetches, lists, and loads are dead. We can close
// our num-fetches manager without worrying about a source trying to
// register itself.

c.cl.sinksAndSourcesMu.Lock()
for _, sns := range c.cl.sinksAndSources {
@@ -707,7 +758,7 @@ func (c *consumer) stopSession() listOrEpochLoads {
c.cl.sinksAndSourcesMu.Unlock()

// At this point, if we begin fetching anew, then the sources will not
// be using stale sessions.
// be using stale fetch sessions.

c.sourcesReadyMu.Lock()
defer c.sourcesReadyMu.Unlock()
78 changes: 57 additions & 21 deletions pkg/kgo/source.go
@@ -286,25 +286,30 @@ func (os usedOffsets) finishUsingAll() {
type bufferedFetch struct {
fetch Fetch

usedOffsets usedOffsets // what the offsets will be next if this fetch is used
doneFetch chan<- struct{} // when unbuffered, we send down this
usedOffsets usedOffsets // what the offsets will be next if this fetch is used
}

// takeBuffered drains a buffered fetch and updates offsets.
func (s *source) takeBuffered() Fetch {
r := s.buffered
s.buffered = bufferedFetch{}
r.usedOffsets.finishUsingAllWith(func(o *cursorOffsetNext) {
o.from.setOffset(o.cursorOffset)
return s.takeBufferedFn(func(usedOffsets usedOffsets) {
usedOffsets.finishUsingAllWith(func(o *cursorOffsetNext) {
o.from.setOffset(o.cursorOffset)
})
})
close(s.sem)
return r.fetch
}

func (s *source) discardBuffered() {
s.takeBufferedFn(usedOffsets.finishUsingAll)
}

func (s *source) takeBufferedFn(offsetFn func(usedOffsets)) Fetch {
r := s.buffered
s.buffered = bufferedFetch{}
r.usedOffsets.finishUsingAll()
offsetFn(r.usedOffsets)
r.doneFetch <- struct{}{}
close(s.sem)
return r.fetch
}

// createReq actually creates a fetch request.
@@ -375,6 +380,10 @@ func (s *source) loopFetch() {
default:
}

// We receive on canFetch when we can fetch, and we send back when we
// are done fetching.
canFetch := make(chan chan<- struct{}, 1)

again := true
for again {
select {
@@ -383,7 +392,21 @@
return
case <-s.sem:
}
again = s.fetchState.maybeFinish(s.fetch(session))

select {
case <-session.ctx.Done():
s.fetchState.hardFinish()
return
case session.desireFetch() <- canFetch:
}

select {
case <-session.ctx.Done():
s.fetchState.hardFinish()
return
case doneFetch := <-canFetch:
again = s.fetchState.maybeFinish(s.fetch(session, doneFetch))
}
}

}
@@ -405,8 +428,23 @@ func (s *source) loopFetch() {
// *even if* the source needs to be stopped. The knowledge of which preferred
// replica to use would not be out of date even if the consumer session is
// changing.
func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct{}) (fetched bool) {
req := s.createReq()

// For all returns, if we do not buffer our fetch, then we want to
// ensure our used offsets are usable again.
var alreadySentToDoneFetch bool
defer func() {
if len(s.buffered.fetch.Topics) == 0 {
if req.numOffsets > 0 {
req.usedOffsets.finishUsingAll()
}
if !alreadySentToDoneFetch {
doneFetch <- struct{}{}
}
}
}()

if req.numOffsets == 0 { // cursors could have been set unusable
return
}
@@ -433,15 +471,20 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
case <-requested:
fetched = true
case <-ctx.Done():
s.session.reset()
req.usedOffsets.finishUsingAll()
return
}

// If we had an error, we backoff. Killing a fetch quits the backoff,
// but that is fine; we may just re-request too early and fall into
// another backoff.
if err != nil {
// We preemptively allow more fetches (since we are not buffering)
// and reset our session because of the error (who knows if kafka
// processed the request but the client failed to receive it).
doneFetch <- struct{}{}
alreadySentToDoneFetch = true
s.session.reset()

s.cl.triggerUpdateMetadata()
s.consecutiveFailures++
after := time.NewTimer(s.cl.cfg.retryBackoff(s.consecutiveFailures))
@@ -450,8 +493,6 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
case <-after.C:
case <-ctx.Done():
}
s.session.reset()
req.usedOffsets.finishUsingAll()
return
}
s.consecutiveFailures = 0
@@ -479,8 +520,6 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
select {
case <-handled:
case <-ctx.Done():
req.usedOffsets.finishUsingAll()
s.session.reset()
return
}

@@ -528,12 +567,10 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session")
s.session.reset()
}
req.usedOffsets.finishUsingAll()
return
case kerr.InvalidFetchSessionEpoch:
s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "err", err)
s.session.reset()
req.usedOffsets.finishUsingAll()
return
}

@@ -543,7 +580,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {

// If we moved any partitions to preferred replicas, we reset the
// session. We do this after bumping the epoch just to ensure that we
// have truly reset the session.
// have truly reset the session. (TODO switch to usingForgottenTopics)
if len(preferreds) > 0 {
s.session.reset()
}
@@ -557,12 +594,11 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
if len(fetch.Topics) > 0 {
s.buffered = bufferedFetch{
fetch: fetch,
doneFetch: doneFetch,
usedOffsets: req.usedOffsets,
}
s.sem = make(chan struct{})
s.cl.consumer.addSourceReadyForDraining(s)
} else {
req.usedOffsets.finishUsingAll()
}
return
}
