diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 838d1696..899f093b 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -109,6 +109,12 @@ type broker struct { addr string // net.JoinHostPort(meta.Host, meta.Port) meta BrokerMetadata + // versions tracks the first load of an ApiVersions. We store this + // after the first connect, which helps speed things up on future + // reconnects (across any of the three broker connections) because we + // will never look up API versions for this broker again. + versions atomic.Value // *brokerVersions + // The cxn fields each manage a single tcp connection to one broker. // Each field is managed serially in handleReqs. This means that only // one write can happen at a time, regardless of which connection the @@ -132,6 +138,33 @@ type broker struct { dead int32 } +// brokerVersions is loaded once (and potentially a few times concurrently if +// multiple connections are opening at once) and then forever stored for a +// broker. +type brokerVersions struct { + versions [kmsg.MaxKey + 1]int16 +} + +func newBrokerVersions() *brokerVersions { + var v brokerVersions + for i := range &v.versions { + v.versions[i] = -1 + } + return &v +} + +func (v *brokerVersions) len() int { return kmsg.MaxKey + 1 } + +func (b *broker) loadVersions() *brokerVersions { + loaded := b.versions.Load() + if loaded == nil { + return nil + } + return loaded.(*brokerVersions) +} + +func (b *broker) storeVersions(v *brokerVersions) { b.versions.Store(v) } + const unknownControllerID = -1 var unknownBrokerMetadata = BrokerMetadata{ @@ -159,6 +192,7 @@ func (cl *Client) newBroker(nodeID int32, host string, port int32, rack *string) reqs: make(chan promisedReq, 10), } + go br.handleReqs() return br @@ -248,16 +282,17 @@ func (b *broker) handleReqs() { } } - if int(req.Key()) > len(cxn.versions[:]) || - b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) { + v := b.loadVersions() + + if int(req.Key()) > v.len() || b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) { pr.promise(nil, errUnknownRequestKey) continue } - // If cxn.versions[0] is non-negative, then we loaded API + // If v.versions[0] is non-negative, then we loaded API // versions. If the version for this request is negative, we // know the broker cannot handle this request. - if cxn.versions[0] >= 0 && cxn.versions[req.Key()] < 0 { + if v.versions[0] >= 0 && v.versions[req.Key()] < 0 { pr.promise(nil, errBrokerTooOld) continue } @@ -274,7 +309,7 @@ func (b *broker) handleReqs() { // versions because the client is pinned pre 0.10.0 and we // stick with our max. version := ourMax - if brokerMax := cxn.versions[req.Key()]; brokerMax >= 0 && brokerMax < ourMax { + if brokerMax := v.versions[req.Key()]; brokerMax >= 0 && brokerMax < ourMax { version = brokerMax } @@ -546,8 +581,7 @@ type brokerCxn struct { cl *Client b *broker - addr string - versions [kmsg.MaxKey + 1]int16 + addr string mechanism sasl.Mechanism expiry time.Time @@ -575,14 +609,17 @@ type brokerCxn struct { } func (cxn *brokerCxn) init(isProduceCxn bool) error { - for i := 0; i < len(cxn.versions[:]); i++ { - cxn.versions[i] = -1 - } - - if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) { - if err := cxn.requestAPIVersions(); err != nil { - cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err) - return err + hasVersions := cxn.b.loadVersions() != nil + if !hasVersions { + if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) { + if err := cxn.requestAPIVersions(); err != nil { + cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err) + return err + } + } else { + // We have a max versions, and it indicates no support + // for ApiVersions. We just store a default -1 set. + cxn.b.storeVersions(newBrokerVersions()) } } @@ -667,12 +704,14 @@ start: return errors.New("ApiVersions response invalidly contained no ApiKeys") } + v := newBrokerVersions() for _, key := range resp.ApiKeys { - if key.ApiKey > kmsg.MaxKey { + if key.ApiKey > kmsg.MaxKey || key.ApiKey < 0 { continue } - cxn.versions[key.ApiKey] = key.MaxVersion + v.versions[key.ApiKey] = key.MaxVersion } + cxn.b.storeVersions(v) return nil } @@ -684,11 +723,12 @@ func (cxn *brokerCxn) sasl() error { retried := false authenticate := false + v := cxn.b.loadVersions() req := new(kmsg.SASLHandshakeRequest) start: - if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 { + if mechanism.Name() != "GSSAPI" && v.versions[req.Key()] >= 0 { req.Mechanism = mechanism.Name() - req.Version = cxn.versions[req.Key()] + req.Version = v.versions[req.Key()] cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", logID(cxn.b.meta.NodeID)) corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req) if writeErr != nil { @@ -776,7 +816,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { req := &kmsg.SASLAuthenticateRequest{ SASLAuthBytes: clientWrite, } - req.Version = cxn.versions[req.Key()] + req.Version = cxn.b.loadVersions().versions[req.Key()] cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", logID(cxn.b.meta.NodeID), "version", req.Version, "step", step) corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)