client: force a coordinator refresh if the coordinator is unknown
Previously, once we loaded a coordinator successfully, we would cache
it. We would delete the cache entry if a response indicated
NOT_COORDINATOR. This was fine.

One problem with this scheme is that once we know the coordinator, the
client still needs to map that number to a broker internally. If a
metadata refresh cleared that broker from the client, then our cached
coordinator would return errUnknownCoordinator, and the request would
fail in an unretriable way without clearing our cache.

Now, if our cached coordinator cannot map to a broker, we clear the
cache and restart from the top. This ensures a fresh coordinator lookup,
which always proceeds to the number => broker mapping. If we fail
*again*, then this is a fresh failure and we continue with the old
behavior of returning errUnknownCoordinator, which fails the request.
However, we now give the request one chance to retry and load a new
coordinator, which is more resilient to leadership changes.
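
As a rough, self-contained sketch of this retry-once-on-a-stale-cache idea
(not the actual client code, which follows in the diff below; the
coordinatorCache type and the findCoordinator/brokerFor helpers are
hypothetical stand-ins):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Stand-in for the client's error when a cached coordinator node is no
// longer present in the current metadata.
var errUnknownCoordinator = errors.New("coordinator node is not in current metadata")

// coordinatorCache is a hypothetical stand-in for the client's coordinator map.
type coordinatorCache struct {
	mu    sync.Mutex
	nodes map[string]int32 // key => coordinator node ID
}

// brokerFor is a stand-in for the node ID => broker mapping; it fails if a
// metadata refresh has dropped that node.
func brokerFor(node int32) (string, error) {
	known := map[int32]string{1: "broker-1:9092"}
	if addr, ok := known[node]; ok {
		return addr, nil
	}
	return "", errUnknownCoordinator
}

// findCoordinator is a stand-in for a FindCoordinator round trip.
func findCoordinator(key string) int32 { return 1 }

// load returns a broker address for the key's coordinator. If the cached
// node no longer maps to a broker, it deletes the stale entry and retries
// once from the top, forcing a fresh coordinator lookup.
func (c *coordinatorCache) load(key string) (string, error) {
	var restarted bool
start:
	c.mu.Lock()
	node, ok := c.nodes[key]
	if !ok {
		node = findCoordinator(key)
		c.nodes[key] = node
	}
	c.mu.Unlock()

	addr, err := brokerFor(node)
	if err != nil && !restarted {
		restarted = true
		c.mu.Lock()
		delete(c.nodes, key) // drop the stale entry before retrying
		c.mu.Unlock()
		goto start
	}
	return addr, err
}

func main() {
	c := &coordinatorCache{nodes: map[string]int32{"group-a": 7}} // node 7 is stale
	fmt.Println(c.load("group-a")) // retries once and resolves via a fresh lookup
}

The restarted flag guarantees at most one extra lookup per call, so a
genuinely unknown coordinator still fails with errUnknownCoordinator rather
than looping.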
twmb committed Aug 11, 2021
1 parent f29fb7f commit aecaf27
Showing 1 changed file with 25 additions and 6 deletions.
pkg/kgo/client.go
@@ -837,11 +837,22 @@ func (cl *Client) findCoordinator(ctx context.Context, req *kmsg.FindCoordinator
	return r.last, resp, err
}

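// deleteStaleCoordinatorIfEqual deletes the cached coordinator for key, but
// only if the cached load is still the one we started with, ensuring we never
// delete a newer coordinator that was stored concurrently.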
func (cl *Client) deleteStaleCoordinatorIfEqual(key coordinatorKey, current *coordinatorLoad) {
	cl.coordinatorsMu.Lock()
	defer cl.coordinatorsMu.Unlock()
	if existing, ok := cl.coordinators[key]; ok && current == existing {
		delete(cl.coordinators, key)
	}
}

// loadCoordinator returns the group/txn coordinator for the given key, retrying
// as necessary. Any non-retriable error does not cache the coordinator.
func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) {
	var restarted bool
start:
	cl.coordinatorsMu.Lock()
	c, ok := cl.coordinators[key]

	if !ok {
		c = &coordinatorLoad{
			done: make(chan struct{}), // all requests for the same coordinator get collapsed into one
@@ -851,11 +862,7 @@ func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*bro
			// but only if something else has not already replaced
			// our pointer.
			if c.err != nil {
				cl.coordinatorsMu.Lock()
				if existing, ok := cl.coordinators[key]; ok && c == existing {
					delete(cl.coordinators, key)
				}
				cl.coordinatorsMu.Unlock()
				cl.deleteStaleCoordinatorIfEqual(key, c)
			}
			close(c.done)
		}()
@@ -868,7 +875,19 @@ func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*bro
		if c.err != nil {
			return nil, c.err
		}
		return cl.brokerOrErr(nil, c.node, &errUnknownCoordinator{c.node, key})

		// If brokerOrErr returns an error, then our cached coordinator
		// is using metadata that has updated and removed knowledge of
		// that coordinator. We delete the stale coordinator here and
		// retry once. The retry will force a coordinator reload, and
		// everything will be fresh. Any errors after that we keep.
		b, err := cl.brokerOrErr(nil, c.node, &errUnknownCoordinator{c.node, key})
		if err != nil && !restarted {
			restarted = true
			cl.deleteStaleCoordinatorIfEqual(key, c)
			goto start
		}
		return b, err
	}

	var resp *kmsg.FindCoordinatorResponse
