Skip to content

Commit

Permalink
Add BrokerThrottleHook
Browse files Browse the repository at this point in the history
  • Loading branch information
akesle authored and twmb committed Jan 5, 2021
1 parent 63d3a60 commit 303186a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,11 @@ func (cxn *brokerCxn) handleResps() {
if throttleUntil > cxn.throttleUntil {
atomic.StoreInt64(&cxn.throttleUntil, throttleUntil)
}
cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(BrokerThrottleHook); ok {
h.OnThrottle(cxn.b.meta, time.Duration(millis)*time.Millisecond, throttlesAfterResp)
}
})
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/kgo/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,19 @@ type BrokerReadHook interface {
// The bytes read does not count any tls overhead.
OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
}

// BrokerThrottleHook is called after a response to a request is read
// from a broker, and the response identifies throttling in effect.
type BrokerThrottleHook interface {
// OnThrottle is passed the broker metadata, the imposed throttling
// interval, and whether the throttle was applied before Kafka
// responded to them request or after.
//
// For Kafka < 2.0.0, the throttle is applied before issuing a response.
// For Kafka >= 2.0.0, the throttle is applied after issuing a response.
//
// If throttledAfterResponse is false, then Kafka already applied the
// throttle. If it is true, the client internally will not send another
// request until the throttle deadline has passed.
OnThrottle(meta BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool)
}

0 comments on commit 303186a

Please sign in to comment.