From e2e80bf73359e8b9e901a3d8481cb549d896fc06 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 6 Dec 2022 10:42:42 -0500 Subject: [PATCH] kgo: clear controller/coordinator caches on failed dials If we are repeatedly unable to dial the controller or coordinator, it is possible that the broker is just gone. We need to clear our cache internally so that we refresh the cache and pick up a new controller / coordinator. We retry 3 times just to handle temporary dial errors. Closes #239. --- pkg/kgo/client.go | 66 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 2d70a157..3dd2c3bc 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -810,11 +810,27 @@ type retriable struct { // that can fail / do not need to retry forever. limitRetries int - // parseRetryErr, if non-nil, can parse a retriable error out of the - // response and return it. This error is *not* returned from the - // request if the req cannot be retried due to timeout or retry limits, - // but it *can* allow a retry if neither limit is hit yet. - parseRetryErr func(kmsg.Response) error + // parseRetryErr, if non-nil, can delete stale cached brokers. We do + // *not* return the error from this function to the caller, but we do + // use it to potentially retry. It is not necessary, but also not + // harmful, to return the input error. + parseRetryErr func(kmsg.Response, error) error +} + +type failDial struct{ fails int8 } + +// The controller and group/txn coordinators are cached. If dialing the broker +// repeatedly fails, we need to forget our cache to force a re-load: the broker +// may have completely died. +func (d *failDial) isRepeatedDialFail(err error) bool { + if isDialErr(err) { + d.fails++ + if d.fails == 3 { + d.fails = 0 + return true + } + } + return false } func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { @@ -831,8 +847,8 @@ start: var retryErr error if err == nil { resp, err = r.last.waitResp(ctx, req) - if err == nil && r.parseRetryErr != nil { - retryErr = r.parseRetryErr(resp) + if r.parseRetryErr != nil { + retryErr = r.parseRetryErr(resp, err) } } @@ -1098,7 +1114,6 @@ func (cl *Client) controller(ctx context.Context) (*broker, error) { func (cl *Client) forgetControllerID(id int32) { cl.controllerIDMu.Lock() defer cl.controllerIDMu.Unlock() - if cl.controllerID == id { cl.controllerID = unknownControllerID } @@ -1288,18 +1303,21 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) case errors.Is(err, kerr.CoordinatorNotAvailable), errors.Is(err, kerr.CoordinatorLoadInProgress), errors.Is(err, kerr.NotCoordinator): - - cl.coordinatorsMu.Lock() - delete(cl.coordinators, coordinatorKey{ - name: name, - typ: typ, - }) - cl.coordinatorsMu.Unlock() + cl.deleteStaleCoordinator(name, typ) return true } return false } +func (cl *Client) deleteStaleCoordinator(name string, typ int8) { + cl.coordinatorsMu.Lock() + defer cl.coordinatorsMu.Unlock() + delete(cl.coordinators, coordinatorKey{ + name: name, + typ: typ, + }) +} + type brokerOrErr struct { b *broker err error @@ -1325,7 +1343,14 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response cl.maybeDeleteMappedMetadata(topics...) } - r.parseRetryErr = func(resp kmsg.Response) error { + var d failDial + r.parseRetryErr = func(resp kmsg.Response, err error) error { + if err != nil { + if d.isRepeatedDialFail(err) { + cl.forgetControllerID(r.last.meta.NodeID) + } + return err + } var code int16 switch t := resp.(type) { case *kmsg.CreateTopicsResponse: @@ -1455,7 +1480,14 @@ func (cl *Client) handleReqWithCoordinator( req kmsg.Request, ) (*broker, kmsg.Response, error) { r := cl.retriableBrokerFn(coordinator) - r.parseRetryErr = func(resp kmsg.Response) error { + var d failDial + r.parseRetryErr = func(resp kmsg.Response, err error) error { + if err != nil { + if d.isRepeatedDialFail(err) { + cl.deleteStaleCoordinator(name, typ) + } + return err + } var code int16 switch t := resp.(type) { // TXN