Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autothrottle Kafka native API support #399

Merged
merged 35 commits into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9e834a9
[autothrottle] adds KafkaNativeMode config
jamiealquiza Apr 15, 2022
d1f3998
[autothrottle] kafka-native-mode flag
jamiealquiza Apr 15, 2022
b0b1ea6
[autothrottle] use ListReassignments
jamiealquiza Apr 15, 2022
2170081
[autothrottle] ListReassignments, revised loop ticker
jamiealquiza Apr 15, 2022
1a0481f
[autothrottle] ThrottleManager
jamiealquiza Apr 18, 2022
365f85d
[autothrottle/internal] adds api, throttlestore packages
jamiealquiza Apr 18, 2022
d591309
[autothrottle] move functions to methods
jamiealquiza Apr 18, 2022
363f879
[autothrottle] var name consistent with type
jamiealquiza Apr 18, 2022
ef0d176
[autothrottle] ThrottleManager.kafkaNativeMode
jamiealquiza Apr 18, 2022
f66cd8c
[autothrottle] InitKafkaAdmin
jamiealquiza Apr 19, 2022
2c6a991
[autothrottle] applyBrokerThrottles to ThrottleManager method
jamiealquiza Apr 19, 2022
86025f0
[autothrottle] applyTopicThrottles to ThrottleManager method
jamiealquiza Apr 19, 2022
a99b58f
[autothrottle] applyTopicThrottles conditionally uses KafkaAdmin, add…
jamiealquiza Apr 19, 2022
9e47aa6
[autothrottle] separating legacy throttle code
jamiealquiza Apr 19, 2022
ca0edba
[autothrottle] applyBrokerThrottles implements sequential KafkaAdmin …
jamiealquiza Apr 19, 2022
6645de8
[autothrottle] removeTopicThrottles KafkaAdmin
jamiealquiza Apr 20, 2022
b8edceb
[autothrottle] removeTopicThrottles KafkaAdmin topic lookup
jamiealquiza Apr 20, 2022
61cd360
[autothrottle] legacyRemoveBrokerThrottlesByID
jamiealquiza Apr 20, 2022
f343354
[autothrottle] only unset previous throttle upon success
jamiealquiza Apr 20, 2022
820d195
[autothrottle] removeBrokerThrottlesByID KafkaAdmin
jamiealquiza Apr 20, 2022
07a3be2
[autothrottle] getTopicsWithThrottledBrokers, getAllTopicStates to me…
jamiealquiza Apr 20, 2022
6cbdb61
[autothrottle] legacyGetTopicsWithThrottledBrokers
jamiealquiza Apr 21, 2022
6ccee4e
[autothrottle] comments unused code
jamiealquiza Apr 21, 2022
a49173f
[autothrottle] getTopicsWithThrottledBrokers KafkaAdmin
jamiealquiza Apr 21, 2022
a667b89
[kafkaadmin] adds stub package (for external testing)
jamiealquiza Apr 22, 2022
29c5206
[kafkaadmin] export TopicStatesFromMetadata
jamiealquiza Apr 22, 2022
4e8f2a8
[kafkaadmin] stub implements DescribeTopics
jamiealquiza Apr 22, 2022
015351a
[autothrottle] TestGetTopicsWithThrottledBrokers
jamiealquiza Apr 22, 2022
0105db0
[autothrottle] only init KafkaAdmin if needed
jamiealquiza Apr 25, 2022
685a1d3
[autothrottle] uninitialized map
jamiealquiza Apr 25, 2022
a3e2c65
[kafkaadmin] SetThrottle ignores 0 values
jamiealquiza Apr 25, 2022
2194650
[kafkaadmin] RemoveThrottle sequential updates
jamiealquiza Apr 26, 2022
cfa5656
[autothrottle] log broker throttle removal
jamiealquiza Apr 26, 2022
9b247fc
README
jamiealquiza Apr 26, 2022
3cc1bd8
[autothrottle] error handling scope
jamiealquiza May 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 50 additions & 42 deletions cmd/autothrottle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ Autothrottle does this by running a loop that discovers topics undergoing replic

In contrast to where Kafka's out of the box tooling allows users to manually set a static, global inbound and outbound rate, autothrottle determines path-optimal rates on an individual broker basis. It does this by examining a graph representation of replication flows and individually calculating optimal transfer rates for inbound and outbound transfers. This means that if a single broker is near saturation, it doesn't need to slow down the recovery pace of the entire replication group. This allows every broker to run at target network utilization thresholds in any direction, minimizing recovery time and hotspots.

