Skip to content

Commit

Permalink
Merge pull request #293 from DataDog/jamie/autothrottle
Browse files Browse the repository at this point in the history
[autothrottle] path optimal throttle calculations
  • Loading branch information
jamiealquiza authored Mar 4, 2020
2 parents 30e49b7 + 3107638 commit 896a115
Show file tree
Hide file tree
Showing 16 changed files with 1,034 additions and 645 deletions.
4 changes: 2 additions & 2 deletions Dockerfile.registry
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ RUN apt-get install -y unzip curl git pkg-config software-properties-common apt-
WORKDIR /root

# Install Go.
RUN curl -sOL https://dl.google.com/go/go1.13.4.linux-amd64.tar.gz
RUN tar -C /usr/local -xzf go1.13.4.linux-amd64.tar.gz
RUN curl -sOL https://dl.google.com/go/go1.13.8.linux-amd64.tar.gz
RUN tar -C /usr/local -xzf go1.13.8.linux-amd64.tar.gz
ENV PATH=$PATH:/usr/local/go/bin:/go/bin
ENV GOPATH=/go

Expand Down
92 changes: 49 additions & 43 deletions cmd/autothrottle/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Overview
Autothrottle is a service that looks for reassigning partition events (as part of a recovery or routine data movement) and dynamically applies broker replication throttles. This is done to run replications as fast as possible without starving out bandwidth from Kafka consumer clients.

It does this by running in a loop that looks up all topics undergoing replication, maps source vs destination broker involved, fetches metrics for each broker from the Datadog API, and calculates a throttle based on a map of known bandwidth limits specific to each instance type being observed. An updated throttle rate is determined at each loop interval and continuously applies the throttle to adapt to changes in workloads. Rather than wrapping CLI tools, the throttle is applied directly in ZooKeeper by autothrottle, mirroring the internals of Kafka's tooling. When no replication is occurring, autothrottle will clear all throttles across the cluster.
It does this by running in a loop that looks up all topics undergoing replication, maps source vs destination broker involved, fetches metrics for each broker from the Datadog API, and calculates a throttle based on a map of known bandwidth limits specific to each instance type being observed. An updated throttle rate is determined at each loop interval and continuously applies the throttle to adapt to changes in workloads. Throttle rates are determined on a per replication path basis, as opposed to Kafka's out of the box tooling that sets a global inbound/outbound rate. Additionally, rather than wrapping the default Kafka CLI tools, the throttle is applied directly in ZooKeeper by autothrottle, mirroring the internals of Kafka's provided throttle rate mechanism. When no replication is occurring, autothrottle will clear all throttles across the cluster.

Additionally, autothrottle writes Datadog events at each check interval that detail what topics are undergoing replication, a list of all brokers involved, and throttle rates applied.

Expand All @@ -27,37 +27,44 @@ Tested with Go 1.10+ (required), Kafka 0.10.x, ZooKeeper 3.4.x.
Autothrottle prerequisites include:

- Datadog API and app key
- A metric string that returns the `system.net.bytes_sent` metric per host, scoped to the cluster that's being managed
- That each Kafka host is tagged with `instance-type` (included via the AWS integration) and a broker ID tag (configurable via `-broker-id-tag`, defaults to `broker_id`)
- A metric string that returns the `system.net.bytes_sent` and `system.net.bytes_recvd` metric per host, scoped to the cluster that's being managed
- That each Kafka host is tagged with `instance-type` (the Datadog AWS integration default) and a broker ID tag (configurable via `-broker-id-tag`, defaults to `broker_id`)
- A map of instance types and available bandwidth (in MB/s), supplied as a json string via the `--cap-map` parameter (e.g. `--cap-map '{"d2.2xlarge":120,"d2.4xlarge":240}'`)

Once running, autothrottle should clearly log what it's doing:

