Skip to content

Commit

Permalink
Throttler: exempt apps via `UpdateThrottlerConfig --throttle-app-exem…
Browse files Browse the repository at this point in the history
…pt` (#13666)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Aug 1, 2023
1 parent b065394 commit 364209f
Show file tree
Hide file tree
Showing 16 changed files with 376 additions and 144 deletions.
1 change: 1 addition & 0 deletions go/cmd/vtctldclient/command/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func init() {
UpdateThrottlerConfig.Flags().StringVar(&throttledAppRule.Name, "throttle-app", "", "an app name to throttle")
UpdateThrottlerConfig.Flags().Float64Var(&throttledAppRule.Ratio, "throttle-app-ratio", throttle.DefaultThrottleRatio, "ratio to throttle app (app specififed in --throttled-app)")
UpdateThrottlerConfig.Flags().DurationVar(&throttledAppDuration, "throttle-app-duration", throttle.DefaultAppThrottleDuration, "duration after which throttled app rule expires (app specififed in --throttled-app)")
UpdateThrottlerConfig.Flags().BoolVar(&throttledAppRule.Exempt, "throttle-app-exempt", throttledAppRule.Exempt, "exempt this app from being at all throttled. WARNING: use with extreme care, as this is likely to push metrics beyond the throttler's threshold, and starve other apps")

Root.AddCommand(UpdateThrottlerConfig)
}
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestSchemaChange(t *testing.T) {
}
})
t.Run("updating throttler config", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, noCustomQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, noCustomQuery, nil)
require.NoError(t, err)
})

Expand Down
44 changes: 33 additions & 11 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -45,6 +46,7 @@ const (
onDemandHeartbeatDuration = 5 * time.Second
throttlerEnabledTimeout = 60 * time.Second
useDefaultQuery = ""
testAppName = "test"
)

var (
Expand Down Expand Up @@ -170,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri
}

func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s&s=%t", tablet.HTTPPort, checkAPIPath, testAppName, skipRequestHeartbeats))
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath))
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s", tablet.HTTPPort, checkSelfAPIPath, testAppName))
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
Expand Down Expand Up @@ -245,7 +247,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("enabling throttler with very low threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with the new config.
Expand All @@ -257,7 +259,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})
t.Run("disabling throttler", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be disabled everywhere.
Expand All @@ -271,7 +273,7 @@ func TestInitialThrottler(t *testing.T) {
t.Run("enabling throttler, again", func(t *testing.T) {
// Enable the throttler again with the default query which also moves us back
// to the default threshold.
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere again with the default config.
Expand All @@ -283,7 +285,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})
t.Run("setting high threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with new config.
Expand All @@ -295,7 +297,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("setting low threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with new config.
Expand Down Expand Up @@ -392,6 +394,26 @@ func TestLag(t *testing.T) {
defer resp.Body.Close()
assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
})
t.Run("exempting test app", func(t *testing.T) {
appRule := &topodatapb.ThrottledAppRule{
Name: testAppName,
ExpiresAt: logutil.TimeToProto(time.Now().Add(time.Hour)),
Exempt: true,
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, appRule)
assert.NoError(t, err)
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("unexempting test app", func(t *testing.T) {
appRule := &topodatapb.ThrottledAppRule{
Name: testAppName,
ExpiresAt: logutil.TimeToProto(time.Now()),
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, appRule)
assert.NoError(t, err)
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})

t.Run("starting replication", func(t *testing.T) {
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
assert.NoError(t, err)
Expand Down Expand Up @@ -436,7 +458,7 @@ func TestCustomQuery(t *testing.T) {
defer cluster.PanicHandler(t)

t.Run("enabling throttler with custom query and threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, customQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, customQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with new custom config.
Expand Down Expand Up @@ -504,7 +526,7 @@ func TestRestoreDefaultQuery(t *testing.T) {

// Validate going back from custom-query to default-query (replication lag) still works.
t.Run("enabling throttler with default query and threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be up and running everywhere again with the default config.
Expand Down
18 changes: 14 additions & 4 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand All @@ -57,7 +59,7 @@ var DefaultConfig = &Config{
// This retries the command until it succeeds or times out as the
// SrvKeyspace record may not yet exist for a newly created
// Keyspace that is still initializing before it becomes serving.
func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, keyspaceName string, enable bool, disable bool, threshold float64, metricsQuery string) (result string, err error) {
func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, keyspaceName string, enable bool, disable bool, threshold float64, metricsQuery string, appRule *topodatapb.ThrottledAppRule) (result string, err error) {
args := []string{}
args = append(args, "UpdateThrottlerConfig")
if enable {
Expand All @@ -75,6 +77,14 @@ func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, ke
} else {
args = append(args, "--check-as-check-shard")
}
if appRule != nil {
args = append(args, "--throttle-app", appRule.Name)
args = append(args, "--throttle-app-duration", logutil.ProtoToTime(appRule.ExpiresAt).Sub(time.Now()).String())
args = append(args, "--throttle-app-ratio", fmt.Sprintf("%f", appRule.Ratio))
if appRule.Exempt {
args = append(args, "--throttle-app-exempt")
}
}
args = append(args, keyspaceName)

ctx, cancel := context.WithTimeout(context.Background(), ConfigTimeout)
Expand All @@ -100,14 +110,14 @@ func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, ke
// This retries the command until it succeeds or times out as the
// SrvKeyspace record may not yet exist for a newly created
// Keyspace that is still initializing before it becomes serving.
func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string) (string, error) {
func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, appRule *topodatapb.ThrottledAppRule) (string, error) {
rec := concurrency.AllErrorRecorder{}
var (
err error
res strings.Builder
)
for _, ks := range clusterInstance.Keyspaces {
ires, err := UpdateThrottlerTopoConfigRaw(&clusterInstance.VtctldClientProcess, ks.Name, enable, disable, threshold, metricsQuery)
ires, err := UpdateThrottlerTopoConfigRaw(&clusterInstance.VtctldClientProcess, ks.Name, enable, disable, threshold, metricsQuery, appRule)
if err != nil {
rec.RecordError(err)
}
Expand Down Expand Up @@ -335,7 +345,7 @@ func WaitForThrottledApp(t *testing.T, tablet *cluster.Vttablet, throttlerApp th
// The throttler is configued to use the standard replication lag metric. The function waits until the throttler is confirmed
// to be running on all tablets.
func EnableLagThrottlerAndWaitForStatus(t *testing.T, clusterInstance *cluster.LocalProcessCluster, lag time.Duration) {
_, err := UpdateThrottlerTopoConfig(clusterInstance, true, false, lag.Seconds(), "")
_, err := UpdateThrottlerTopoConfig(clusterInstance, true, false, lag.Seconds(), "", nil)
require.NoError(t, err)

for _, ks := range clusterInstance.Keyspaces {
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
}

log.Infof("Applying throttler config for keyspace %s", keyspace.Name)
res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, keyspace.Name, true, false, throttlerConfig.Threshold, throttlerConfig.Query)
res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, keyspace.Name, true, false, throttlerConfig.Threshold, throttlerConfig.Query, nil)
require.NoError(t, err, res)

cellsToWatch := ""
Expand Down
Loading

0 comments on commit 364209f

Please sign in to comment.