diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 8c07254d..d7612081 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -78,8 +78,8 @@ type Client struct {
 	coordinatorsMu sync.Mutex
 	coordinators   map[coordinatorKey]*coordinatorLoad
 
-	updateMetadataCh    chan struct{}
-	updateMetadataNowCh chan struct{} // like above, but with high priority
+	updateMetadataCh    chan string
+	updateMetadataNowCh chan string // like above, but with high priority
 	metawait            metawait
 	metadone            chan struct{}
 }
@@ -164,8 +164,8 @@ func NewClient(opts ...Opt) (*Client, error) {
 
 		coordinators: make(map[coordinatorKey]*coordinatorLoad),
 
-		updateMetadataCh:    make(chan struct{}, 1),
-		updateMetadataNowCh: make(chan struct{}, 1),
+		updateMetadataCh:    make(chan string, 1),
+		updateMetadataNowCh: make(chan string, 1),
 		metadone:            make(chan struct{}),
 	}
diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index b7663101..ad717dc2 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -167,7 +167,7 @@ func (c *consumer) init(cl *Client) {
 		return // not consuming
 	}
 
-	defer cl.triggerUpdateMetadata(true) // we definitely want to trigger a metadata update
+	defer cl.triggerUpdateMetadata(true, "client initialization") // we definitely want to trigger a metadata update
 
 	if len(cl.cfg.group) == 0 {
 		c.initDirect()
@@ -527,7 +527,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
 	} else { // else we guarded it
 		c.unguardSessionChange(session)
 	}
-	loadOffsets.loadWithSession(session) // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow
+	loadOffsets.loadWithSession(session, "loading offsets in new session from assign") // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow
 
 	// If we started a new session or if we unguarded, we have one
 	// worker. This one worker allowed us to safely add our load