All throttle control logic is applied directly to the cluster through ZooKeeper; autothrottle natively mirrors Kafka's throttle control implementation rather than wrapping CLI tools.
Historically, all throttle control logic has been applied directly to the cluster through ZooKeeper; autothrottle natively ports Kafka's throttle control implementation and manages it directly through the cluster state metadata housed in ZooKeeper. For newer versions of Kafka, autothrottle now supports throttle management via dynamic configurations using the Kafka Admin API, along with KIP-455 compatible reassignment lookups. This feature is enabled with the `--kafka-native-mode` flag and marks the continued support for eventual removal of ZooKeeper as a Kafka dependency (KIP-500).

Finally, autothrottle was designed to work as a piggyback system that doesn't take ownership of your cluster. It can easily be overridden (through the admin API), stopped safely at any time, or even outright disabled. This allows users to quickly revert to using other tools if desired.
Finally, autothrottle was designed to work as a piggyback system that doesn't take ownership of your cluster. It can easily be overridden (through the admin API), stopped safely at any time, or outright disabled. This allows users to quickly revert to using other tools if desired.

**Additional features**:
- Configurable portion of free headroom available for use by replication (`--max-rate`)
Expand Down Expand Up @@ -82,46 +82,54 @@ The variables in brackets are optional env var overrides.