```
2018/03/16 18:27:11 Autothrottle Running
2018/03/16 18:27:12 Admin API: localhost:8080
2018/03/16 18:27:13 No topics undergoing reassignment
<...>
2018/03/16 18:31:21 Topics with ongoing reassignments: [test_topic]
2018/03/16 18:31:21 Source brokers participating in replication: [1002 1003 1004 1005 1006 1007]
2018/03/16 18:31:21 Destination brokers participating in replication: [1002 1003 1004 1005 1006 1007]
2018/03/16 18:31:21 Most utilized source broker: [1005] net tx of 116.43MB/s (over 60s) with an existing throttle rate of 0.00MB/s
2018/03/16 18:31:21 Replication capacity (based on a 90% max free capacity utilization): 120.21MB/s
2018/03/16 18:31:21 Updated throttle to 120.21MB/s on broker 1005
2018/03/16 18:31:22 Updated throttle to 120.21MB/s on broker 1004
2018/03/16 18:31:22 Updated throttle to 120.21MB/s on broker 1003
2018/03/16 18:31:23 Updated throttle to 120.21MB/s on broker 1006
2018/03/16 18:31:23 Updated throttle to 120.21MB/s on broker 1002
2018/03/16 18:31:24 Updated throttle to 120.21MB/s on broker 1007
2018/03/16 18:32:24 Topics done reassigning: [test_topic]
2018/03/16 18:32:24 No topics undergoing reassignment
2018/03/16 18:32:29 Throttle removed on broker 1003
2018/03/16 18:32:30 Throttle removed on broker 1004
2018/03/16 18:32:30 Throttle removed on broker 1002
2018/03/16 18:32:31 Throttle removed on broker 1007
2018/03/16 18:32:31 Throttle removed on broker 1005
2018/03/16 18:32:32 Throttle removed on broker 1006
2018/03/16 18:33:32 No topics undergoing reassignment
2020/02/27 22:28:12 Autothrottle Running
2020/02/27 22:28:13 Admin API: localhost:8080
2020/02/27 22:28:13 Topics with ongoing reassignments: [test0]
2020/02/27 22:28:13 Source brokers participating in replication: [1037 1039]
2020/02/27 22:28:13 Destination brokers participating in replication: [1033 1041]
2020/02/27 22:28:14 Replication throttle rate for broker 1037 [leader] (based on a 90% max free capacity utilization): 139.83MB/s
2020/02/27 22:28:14 Updated throttle on broker 1037 [leader]
2020/02/27 22:28:15 Replication throttle rate for broker 1039 [leader] (based on a 90% max free capacity utilization): 147.24MB/s
2020/02/27 22:28:15 Updated throttle on broker 1039 [leader]
2020/02/27 22:28:15 Replication throttle rate for broker 1041 [follower] (based on a 90% max free capacity utilization): 179.75MB/s
2020/02/27 22:28:15 Updated throttle on broker 1041 [follower]
2020/02/27 22:28:15 Replication throttle rate for broker 1033 [follower] (based on a 90% max free capacity utilization): 181.88MB/s
2020/02/27 22:28:15 Updated throttle on broker 1033 [follower]
2020/02/27 22:28:28 Topics with ongoing reassignments: [test0]
2020/02/27 22:28:28 Source brokers participating in replication: [1037 1039]
2020/02/27 22:28:28 Destination brokers participating in replication: [1033 1041]
2020/02/27 22:28:28 Replication throttle rate for broker 1039 [leader] (based on a 90% max free capacity utilization): 225.00MB/s
2020/02/27 22:28:28 Updated throttle on broker 1039 [leader]
2020/02/27 22:28:28 Replication throttle rate for broker 1041 [follower] (based on a 90% max free capacity utilization): 225.00MB/s
2020/02/27 22:28:28 Updated throttle on broker 1041 [follower]
2020/02/27 22:28:29 Replication throttle rate for broker 1033 [follower] (based on a 90% max free capacity utilization): 225.00MB/s
2020/02/27 22:28:29 Updated throttle on broker 1033 [follower]
2020/02/27 22:28:29 Replication throttle rate for broker 1037 [leader] (based on a 90% max free capacity utilization): 225.00MB/s
2020/02/27 22:28:29 Updated throttle on broker 1037 [leader]
...
2020/02/27 22:35:58 Topics done reassigning: [test0]
2020/02/27 22:35:58 No topics undergoing reassignment
2020/02/27 22:36:09 Throttle removed on broker 1039
2020/02/27 22:36:10 Throttle removed on broker 1037
2020/02/27 22:36:11 Throttle removed on broker 1033
2020/02/27 22:36:12 Throttle removed on broker 1041
```