@@ -868,17 +868,17 @@ func (dst *listOrEpochLoads) mergeFrom(src listOrEpochLoads) {
 
 func (l listOrEpochLoads) isEmpty() bool { return len(l.List) == 0 && len(l.Epoch) == 0 }
 
-func (l listOrEpochLoads) loadWithSession(s *consumerSession) {
+func (l listOrEpochLoads) loadWithSession(s *consumerSession, why string) {
 	if !l.isEmpty() {
 		s.incWorker()
-		go s.listOrEpoch(l, false)
+		go s.listOrEpoch(l, false, why)
 	}
 }
 
-func (l listOrEpochLoads) loadWithSessionNow(s *consumerSession) bool {
+func (l listOrEpochLoads) loadWithSessionNow(s *consumerSession, why string) bool {
 	if !l.isEmpty() {
 		s.incWorker()
-		go s.listOrEpoch(l, true)
+		go s.listOrEpoch(l, true, why)
 		return true
 	}
 	return false
@@ -1153,14 +1153,14 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
 // This function is responsible for issuing ListOffsets or
 // OffsetForLeaderEpoch. These requests's responses are only handled within
 // the context of a consumer session.
-func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool) {
+func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) {
 	defer s.decWorker()
 
 	wait := true
 	if immediate {
-		s.c.cl.triggerUpdateMetadataNow()
+		s.c.cl.triggerUpdateMetadataNow(why)
 	} else {
-		wait = s.c.cl.triggerUpdateMetadata(false) // avoid trigger if within refresh interval
+		wait = s.c.cl.triggerUpdateMetadata(false, why) // avoid trigger if within refresh interval
 	}
 
 	s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during meta update into one
@@ -1217,7 +1217,7 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool)
 			// add things back to the session, we could abandon
 			// loading these offsets and have a stuck cursor.
 			defer s.decWorker()
-			defer reloads.loadWithSession(s)
+			defer reloads.loadWithSession(s, "reload offsets from load failure")
 			after := time.NewTimer(time.Second)
 			defer after.Stop()
 			select {
diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index 65e883ec..84fe9672 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -37,7 +37,7 @@ type groupConsumer struct {
 	// happening.
 	syncCommitMu sync.RWMutex
 
-	rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex)
+	rejoinCh chan string // cap 1; sent to if subscription changes (regex)
 
 	// For EOS, before we commit, we force a heartbeat. If the client and
 	// group member are both configured properly, then the transactional
@@ -142,7 +142,7 @@ func (c *consumer) initGroup() {
 		manageDone:       make(chan struct{}),
 		cooperative:      c.cl.cfg.cooperative(),
 		tps:              newTopicsPartitions(),
-		rejoinCh:         make(chan struct{}, 1),
+		rejoinCh:         make(chan string, 1),
 		heartbeatForceCh: make(chan func(error)),
 		using:            make(map[string]int),
 	}
@@ -303,7 +303,7 @@ func (g *groupConsumer) manage() {
 			"backoff", backoff,
 		)
 		deadline := time.Now().Add(backoff)
-		g.cl.waitmeta(g.ctx, backoff)
+		g.cl.waitmeta(g.ctx, backoff, "waitmeta during join & sync error backoff")
 		after := time.NewTimer(time.Until(deadline))
 		select {
 		case <-g.ctx.Done():
@@ -522,7 +522,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
 		return
 	}
 
-	defer g.rejoin() // cooperative consumers rejoin after they revoking what they lost
+	defer g.rejoin("cooperative rejoin after revoking what we lost") // cooperative consumers rejoin after revoking what they lost
 
 	// The block below deletes everything lost from our uncommitted map.
 	// All commits should be **completed** by the time this runs. An async
@@ -721,9 +721,10 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 			heartbeat = true
 		case force = <-g.heartbeatForceCh:
 			heartbeat = true
-		case <-g.rejoinCh:
+		case why := <-g.rejoinCh:
 			// If a metadata update changes our subscription,
 			// we just pretend we are rebalancing.
+			g.cfg.logger.Log(LogLevelInfo, "forced rejoin quitting heartbeat loop", "why", why)
 			err = kerr.RebalanceInProgress
 		case err = <-fetchErrCh:
 			fetchErrCh = nil
@@ -811,7 +812,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 			waited := make(chan struct{})
 			metadone = waited
 			go func() {
-				g.cl.waitmeta(g.ctx, g.cfg.sessionTimeout)
+				g.cl.waitmeta(g.ctx, g.cfg.sessionTimeout, "waitmeta after heartbeat error")
 				close(waited)
 			}()
 		}
@@ -837,16 +838,16 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 // rebalance and will instead reply to the member with its current assignment.
 func (cl *Client) ForceRebalance() {
 	if g := cl.consumer.g; g != nil {
-		g.rejoin()
+		g.rejoin("rejoin from ForceRebalance")
 	}
 }
 
 // rejoin is called after a cooperative member revokes what it lost at the
 // beginning of a session, or if we are leader and detect new partitions to
 // consume.
-func (g *groupConsumer) rejoin() {
+func (g *groupConsumer) rejoin(why string) {
 	select {
-	case g.rejoinCh <- struct{}{}:
+	case g.rejoinCh <- why:
 	default:
 	}
 }
@@ -1186,11 +1187,7 @@ start:
 		}
 	}
 
-	if g.cfg.logger.Level() >= LogLevelDebug {
-		g.cfg.logger.Log(LogLevelDebug, "fetched committed offsets", "group", g.cfg.group, "fetched", offsets)
-	} else {
-		g.cfg.logger.Log(LogLevelInfo, "fetched committed offsets", "group", g.cfg.group)
-	}
+	g.cfg.logger.Log(LogLevelInfo, "fetched committed offsets", "group", g.cfg.group, "fetched", offsets)
 
 	return nil
 }
@@ -1279,8 +1276,10 @@ func (g *groupConsumer) findNewAssignments() {
 		return
 	}
 
-	if numNewTopics > 0 || g.leader.get() {
-		g.rejoin()
+	if numNewTopics > 0 {
+		g.rejoin("rejoining because there are more topics to consume, our interests have changed")
+	} else if g.leader.get() {
+		g.rejoin("rejoining because we are the leader and noticed some topics have new partitions")
 	}
 }
diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go
index 35f45d82..c9f4fee4 100644
--- a/pkg/kgo/metadata.go
+++ b/pkg/kgo/metadata.go
@@ -37,7 +37,7 @@ func (m *metawait) signal() {
 
 // waitmeta returns immediately if metadata was updated within the last second,
 // otherwise this waits for up to wait for a metadata update to complete.
-func (cl *Client) waitmeta(ctx context.Context, wait time.Duration) {
+func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string) {
 	now := time.Now()
 
 	cl.metawait.mu.Lock()
@@ -47,7 +47,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration) {
 	}
 	cl.metawait.mu.Unlock()
 
-	cl.triggerUpdateMetadataNow()
+	cl.triggerUpdateMetadataNow(why)
 
 	quit := false
 	done := make(chan struct{})
@@ -81,7 +81,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration) {
 	cl.metawait.c.Broadcast()
 }
 
-func (cl *Client) triggerUpdateMetadata(must bool) bool {
+func (cl *Client) triggerUpdateMetadata(must bool, why string) bool {
 	if !must {
 		cl.metawait.mu.Lock()
 		defer cl.metawait.mu.Unlock()
@@ -91,15 +91,15 @@
 	}
 
 	select {
-	case cl.updateMetadataCh <- struct{}{}:
+	case cl.updateMetadataCh <- why:
 	default:
 	}
 	return true
 }
 
-func (cl *Client) triggerUpdateMetadataNow() {
+func (cl *Client) triggerUpdateMetadataNow(why string) {
 	select {
-	case cl.updateMetadataNowCh <- struct{}{}:
+	case cl.updateMetadataNowCh <- why:
 	default:
 	}
 }
@@ -119,8 +119,11 @@ func (cl *Client) updateMetadataLoop() {
 		case <-cl.ctx.Done():
 			return
 		case <-ticker.C:
-		case <-cl.updateMetadataCh:
-		case <-cl.updateMetadataNowCh:
+			cl.cfg.logger.Log(LogLevelInfo, "updating metadata due to max age ticker")
+		case why := <-cl.updateMetadataCh:
+			cl.cfg.logger.Log(LogLevelInfo, "updating metadata", "why", why)
+		case why := <-cl.updateMetadataNowCh:
+			cl.cfg.logger.Log(LogLevelInfo, "immediately updating metadata", "why", why)
 			now = true
 		}
@@ -134,17 +137,18 @@ func (cl *Client) updateMetadataLoop() {
 				case <-cl.ctx.Done():
 					timer.Stop()
 					return
-				case <-cl.updateMetadataNowCh:
+				case why := <-cl.updateMetadataNowCh:
 					timer.Stop()
+					cl.cfg.logger.Log(LogLevelInfo, "immediately updating metadata, bypassing normal metadata wait", "why", why)
 				case <-timer.C:
 				}
 			}
-		} else {
-			// Even with an "update now", we sleep just a bit to allow some
-			// potential pile on now triggers.
-			time.Sleep(10 * time.Millisecond)
 		}
+		// Even with an "update now", we sleep just a bit to allow some
+		// potential pile on now triggers.
+		time.Sleep(time.Until(lastAt.Add(10 * time.Millisecond)))
 
+		// Drain any refires that occurred during our waiting.
 	out:
 		for {
@@ -161,7 +165,11 @@ func (cl *Client) updateMetadataLoop() {
 			if now && nowTries < 3 {
 				goto start
 			}
-			cl.triggerUpdateMetadata(true)
+			if err != nil {
+				cl.triggerUpdateMetadata(true, fmt.Sprintf("re-updating metadata due to err: %s", err))
+			} else {
+				cl.triggerUpdateMetadata(true, "re-updating metadata due to inner topic or partition error")
+			}
 		}
 		if err == nil {
 			lastAt = time.Now()
 		}
@@ -265,7 +273,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
 		if consumerSessionStopped {
 			session := cl.consumer.startNewSession(tpsPrior)
 			defer session.decWorker()
-			reloadOffsets.loadWithSession(session)
+			reloadOffsets.loadWithSession(session, "resuming reload offsets after session stopped for cursor migrating in metadata")
 		}
 	}()
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 9fbd0b64..42a47956 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -649,7 +649,7 @@ func (cl *Client) partitionsForTopicProduce(pr promisedRec) (*topicPartitions, *
 			p.topics.storeTopics([]string{topic})
 
 			cl.addUnknownTopicRecord(pr)
-			cl.triggerUpdateMetadataNow()
+			cl.triggerUpdateMetadataNow("forced load due to unknown produce topic")
 
 			return nil, nil
 		}
 	}
