Skip to content

Commit

Permalink
kgo: support forward & backward batch requests for FindCoordinator, O…
Browse files Browse the repository at this point in the history
…ffsetFetch

Previously, the client was only forward compatible. Opting into batching
was at your own risk. We now default to batching and split to the old
single behavior as needed.

To support this, we have a new internal request version pinner. We
always pin to batched version, and if we get errBrokerTooOld, we split.

loadCoordinators is a good bit more complex now, and we basically delete
loadCoordinator.

This is a bit complicated to describe in a single commit message.
  • Loading branch information
twmb committed Nov 8, 2022
1 parent ba55f7d commit 9ac6c97
Show file tree
Hide file tree
Showing 2 changed files with 406 additions and 202 deletions.
37 changes: 35 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ import (
"github.com/twmb/franz-go/pkg/sasl"
)

type pinReq struct {
kmsg.Request
min int16
max int16
pinMin bool
pinMax bool
}

func (p *pinReq) SetVersion(v int16) {
if p.pinMin && v < p.min {
v = p.min
}
if p.pinMax && v > p.max {
v = p.max
}
p.Request.SetVersion(v)
}

type promisedReq struct {
ctx context.Context
req kmsg.Request
Expand Down Expand Up @@ -303,18 +321,33 @@ func (b *broker) handleReq(pr promisedReq) {
version = brokerMax
}

minVersion := int16(-1)

// If the version now (after potential broker downgrading) is
// lower than we desire, we fail the request for the broker is
// too old.
if b.cl.cfg.minVersions != nil {
minVersion, minVersionExists := b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key())
if minVersionExists && version < minVersion {
minVersion, _ = b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key())
if minVersion > -1 && version < minVersion {
pr.promise(nil, errBrokerTooOld)
return
}
}

req.SetVersion(version) // always go for highest version
setVersion := req.GetVersion()
if minVersion > -1 && setVersion < minVersion {
pr.promise(nil, fmt.Errorf("request key %d version returned %d below the user defined min of %d", req.Key(), setVersion, minVersion))
return
}
if version < setVersion {
// If we want to set an old version, but the request is pinned
// high, we need to fail with errBrokerTooOld. The broker wants
// an old version, we want a high version. We rely on this
// error in backcompat request sharding.
pr.promise(nil, errBrokerTooOld)
return
}

for reauthentications := 1; !cxn.expiry.IsZero() && time.Now().After(cxn.expiry); reauthentications++ {
// We allow 15 reauths, which is a lot. If a new lifetime is
Expand Down
Loading

0 comments on commit 9ac6c97

Please sign in to comment.