Skip to content

Commit

Permalink
broker: add two new connection types, cxnGroup and cxnSlow
Browse files Browse the repository at this point in the history
As a group consumer, we expect 1 active join group request, and then 1
active sync group request. Both of these requests can hang. If a client
wants to update metadata while either of these requests is hanging,
and the metadata request happens to choose the same connection, then
the metadata request will be blocked. We do not want this.

If an admin issues a request that contains a TimeoutMillis field, it is
possible that the request will take a while to execute. We do not want
other requests on the same connection to be blocked while it does.

To fix both of these scenarios, we add two new connection types:
cxnGroup, which specifically handles Join and Sync, and cxnSlow, which
handles anything with timeout millis.

For #92.
  • Loading branch information
twmb committed Oct 16, 2021
1 parent 2d6c1a8 commit fbf9239
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,14 @@ type broker struct {
// write goes to, but the write is expected to be fast whereas the wait
// for the response is expected to be slow.
//
// Produce requests go to cxnProduce, fetch to cxnFetch, and all others
// to cxnNormal.
// Produce requests go to cxnProduce, fetch to cxnFetch, join/sync go
// to cxnGroup, anything with TimeoutMillis goes to cxnSlow, and
// everything else goes to cxnNormal.
cxnNormal *brokerCxn
cxnProduce *brokerCxn
cxnFetch *brokerCxn
cxnGroup *brokerCxn
cxnSlow *brokerCxn

reapMu sync.Mutex // held when modifying a brokerCxn

Expand Down Expand Up @@ -199,6 +202,8 @@ func (b *broker) stopForever() {
b.cxnNormal.die()
b.cxnProduce.die()
b.cxnFetch.die()
b.cxnGroup.die()
b.cxnSlow.die()
}

// do issues a request to the broker, eventually calling the response
Expand Down Expand Up @@ -255,7 +260,7 @@ func (b *broker) handleReq(pr promisedReq) {
var cxn *brokerCxn
{
var err error
if cxn, err = b.loadConnection(pr.ctx, req.Key()); err != nil {
if cxn, err = b.loadConnection(pr.ctx, req); err != nil {
pr.promise(nil, err)
return
}
Expand Down Expand Up @@ -419,14 +424,23 @@ func (p bufPool) put(b []byte) { p.p.Put(&b) }

// loadConnection returns the broker's connection, creating it if necessary
// and returning an error if that fails.
func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, error) {
pcxn := &b.cxnNormal
var isProduceCxn bool // see docs on brokerCxn.discard for why we do this
if reqKey == 0 {
func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerCxn, error) {
var (
pcxn = &b.cxnNormal
isProduceCxn bool // see docs on brokerCxn.discard for why we do this
reqKey = req.Key()
_, isTimeout = req.(kmsg.TimeoutRequest)
)
switch {
case reqKey == 0:
pcxn = &b.cxnProduce
isProduceCxn = true
} else if reqKey == 1 {
case reqKey == 1:
pcxn = &b.cxnFetch
case reqKey == 11 || reqKey == 14: // join || sync
pcxn = &b.cxnGroup
case isTimeout:
pcxn = &b.cxnSlow
}

if *pcxn != nil && atomic.LoadInt32(&(*pcxn).dead) == 0 {
Expand Down Expand Up @@ -506,6 +520,8 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {
b.cxnNormal,
b.cxnProduce,
b.cxnFetch,
b.cxnGroup,
b.cxnSlow,
} {
if cxn == nil || atomic.LoadInt32(&cxn.dead) == 1 {
continue
Expand Down

0 comments on commit fbf9239

Please sign in to comment.