@@ -664,7 +664,7 @@ func (cl *Client) partitionsForTopicProduce(pr promisedRec) (*topicPartitions, *
 		return parts, v
 	}
 
 	cl.addUnknownTopicRecord(pr)
-	cl.triggerUpdateMetadata(false)
+	cl.triggerUpdateMetadata(false, "reload trigger due to produce topic still not known")
 
 	return nil, nil // our record is buffered waiting for metadata update; nothing to return
 }
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index e295e1a3..ddff1e10 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -185,7 +185,7 @@ func (s *sink) maybeBackoff() {
 	}
 
 	defer s.clearBackoff()
-	s.cl.triggerUpdateMetadata(false) // as good a time as any
+	s.cl.triggerUpdateMetadata(false, "opportunistic load during sink backoff") // as good a time as any
 
 	tries := int(atomic.AddUint32(&s.consecutiveFailures, 1))
 	after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
@@ -501,7 +501,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
 		if updateMeta {
 			s.cl.cfg.logger.Log(LogLevelInfo, "produce request failed triggering metadata update", "broker", logID(s.nodeID), "err", err)
 		}
-		s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false)
+		s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false, "failed produce request triggering metadata update")
 
 	case err == ErrClientClosed:
 		s.cl.failBufferedRecords(ErrClientClosed)
