Skip to content

Commit

Permalink
broker: use new & better kmsg.ThrottleResponse
Browse files Browse the repository at this point in the history
Before, we would throttle even if Kafka already applied the throttle.
  • Loading branch information
twmb committed Jan 5, 2021
1 parent 2c777b4 commit 63d3a60
Showing 1 changed file with 8 additions and 21 deletions.
29 changes: 8 additions & 21 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"math"
"net"
"reflect"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -841,27 +840,15 @@ func (cxn *brokerCxn) handleResps() {

// If we had no error, we read the response successfully.
//
// Any response that can cause throttling has a
// "ThrottleMillis" field. We check for that here.
//
// This is a bit magical by its usage of reflect, but is is
// thankfully generic (and benchmarks to be ~200ns).
//
// If the field exists and is non-zero, we save that we are
// being throttled, which will cause the next write to wait
// before writing.
// Any response that can cause throttling satisfies the
// kmsg.ThrottleResponse interface. We check that here.
if readErr == nil {
v := reflect.Indirect(reflect.ValueOf(pr.resp))
if v.Kind() == reflect.Struct { // should be yes, but just to be sure
v = v.FieldByName("ThrottleMillis")
var zero reflect.Value
if v != zero {
v := v.Interface()
if millis, ok := v.(int32); ok && millis > 0 {
throttleUntil := time.Now().Add(time.Millisecond * time.Duration(millis)).UnixNano()
if throttleUntil > cxn.throttleUntil {
atomic.StoreInt64(&cxn.throttleUntil, throttleUntil)
}
if throttleResponse, ok := pr.resp.(kmsg.ThrottleResponse); ok {
millis, throttlesAfterResp := throttleResponse.Throttle()
if throttlesAfterResp && millis > 0 {
throttleUntil := time.Now().Add(time.Millisecond * time.Duration(millis)).UnixNano()
if throttleUntil > cxn.throttleUntil {
atomic.StoreInt64(&cxn.throttleUntil, throttleUntil)
}
}
}
Expand Down

0 comments on commit 63d3a60

Please sign in to comment.