```
Usage of autothrottle:
-api-key string
Datadog API key [AUTOTHROTTLE_API_KEY]
-api-listen string
Admin API listen address:port [AUTOTHROTTLE_API_LISTEN] (default "localhost:8080")
-app-key string
Datadog app key [AUTOTHROTTLE_APP_KEY]
-broker-id-tag string
Datadog host tag for broker ID [AUTOTHROTTLE_BROKER_ID_TAG] (default "broker_id")
-cap-map string
JSON map of instance types to network capacity in MB/s [AUTOTHROTTLE_CAP_MAP]
-change-threshold float
Required change in replication throttle to trigger an update (percent) [AUTOTHROTTLE_CHANGE_THRESHOLD] (default 10)
-cleanup-after int
Number of intervals after which to issue a global throttle unset if no replication is running [AUTOTHROTTLE_CLEANUP_AFTER] (default 60)
-dd-event-tags string
Comma-delimited list of Datadog event tags [AUTOTHROTTLE_DD_EVENT_TAGS]
-failure-threshold int
Number of iterations that throttle determinations can fail before reverting to the min-rate [AUTOTHROTTLE_FAILURE_THRESHOLD] (default 1)
-interval int
Autothrottle check interval (seconds) [AUTOTHROTTLE_INTERVAL] (default 180)
-max-rx-rate float
Maximum inbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RX_RATE] (default 90)
-max-tx-rate float
Maximum outbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_TX_RATE] (default 90)
-metrics-window int
Time span of metrics required (seconds) [AUTOTHROTTLE_METRICS_WINDOW] (default 120)
-min-rate float
Minimum replication throttle rate (MB/s) [AUTOTHROTTLE_MIN_RATE] (default 10)
-net-rx-query string
Datadog query for broker inbound bandwidth by host [AUTOTHROTTLE_NET_RX_QUERY] (default "avg:system.net.bytes_rcvd{service:kafka} by {host}")
-net-tx-query string
Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_TX_QUERY] (default "avg:system.net.bytes_sent{service:kafka} by {host}")
-version
version [AUTOTHROTTLE_VERSION]
-zk-addr string
ZooKeeper connect string (for broker metadata or rebuild-topic lookups) [AUTOTHROTTLE_ZK_ADDR] (default "localhost:2181")
-zk-config-prefix string
ZooKeeper prefix to store autothrottle configuration [AUTOTHROTTLE_ZK_CONFIG_PREFIX] (default "autothrottle")
-zk-prefix string
ZooKeeper namespace prefix [AUTOTHROTTLE_ZK_PREFIX]
-api-key string
Datadog API key [AUTOTHROTTLE_API_KEY]
-api-listen string
Admin API listen address:port [AUTOTHROTTLE_API_LISTEN] (default "localhost:8080")
-app-key string
Datadog app key [AUTOTHROTTLE_APP_KEY]
-bootstrap-servers string
Kafka bootstrap servers [AUTOTHROTTLE_BOOTSTRAP_SERVERS] (default "localhost:9092")
-broker-id-tag string
Datadog host tag for broker ID [AUTOTHROTTLE_BROKER_ID_TAG] (default "broker_id")
-cap-map string
JSON map of instance types to network capacity in MB/s [AUTOTHROTTLE_CAP_MAP]
-change-threshold float
Required change in replication throttle to trigger an update (percent) [AUTOTHROTTLE_CHANGE_THRESHOLD] (default 10)
-cleanup-after int
Number of intervals after which to issue a global throttle unset if no replication is running [AUTOTHROTTLE_CLEANUP_AFTER] (default 60)
-dd-event-tags string
Comma-delimited list of Datadog event tags [AUTOTHROTTLE_DD_EVENT_TAGS]
-failure-threshold int
Number of iterations that throttle determinations can fail before reverting to the min-rate [AUTOTHROTTLE_FAILURE_THRESHOLD] (default 1)
-instance-type-tag string
Datadog tag for instance type [AUTOTHROTTLE_INSTANCE_TYPE_TAG] (default "instance-type")
-interval int
Autothrottle check interval (seconds) [AUTOTHROTTLE_INTERVAL] (default 180)
-kafka-api-request-timeout int
Kafka API request timeout (seconds) [AUTOTHROTTLE_KAFKA_API_REQUEST_TIMEOUT] (default 15)
-kafka-native-mode
Favor native Kafka RPCs over ZooKeeper metadata access [AUTOTHROTTLE_KAFKA_NATIVE_MODE]
-max-rx-rate float
Maximum inbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RX_RATE] (default 90)
-max-tx-rate float
Maximum outbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_TX_RATE] (default 90)
-metrics-window int
Time span of metrics required (seconds) [AUTOTHROTTLE_METRICS_WINDOW] (default 120)
-min-rate float
Minimum replication throttle rate (MB/s) [AUTOTHROTTLE_MIN_RATE] (default 10)
-net-rx-query string
Datadog query for broker inbound bandwidth by host [AUTOTHROTTLE_NET_RX_QUERY] (default "avg:system.net.bytes_rcvd{service:kafka} by {host}")
-net-tx-query string
Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_TX_QUERY] (default "avg:system.net.bytes_sent{service:kafka} by {host}")
-version
version [AUTOTHROTTLE_VERSION]
-zk-addr string
ZooKeeper connect string (for broker metadata or rebuild-topic lookups) [AUTOTHROTTLE_ZK_ADDR] (default "localhost:2181")
-zk-config-prefix string
ZooKeeper prefix to store autothrottle configuration [AUTOTHROTTLE_ZK_CONFIG_PREFIX] (default "autothrottle")
-zk-prefix string
ZooKeeper namespace prefix [AUTOTHROTTLE_ZK_PREFIX]
```

## Detailed: Rate Calculations, Applying Throttles
Expand Down
120 changes: 0 additions & 120 deletions cmd/autothrottle/api_deprecated.go

This file was deleted.

2 changes: 1 addition & 1 deletion cmd/autothrottle/capacities.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r replicationCapacityByBroker) reset() {
// in the reassignment. For each broker, it determines whether the broker is
// a leader (source) or a follower (destination), and calculates a throttle
// accordingly, returning a replicationCapacityByBroker and error.
func brokerReplicationCapacities(rtc *ReplicationThrottleConfigs, reassigning reassigningBrokers, bm kafkametrics.BrokerMetrics) (replicationCapacityByBroker, error) {
func brokerReplicationCapacities(rtc *ThrottleManager, reassigning reassigningBrokers, bm kafkametrics.BrokerMetrics) (replicationCapacityByBroker, error) {
capacities := replicationCapacityByBroker{}

// For each broker, check whether the it's a source and/or destination,
Expand Down
2 changes: 1 addition & 1 deletion cmd/autothrottle/capacities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestBrokerReplicationCapacities(t *testing.T) {
CapacityMap: map[string]float64{"stub": 200.00},
})

rtc := &ReplicationThrottleConfigs{
rtc := &ThrottleManager{
reassignments: reassignments,
previouslySetThrottles: replicationCapacityByBroker{1000: throttleByRole{float64ptr(20)}},
limits: lim,
Expand Down
Loading