diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 2ca41546..2aa7b1eb 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -121,11 +121,14 @@ type broker struct { // write goes to, but the write is expected to be fast whereas the wait // for the response is expected to be slow. // - // Produce requests go to cxnProduce, fetch to cxnFetch, and all others - // to cxnNormal. + // Produce requests go to cxnProduce, fetch to cxnFetch, join/sync go + // to cxnGroup, anything with TimeoutMillis goes to cxnSlow, and + // everything else goes to cxnNormal. cxnNormal *brokerCxn cxnProduce *brokerCxn cxnFetch *brokerCxn + cxnGroup *brokerCxn + cxnSlow *brokerCxn reapMu sync.Mutex // held when modifying a brokerCxn @@ -199,6 +202,8 @@ func (b *broker) stopForever() { b.cxnNormal.die() b.cxnProduce.die() b.cxnFetch.die() + b.cxnGroup.die() + b.cxnSlow.die() } // do issues a request to the broker, eventually calling the response @@ -255,7 +260,7 @@ func (b *broker) handleReq(pr promisedReq) { var cxn *brokerCxn { var err error - if cxn, err = b.loadConnection(pr.ctx, req.Key()); err != nil { + if cxn, err = b.loadConnection(pr.ctx, req); err != nil { pr.promise(nil, err) return } @@ -419,14 +424,23 @@ func (p bufPool) put(b []byte) { p.p.Put(&b) } // loadConection returns the broker's connection, creating it if necessary // and returning an error of if that fails. -func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, error) { - pcxn := &b.cxnNormal - var isProduceCxn bool // see docs on brokerCxn.discard for why we do this - if reqKey == 0 { +func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerCxn, error) { + var ( + pcxn = &b.cxnNormal + isProduceCxn bool // see docs on brokerCxn.discard for why we do this + reqKey = req.Key() + _, isTimeout = req.(kmsg.TimeoutRequest) + ) + switch { + case reqKey == 0: pcxn = &b.cxnProduce isProduceCxn = true - } else if reqKey == 1 { + case reqKey == 1: pcxn = &b.cxnFetch + case reqKey == 11 || reqKey == 14: // join || sync + pcxn = &b.cxnGroup + case isTimeout: + pcxn = &b.cxnSlow } if *pcxn != nil && atomic.LoadInt32(&(*pcxn).dead) == 0 { @@ -506,6 +520,8 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) { b.cxnNormal, b.cxnProduce, b.cxnFetch, + b.cxnGroup, + b.cxnSlow, } { if cxn == nil || atomic.LoadInt32(&cxn.dead) == 1 { continue