Overlaying autothrottle Datadog events on a recovery dashboard:
Expand Down Expand Up @@ -90,12 +97,16 @@ Usage of autothrottle:
Number of iterations that throttle determinations can fail before reverting to the min-rate [AUTOTHROTTLE_FAILURE_THRESHOLD] (default 1)
-interval int
Autothrottle check interval (seconds) [AUTOTHROTTLE_INTERVAL] (default 180)
-max-rate float
Maximum replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RATE] (default 90)
-max-rx-rate float
Maximum inbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RX_RATE] (default 90)
-max-tx-rate float
Maximum outbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_TX_RATE] (default 90)
-metrics-window int
Time span of metrics required (seconds) [AUTOTHROTTLE_METRICS_WINDOW] (default 120)
-min-rate float
Minimum replication throttle rate (MB/s) [AUTOTHROTTLE_MIN_RATE] (default 10)
-net-rx-query string
Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_RX_QUERY] (default "avg:system.net.bytes_rcvd{service:kafka} by {host}")
-net-tx-query string
Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_TX_QUERY] (default "avg:system.net.bytes_sent{service:kafka} by {host}")
-version
Expand All @@ -110,39 +121,34 @@ Usage of autothrottle:

## Rate Calculations, Applying Throttles

The throttle rate is calculated by building a map of destination (brokers where partitions are being replicated to) and source brokers (brokers where partitions are being replicated from) and determining a suitable rate based on outbound network utilization on source brokers. The most saturated source broker is used to determine the throttle rate for all replicating brokers (this is done for simplicity as a per-path rate is more complex than it sounds). Autothrottle references the provided `-cap-map` to lookup the network capacity. Autothrottle compares the amount of ongoing network throughput against the capacity (subtracting any amount already allocated for replication) to determine headroom. If more headroom is available, the throttle will be raised to consume the `-max-rate` (defaults to 90%) percent of what's available. If it's negative (throughput exceeds the configured capacity), the throttle will be lowered.
The throttle rate is calculated by building a map of destination (brokers where partitions are being replicated to) and source brokers (brokers where partitions are being replicated from) and determining a per-path rate based on the appropriate network utilization for the broker's role; source brokers (those sending out data) receive an outbound throttle based on their outbound network utilization and destination brokers (those receiving data) receive an inbound throttle based on their inbound network utilization. Autothrottle references the provided `-cap-map` to lookup the network capacity. Autothrottle compares the amount of ongoing network throughput against the capacity (subtracting any amount already allocated for replication in previous intervals) to determine headroom. If more headroom is available, the throttle will be raised to consume the `-max-{tx,rx}-rate` (defaults to 90%) percent of what's available. If it's negative (throughput exceeds the configured capacity), the throttle will be lowered.

Autothrottle fetches metrics and performs this check every `-interval` seconds. In order to reduce propagating updated throttles to brokers too aggressively, a new throttle won't be applied unless it deviates more than `-change-threshold` (defaults to 10%) percent from the previous throttle. Any time a throttle change is applied, topics are done replicating, or throttle rates cleared, autothrottle will write Datadog events tagged with `name:autothrottle` along with any additionally defined tags (via the `-dd-event-tags` param).

