kgo: improve logging on metadata triggers, fetched offsets, rejoins
This commit adds a lot of logging:

- all metadata triggers now say why they are triggering metadata; the
triggers that actually cause metadata refreshes are logged (duplicate
triggers during a refresh are dropped)

- all internally forced rejoins now say why they are triggering a rejoin

- fetched offsets are now completely logged by default at info

- we now absolutely sleep at least 10ms in metadata between updates;
previously, a delayed trigger followed by an immediate trigger could
bypass the 10ms
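
The mechanism behind these bullets is the same throughout the diff below: the
trigger channels change from chan struct{} to chan string, callers pass a short
reason, and the loop servicing the channel logs that reason at info level. A
minimal, self-contained sketch of the pattern (illustrative names only, not the
library's actual types):

package main

import (
	"log"
	"time"
)

// updater is a stand-in for the client's metadata machinery; triggerCh mirrors
// updateMetadataCh, which this commit changes from chan struct{} to chan string.
type updater struct {
	triggerCh chan string // capacity 1
}

// trigger asks for an update and records why. The non-blocking send means a
// reason arriving while another trigger is already pending is dropped, which
// is the "duplicate triggers during a refresh are dropped" behavior above.
func (u *updater) trigger(why string) {
	select {
	case u.triggerCh <- why:
	default:
	}
}

// updateLoop services triggers and logs the reason it received.
func (u *updater) updateLoop() {
	for why := range u.triggerCh {
		log.Printf("updating metadata, why: %s", why)
		// ... issue the actual metadata request here ...
	}
}

func main() {
	u := &updater{triggerCh: make(chan string, 1)}
	go u.updateLoop()
	u.trigger("client initialization")
	u.trigger("forced load due to unknown produce topic") // dropped if the first trigger is still pending
	time.Sleep(50 * time.Millisecond)
}
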
twmb committed Oct 25, 2021
1 parent d9054e0 commit 696392e
Showing 7 changed files with 65 additions and 57 deletions.
8 changes: 4 additions & 4 deletions pkg/kgo/client.go
@@ -78,8 +78,8 @@ type Client struct {
coordinatorsMu sync.Mutex
coordinators map[coordinatorKey]*coordinatorLoad

updateMetadataCh chan struct{}
updateMetadataNowCh chan struct{} // like above, but with high priority
updateMetadataCh chan string
updateMetadataNowCh chan string // like above, but with high priority
metawait metawait
metadone chan struct{}
}
@@ -164,8 +164,8 @@ func NewClient(opts ...Opt) (*Client, error) {

coordinators: make(map[coordinatorKey]*coordinatorLoad),

updateMetadataCh: make(chan struct{}, 1),
updateMetadataNowCh: make(chan struct{}, 1),
updateMetadataCh: make(chan string, 1),
updateMetadataNowCh: make(chan string, 1),
metadone: make(chan struct{}),
}

20 changes: 10 additions & 10 deletions pkg/kgo/consumer.go
@@ -167,7 +167,7 @@ func (c *consumer) init(cl *Client) {
return // not consuming
}

defer cl.triggerUpdateMetadata(true) // we definitely want to trigger a metadata update
defer cl.triggerUpdateMetadata(true, "client initialization") // we definitely want to trigger a metadata update

if len(cl.cfg.group) == 0 {
c.initDirect()
@@ -527,7 +527,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
} else { // else we guarded it
c.unguardSessionChange(session)
}
loadOffsets.loadWithSession(session) // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow
loadOffsets.loadWithSession(session, "loading offsets in new session from assign") // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow

// If we started a new session or if we unguarded, we have one
// worker. This one worker allowed us to safely add our load
@@ -868,17 +868,17 @@ func (dst *listOrEpochLoads) mergeFrom(src listOrEpochLoads) {

func (l listOrEpochLoads) isEmpty() bool { return len(l.List) == 0 && len(l.Epoch) == 0 }

func (l listOrEpochLoads) loadWithSession(s *consumerSession) {
func (l listOrEpochLoads) loadWithSession(s *consumerSession, why string) {
if !l.isEmpty() {
s.incWorker()
go s.listOrEpoch(l, false)
go s.listOrEpoch(l, false, why)
}
}

func (l listOrEpochLoads) loadWithSessionNow(s *consumerSession) bool {
func (l listOrEpochLoads) loadWithSessionNow(s *consumerSession, why string) bool {
if !l.isEmpty() {
s.incWorker()
go s.listOrEpoch(l, true)
go s.listOrEpoch(l, true, why)
return true
}
return false
@@ -1153,14 +1153,14 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
// This function is responsible for issuing ListOffsets or
// OffsetForLeaderEpoch. These requests' responses are only handled within
// the context of a consumer session.
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool) {
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) {
defer s.decWorker()

wait := true
if immediate {
s.c.cl.triggerUpdateMetadataNow()
s.c.cl.triggerUpdateMetadataNow(why)
} else {
wait = s.c.cl.triggerUpdateMetadata(false) // avoid trigger if within refresh interval
wait = s.c.cl.triggerUpdateMetadata(false, why) // avoid trigger if within refresh interval
}

s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during meta update into one
@@ -1217,7 +1217,7 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool)
// add things back to the session, we could abandon
// loading these offsets and have a stuck cursor.
defer s.decWorker()
defer reloads.loadWithSession(s)
defer reloads.loadWithSession(s, "reload offsets from load failure")
after := time.NewTimer(time.Second)
defer after.Stop()
select {
31 changes: 15 additions & 16 deletions pkg/kgo/consumer_group.go
@@ -37,7 +37,7 @@ type groupConsumer struct {
// happening.
syncCommitMu sync.RWMutex

rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex)
rejoinCh chan string // cap 1; sent to if subscription changes (regex)

// For EOS, before we commit, we force a heartbeat. If the client and
// group member are both configured properly, then the transactional
@@ -142,7 +142,7 @@ func (c *consumer) initGroup() {
manageDone: make(chan struct{}),
cooperative: c.cl.cfg.cooperative(),
tps: newTopicsPartitions(),
rejoinCh: make(chan struct{}, 1),
rejoinCh: make(chan string, 1),
heartbeatForceCh: make(chan func(error)),
using: make(map[string]int),
}
@@ -303,7 +303,7 @@ func (g *groupConsumer) manage() {
"backoff", backoff,
)
deadline := time.Now().Add(backoff)
g.cl.waitmeta(g.ctx, backoff)
g.cl.waitmeta(g.ctx, backoff, "waitmeta during join & sync error backoff")
after := time.NewTimer(time.Until(deadline))
select {
case <-g.ctx.Done():
@@ -522,7 +522,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
return
}

defer g.rejoin() // cooperative consumers rejoin after revoking what they lost
defer g.rejoin("cooperative rejoin after revoking what we lost") // cooperative consumers rejoin after revoking what they lost

// The block below deletes everything lost from our uncommitted map.
// All commits should be **completed** by the time this runs. An async
@@ -721,9 +721,10 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
heartbeat = true
case force = <-g.heartbeatForceCh:
heartbeat = true
case <-g.rejoinCh:
case why := <-g.rejoinCh:
// If a metadata update changes our subscription,
// we just pretend we are rebalancing.
g.cfg.logger.Log(LogLevelInfo, "forced rejoin quitting heartbeat loop", "why", why)
err = kerr.RebalanceInProgress
case err = <-fetchErrCh:
fetchErrCh = nil
@@ -811,7 +812,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
waited := make(chan struct{})
metadone = waited
go func() {
g.cl.waitmeta(g.ctx, g.cfg.sessionTimeout)
g.cl.waitmeta(g.ctx, g.cfg.sessionTimeout, "waitmeta after heartbeat error")
close(waited)
}()
}
@@ -837,16 +838,16 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
// rebalance and will instead reply to the member with its current assignment.
func (cl *Client) ForceRebalance() {
if g := cl.consumer.g; g != nil {
g.rejoin()
g.rejoin("rejoin from ForceRebalance")
}
}

// rejoin is called after a cooperative member revokes what it lost at the
// beginning of a session, or if we are leader and detect new partitions to
// consume.
func (g *groupConsumer) rejoin() {
func (g *groupConsumer) rejoin(why string) {
select {
case g.rejoinCh <- struct{}{}:
case g.rejoinCh <- why:
default:
}
}
@@ -1186,11 +1187,7 @@ start:
}
}

if g.cfg.logger.Level() >= LogLevelDebug {
g.cfg.logger.Log(LogLevelDebug, "fetched committed offsets", "group", g.cfg.group, "fetched", offsets)
} else {
g.cfg.logger.Log(LogLevelInfo, "fetched committed offsets", "group", g.cfg.group)
}
g.cfg.logger.Log(LogLevelInfo, "fetched committed offsets", "group", g.cfg.group, "fetched", offsets)
return nil
}

@@ -1279,8 +1276,10 @@ func (g *groupConsumer) findNewAssignments() {
return
}

if numNewTopics > 0 || g.leader.get() {
g.rejoin()
if numNewTopics > 0 {
g.rejoin("rejoining because there are more topics to consume, our interests have changed")
} else if g.leader.get() {
g.rejoin("rejoining because we are the leader and noticed some topics have new partitions")
}
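
ForceRebalance and the internal rejoin paths above all funnel their reason
through rejoinCh, and the heartbeat loop now logs that reason at info. A hedged
usage sketch for seeing those log lines from application code; the options used
here (SeedBrokers, ConsumerGroup, ConsumeTopics, WithLogger, BasicLogger) are
assumed to be the standard kgo options of this era, so adjust to your setup:

package main

import (
	"os"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// An info-level logger is enough to see the new "why" fields, since the
	// rejoin and metadata reasons added in this commit log at info.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// ForceRebalance goes through rejoin("rejoin from ForceRebalance"); the
	// heartbeat loop then logs "forced rejoin quitting heartbeat loop" with
	// that reason before simulating a rebalance.
	time.Sleep(5 * time.Second)
	cl.ForceRebalance()
	time.Sleep(5 * time.Second)
}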
}

38 changes: 23 additions & 15 deletions pkg/kgo/metadata.go
@@ -37,7 +37,7 @@ func (m *metawait) signal() {

// waitmeta returns immediately if metadata was updated within the last second,
// otherwise this waits for up to wait for a metadata update to complete.
func (cl *Client) waitmeta(ctx context.Context, wait time.Duration) {
func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string) {
now := time.Now()

cl.metawait.mu.Lock()
@@ -47,7 +47,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration) {
}
cl.metawait.mu.Unlock()

cl.triggerUpdateMetadataNow()
cl.triggerUpdateMetadataNow(why)

quit := false
done := make(chan struct{})
@@ -81,7 +81,7 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration) {
cl.metawait.c.Broadcast()
}

func (cl *Client) triggerUpdateMetadata(must bool) bool {
func (cl *Client) triggerUpdateMetadata(must bool, why string) bool {
if !must {
cl.metawait.mu.Lock()
defer cl.metawait.mu.Unlock()
@@ -91,15 +91,15 @@ func (cl *Client) triggerUpdateMetadata(must bool) bool {
}

select {
case cl.updateMetadataCh <- struct{}{}:
case cl.updateMetadataCh <- why:
default:
}
return true
}

func (cl *Client) triggerUpdateMetadataNow() {
func (cl *Client) triggerUpdateMetadataNow(why string) {
select {
case cl.updateMetadataNowCh <- struct{}{}:
case cl.updateMetadataNowCh <- why:
default:
}
}
@@ -119,8 +119,11 @@ func (cl *Client) updateMetadataLoop() {
case <-cl.ctx.Done():
return
case <-ticker.C:
case <-cl.updateMetadataCh:
case <-cl.updateMetadataNowCh:
cl.cfg.logger.Log(LogLevelInfo, "updating metadata due to max age ticker")
case why := <-cl.updateMetadataCh:
cl.cfg.logger.Log(LogLevelInfo, "updating metadata", "why", why)
case why := <-cl.updateMetadataNowCh:
cl.cfg.logger.Log(LogLevelInfo, "immediately updating metadata", "why", why)
now = true
}

@@ -134,17 +137,18 @@ func (cl *Client) updateMetadataLoop() {
case <-cl.ctx.Done():
timer.Stop()
return
case <-cl.updateMetadataNowCh:
case why := <-cl.updateMetadataNowCh:
timer.Stop()
cl.cfg.logger.Log(LogLevelInfo, "immediately updating metadata, bypassing normal metadata wait", "why", why)
case <-timer.C:
}
}
} else {
// Even with an "update now", we sleep just a bit to allow some
// potential pile on now triggers.
time.Sleep(10 * time.Millisecond)
}

// Even with an "update now", we sleep just a bit to allow some
// potential pile on now triggers.
time.Sleep(time.Until(lastAt.Add(10 * time.Millisecond)))

// Drain any refires that occurred during our waiting.
out:
for {
@@ -161,7 +165,11 @@
if now && nowTries < 3 {
goto start
}
cl.triggerUpdateMetadata(true)
if err != nil {
cl.triggerUpdateMetadata(true, fmt.Sprintf("re-updating metadata due to err: %s", err))
} else {
cl.triggerUpdateMetadata(true, "re-updating metadata due inner topic or partition error")
}
}
if err == nil {
lastAt = time.Now()
@@ -265,7 +273,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
if consumerSessionStopped {
session := cl.consumer.startNewSession(tpsPrior)
defer session.decWorker()
reloadOffsets.loadWithSession(session)
reloadOffsets.loadWithSession(session, "resuming reload offsets after session stopped for cursor migrating in metadata")
}
}()
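
The sleep rewrite in updateMetadataLoop above implements the last bullet of the
commit message: instead of sleeping 10ms only on one path, the loop now sleeps
until lastAt plus 10ms regardless of which trigger woke it, so a delayed
trigger followed by an immediate one can no longer run two updates back to
back. A small sketch of that timing rule (lastAt here is a local stand-in for
the loop's bookkeeping):

package main

import (
	"fmt"
	"time"
)

func main() {
	lastAt := time.Now() // time of the previous metadata update

	update := func(why string) {
		// Sleep until at least 10ms have passed since the previous update.
		// If more than 10ms have already elapsed, time.Until is negative and
		// time.Sleep returns immediately, so an idle loop is not delayed.
		time.Sleep(time.Until(lastAt.Add(10 * time.Millisecond)))
		fmt.Printf("metadata update (%s) after %v\n", why, time.Since(lastAt))
		lastAt = time.Now()
	}

	update("updating metadata due to max age ticker")
	update("immediately updating metadata") // still at least ~10ms after the previous one
}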

4 changes: 2 additions & 2 deletions pkg/kgo/producer.go
@@ -649,7 +649,7 @@ func (cl *Client) partitionsForTopicProduce(pr promisedRec) (*topicPartitions, *

p.topics.storeTopics([]string{topic})
cl.addUnknownTopicRecord(pr)
cl.triggerUpdateMetadataNow()
cl.triggerUpdateMetadataNow("forced load due to unknown produce topic")
return nil, nil
}
}
@@ -664,7 +664,7 @@ func (cl *Client) partitionsForTopicProduce(pr promisedRec) (*topicPartitions, *
return parts, v
}
cl.addUnknownTopicRecord(pr)
cl.triggerUpdateMetadata(false)
cl.triggerUpdateMetadata(false, "reload trigger due to produce topic still not known")

return nil, nil // our record is buffered waiting for metadata update; nothing to return
}
11 changes: 6 additions & 5 deletions pkg/kgo/sink.go
@@ -185,7 +185,7 @@ func (s *sink) maybeBackoff() {
}
defer s.clearBackoff()

s.cl.triggerUpdateMetadata(false) // as good a time as any
s.cl.triggerUpdateMetadata(false, "opportunistic load during sink backoff") // as good a time as any

tries := int(atomic.AddUint32(&s.consecutiveFailures, 1))
after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
@@ -501,7 +501,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
if updateMeta {
s.cl.cfg.logger.Log(LogLevelInfo, "produce request failed triggering metadata update", "broker", logID(s.nodeID), "err", err)
}
s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false)
s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false, "failed produce request triggering metadata update")

case err == ErrClientClosed:
s.cl.failBufferedRecords(ErrClientClosed)
@@ -623,10 +623,10 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response

if len(req.batches) > 0 {
s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
s.handleRetryBatches(req.batches, 0, true, false)
s.handleRetryBatches(req.batches, 0, true, false, "kafka did not reply to all topics in produce request")
}
if len(reqRetry) > 0 {
s.handleRetryBatches(reqRetry, 0, true, true)
s.handleRetryBatches(reqRetry, 0, true, true, "produce request had retry batches")
}
}

@@ -867,6 +867,7 @@ func (s *sink) handleRetryBatches(
backoffSeq uint32,
updateMeta bool, // if we should maybe update the metadata
canFail bool, // if records can fail if they are at limits
why string,
) {
var needsMetaUpdate bool
retry.eachOwnerLocked(func(batch seqRecBatch) {
@@ -896,7 +897,7 @@
// If we do want to metadata update, we only do so if any batch was the
// first batch in its buf / not concurrently failed.
if needsMetaUpdate {
s.cl.triggerUpdateMetadata(true)
s.cl.triggerUpdateMetadata(true, why)
} else if !updateMeta {
s.maybeTriggerBackoff(backoffSeq)
s.maybeDrain()