From 63d3a609faacc226134c288984501808a3c95493 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 5 Jan 2021 00:31:59 -0700 Subject: [PATCH] broker: use new & better kmsg.ThrottleResponse Before, we would throttle even if Kafka already applied the throttle. --- pkg/kgo/broker.go | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 6154acf0..331563d5 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -7,7 +7,6 @@ import ( "io" "math" "net" - "reflect" "strconv" "sync" "sync/atomic" @@ -841,27 +840,15 @@ func (cxn *brokerCxn) handleResps() { // If we had no error, we read the response successfully. // - // Any response that can cause throttling has a - // "ThrottleMillis" field. We check for that here. - // - // This is a bit magical by its usage of reflect, but is is - // thankfully generic (and benchmarks to be ~200ns). - // - // If the field exists and is non-zero, we save that we are - // being throttled, which will cause the next write to wait - // before writing. + // Any response that can cause throttling satisfies the + // kmsg.ThrottleResponse interface. We check that here. if readErr == nil { - v := reflect.Indirect(reflect.ValueOf(pr.resp)) - if v.Kind() == reflect.Struct { // should be yes, but just to be sure - v = v.FieldByName("ThrottleMillis") - var zero reflect.Value - if v != zero { - v := v.Interface() - if millis, ok := v.(int32); ok && millis > 0 { - throttleUntil := time.Now().Add(time.Millisecond * time.Duration(millis)).UnixNano() - if throttleUntil > cxn.throttleUntil { - atomic.StoreInt64(&cxn.throttleUntil, throttleUntil) - } + if throttleResponse, ok := pr.resp.(kmsg.ThrottleResponse); ok { + millis, throttlesAfterResp := throttleResponse.Throttle() + if throttlesAfterResp && millis > 0 { + throttleUntil := time.Now().Add(time.Millisecond * time.Duration(millis)).UnixNano() + if throttleUntil > cxn.throttleUntil { + atomic.StoreInt64(&cxn.throttleUntil, throttleUntil) } } }