Autothrottle is also designed to fail-safe and avoid any unspecified decision modes. If fetching metrics fails or returns partial data, autothrottle will log what's missing and revert brokers to a safety throttle rate of `-min-rate` (defaults to 10MB/s). In order to prevent flapping, a configurable number of sequential failures before reverting to the minimum rate can be set with the `-failure-threshold` param (defaults to 1).

Some considerations:
- This works best with clusters using a single instance type.
- A single throttle rate that applies to an entire group of replicating brokers tends to work quite well, but per-path rates is planned as an eventual feature.

## Operations Notes

- Autothrottle currently assumes that exactly one instance is running per cluster. Multi-node / HA support is planned.
- Autothrottle is safe to arbitrarily restart. If restarted, the first iteration may temporarily lower an existing throttle since it doesn't have a known rate to use as a compensation value in calculating headroom.
- Autothrottle is effectively stateless and safe to restart at any time. If restarted, the first iteration may temporarily lower an existing throttle since it doesn't have a known rate to use as a compensation value in calculating headroom.
- Autothrottle is safe to stop using at any time. All operations mimic existing internals/functionality of Kafka. Autothrottle intends to be a layer of metrics driven decision autonomy.
- It's easy to accidentally leave throttles applied when performing manual reassignments. Autothrottle automatically clears previously applied throttles when no replications are running, and does a global throttle clearing every `-cleanup-after` iterations.

## Admin API

The administrative API allows overrides to be set. If an override is set, Datadog metrics will not be fetched.