@@ -623,10 +623,10 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 
 	if len(req.batches) > 0 {
 		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
-		s.handleRetryBatches(req.batches, 0, true, false)
+		s.handleRetryBatches(req.batches, 0, true, false, "kafka did not reply to all topics in produce request")
 	}
 	if len(reqRetry) > 0 {
-		s.handleRetryBatches(reqRetry, 0, true, true)
+		s.handleRetryBatches(reqRetry, 0, true, true, "produce request had retry batches")
 	}
 }
@@ -867,6 +867,7 @@ func (s *sink) handleRetryBatches(
 	backoffSeq uint32,
 	updateMeta bool, // if we should maybe update the metadata
 	canFail bool, // if records can fail if they are at limits
+	why string,
 ) {
 	var needsMetaUpdate bool
 	retry.eachOwnerLocked(func(batch seqRecBatch) {
@@ -896,7 +897,7 @@ func (s *sink) handleRetryBatches(
 	// If we do want to metadata update, we only do so if any batch was the
 	// first batch in its buf / not concurrently failed.
 	if needsMetaUpdate {
-		s.cl.triggerUpdateMetadata(true)
+		s.cl.triggerUpdateMetadata(true, why)
 	} else if !updateMeta {
 		s.maybeTriggerBackoff(backoffSeq)
 		s.maybeDrain()
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index cb98a31c..1bc9de43 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -233,7 +233,7 @@ func (p *cursorOffsetPreferred) move() {
 	c.source.cl.sinksAndSourcesMu.Unlock()
 
 	if !exists {
-		c.source.cl.triggerUpdateMetadataNow()
+		c.source.cl.triggerUpdateMetadataNow("cursor moving to a different broker that is not yet known")
 		return
 	}
@@ -604,7 +604,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 		alreadySentToDoneFetch = true
 
 		s.session.reset()
-		s.cl.triggerUpdateMetadata(false) // as good a time as any
+		s.cl.triggerUpdateMetadata(false, "opportunistic load during source backoff") // as good a time as any
 		s.consecutiveFailures++
 		after := time.NewTimer(s.cl.cfg.retryBackoff(s.consecutiveFailures))
 		defer after.Stop()
@@ -695,7 +695,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 		case kerr.FetchSessionTopicIDError, kerr.UnknownTopicID, kerr.InconsistentTopicID:
 			s.cl.cfg.logger.Log(LogLevelInfo, "topic id issues, resetting session and updating metadata", "broker", logID(s.nodeID), "err", err)
 			s.session.reset()
-			s.cl.triggerUpdateMetadataNow()
+			s.cl.triggerUpdateMetadataNow("topic id issues")
 			return
 		}
@@ -721,8 +721,8 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 		s.session.reset()
 	}
 
-	if updateMeta && !reloadOffsets.loadWithSessionNow(consumerSession) {
-		s.cl.triggerUpdateMetadataNow()
+	if updateMeta && !reloadOffsets.loadWithSessionNow(consumerSession, "out of range offset / fenced leader epoch caused reload offsets on fetch") {
+		s.cl.triggerUpdateMetadataNow("fetch partition had an error causing immediate metadata update")
 	}
 
 	if fetch.hasErrorsOrRecords() {