```
$ curl localhost:8080/get_throttle
No throttle is set
When setting a throttle, an optional `autoremove` bool parameter can be specified. If set, the throttle override will be removed

$ curl -XPOST "localhost:8080/set_throttle?rate=100"
Throttle successfully set to 100MB/s
```
$ curl -XPOST "localhost:8080/set_throttle?rate=200&autoremove=true"
throttle successfully set to 200MB/s, autoremove==true
$ curl localhost:8080/get_throttle
A throttle override is configured at 100MB/s
$ curl -XPOST "localhost:8080/get_throttle"
a throttle override is configured at 200MB/s, autoremove==true
$ curl -XPOST localhost:8080/remove_throttle
Throttle successfully removed
throttle successfully removed
```

# Diagrams
Expand Down
105 changes: 105 additions & 0 deletions cmd/autothrottle/brokers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"testing"

"github.com/DataDog/kafka-kit/kafkametrics"
)

func TestIncompleteBrokerMetrics(t *testing.T) {
bm := mockBrokerMetrics()

ids := []int{1001, 1002, 1003}

if incompleteBrokerMetrics(ids, bm) {
t.Errorf("Expected false return val")
}

ids = append(ids, 1020)

if !incompleteBrokerMetrics(ids, bm) {
t.Errorf("Expected true return val")
}
}

func mockBrokerMetrics() kafkametrics.BrokerMetrics {
return kafkametrics.BrokerMetrics{
1000: &kafkametrics.Broker{
ID: 1000,
Host: "host0",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1001: &kafkametrics.Broker{
ID: 1001,
Host: "host1",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1002: &kafkametrics.Broker{
ID: 1002,
Host: "host2",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1003: &kafkametrics.Broker{
ID: 1003,
Host: "host3",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1004: &kafkametrics.Broker{
ID: 1004,
Host: "host4",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1005: &kafkametrics.Broker{
ID: 1005,
Host: "host5",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 180.00,
},
1006: &kafkametrics.Broker{
ID: 1006,
Host: "host6",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1007: &kafkametrics.Broker{
ID: 1007,
Host: "host7",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1008: &kafkametrics.Broker{
ID: 1008,
Host: "host8",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1009: &kafkametrics.Broker{
ID: 1009,
Host: "host9",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 80.00,
},
1010: &kafkametrics.Broker{
ID: 1010,
Host: "host10",
InstanceType: "mock",
NetTX: 80.00,
NetRX: 120.00,
},
}
}
60 changes: 37 additions & 23 deletions cmd/autothrottle/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ type Limits map[string]float64
type NewLimitsConfig struct {
// Min throttle rate in MB/s.
Minimum float64
// Max throttle rate as a portion of capacity.
Maximum float64
// Max source broker throttle rate as a portion of capacity.
SourceMaximum float64
// Max destination broker throttle rate as a portion of capacity.
DestinationMaximum float64
// Map of instance-type to total network capacity in MB/s.
CapacityMap map[string]float64
}
Expand All @@ -27,49 +29,61 @@ func NewLimits(c NewLimitsConfig) (Limits, error) {
switch {
case c.Minimum <= 0:
return nil, errors.New("minimum must be > 0")
case c.Maximum <= 0 || c.Maximum > 100:
return nil, errors.New("maximum must be > 0 and < 100")
case c.SourceMaximum <= 0 || c.SourceMaximum >= 100:
return nil, errors.New("source maximum must be > 0 and < 100")
case c.DestinationMaximum <= 0 || c.DestinationMaximum >= 100:
return nil, errors.New("destination maximum must be > 0 and < 100")
}

// Populate the min/max vals into the Limits map.
lim := Limits{
// Min. config.
"minimum": c.Minimum,
"maximum": c.Maximum,
"srcMax": c.SourceMaximum,
"dstMax": c.DestinationMaximum,
}

// Update with provided
// capacity map.
// Update with provided capacity map.
for k, v := range c.CapacityMap {
lim[k] = v
}

return lim, nil
}

// headroom takes a *kafkametrics.Broker and last set throttle rate and
// returns the headroom based on utilization vs capacity. Headroom is
// determined by subtracting the current throttle rate from the current
// outbound network utilization. This yields a crude approximation of
// how much non-replication throughput is currently being demanded.
// The non-replication throughput is then subtracted from the total
// network capacity available. This value suggests what headroom is
// available for replication. We then use the greater of:
// replicationHeadroom takes a *kafkametrics.Broker, what type of replica role
// it's fulfilling, and the last set throttle rate. A replication headroom value
// is returned based on utilization vs capacity. Headroom is determined by
// subtracting the current throttle rate from the current network utilization.
// This yields a crude approximation of how much non-replication throughput is
// currently being demanded. The non-replication throughput is then subtracted
// from the total network capacity available. This value suggests what headroom
// is available for replication. We then use the greater of:
// - this value * the configured portion of free bandwidth eligible for replication
// - the configured minimum replication rate in MB/s
func (l Limits) headroom(b *kafkametrics.Broker, t float64) (float64, error) {
if b == nil {
return l["minimum"], errors.New("Nil broker provided")
func (l Limits) replicationHeadroom(b *kafkametrics.Broker, rt replicaType, prevThrottle float64) (float64, error) {
var currNetUtilization float64
var maxRatio float64

switch rt {
case "leader":
currNetUtilization = b.NetTX
maxRatio = l["srcMax"]
case "follower":
currNetUtilization = b.NetRX
maxRatio = l["dstMax"]
default:
return 0.00, errors.New("invalid replica type")
}

if capacity, exists := l[b.InstanceType]; exists {
nonThrottleUtil := math.Max(b.NetTX-t, 0.00)
nonThrottleUtil := math.Max(currNetUtilization-prevThrottle, 0.00)
// Determine if/how far over the target capacity
// we are. This is also subtracted from the available
// headroom.
overCap := math.Max(b.NetTX-capacity, 0.00)
overCap := math.Max(currNetUtilization-capacity, 0.00)

return math.Max((capacity-nonThrottleUtil-overCap)*(l["maximum"]/100), l["minimum"]), nil
return math.Max((capacity-nonThrottleUtil-overCap)*(maxRatio/100), l["minimum"]), nil
}

return l["minimum"], errors.New("Unknown instance type")
return l["minimum"], errors.New("unknown instance type")
}
Loading

0 comments on commit 896a115

Please sign in to comment.