Skip to content

Commit

Permalink
Merge pull request #7319 from planetscale/throttler-replica-throttle-lag
Browse files Browse the repository at this point in the history
Tablet throttle: support "/throttle/check-self" available on all tablets
  • Loading branch information
shlomi-noach authored Jan 26, 2021
2 parents 627f092 + e20e0f7 commit 1514987
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 109 deletions.
16 changes: 16 additions & 0 deletions go/sqltypes/named_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ func (r RowNamedValues) AsUint64(fieldName string, def uint64) uint64 {
return def
}

// AsFloat64 returns the named field as float64, or default value if nonexistent/error
func (r RowNamedValues) AsFloat64(fieldName string, def float64) float64 {
if v, err := r.ToFloat64(fieldName); err == nil {
return v
}
return def
}

// ToFloat64 returns the named field as float64
func (r RowNamedValues) ToFloat64(fieldName string) (float64, error) {
if v, ok := r[fieldName]; ok {
return v.ToFloat64()
}
return 0, ErrNoSuchField
}

// ToBool returns the named field as bool
func (r RowNamedValues) ToBool(fieldName string) (bool, error) {
if v, ok := r[fieldName]; ok {
Expand Down
106 changes: 79 additions & 27 deletions go/test/endtoend/tabletmanager/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (

var (
clusterInstance *cluster.LocalProcessCluster
masterTablet cluster.Vttablet
replicaTablet cluster.Vttablet
primaryTablet *cluster.Vttablet
replicaTablet *cluster.Vttablet
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
Expand Down Expand Up @@ -65,8 +65,16 @@ var (
}
}`

httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
checkSelfAPIPath = "throttler/check-self"
)

const (
throttlerInitWait = 10 * time.Second
accumulateLagWait = 2 * time.Second
throttlerRefreshIntervalWait = 12 * time.Second
replicationCatchUpWait = 5 * time.Second
)

func TestMain(m *testing.M) {
Expand All @@ -89,6 +97,7 @@ func TestMain(m *testing.M) {
"-watch_replication_stream",
"-enable_replication_reporter",
"-enable-lag-throttler",
"-throttle_threshold", "1s",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
}
Expand All @@ -110,9 +119,9 @@ func TestMain(m *testing.M) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
for _, tablet := range tablets {
if tablet.Type == "master" {
masterTablet = *tablet
primaryTablet = tablet
} else if tablet.Type != "rdonly" {
replicaTablet = *tablet
replicaTablet = tablet
}
}

Expand All @@ -121,29 +130,47 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func throttleCheck() (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", masterTablet.HTTPPort, checkAPIPath))
func throttleCheck(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkAPIPath))
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath))
}

func TestThrottlerBeforeMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

// Immediately after startup, we expect this response:
// {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}
}

func TestThrottlerAfterMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

time.Sleep(10 * time.Second)
time.Sleep(throttlerInitWait)
// By this time metrics will have been collected. We expect no lag, and something like:
// {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}

func TestLag(t *testing.T) {
Expand All @@ -153,22 +180,47 @@ func TestLag(t *testing.T) {
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
time.Sleep(accumulateLagWait)
// Lag will have accumulated
// {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(5 * time.Second)
time.Sleep(replicationCatchUpWait)
// Restore
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}
}

Expand All @@ -178,20 +230,20 @@ func TestNoReplicas(t *testing.T) {
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "RDONLY")
assert.NoError(t, err)

time.Sleep(10 * time.Second)
time.Sleep(throttlerRefreshIntervalWait)
// This makes no REPLICA servers available. We expect something like:
// {"StatusCode":200,"Value":0,"Threshold":1,"Message":""}
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA")
assert.NoError(t, err)

time.Sleep(10 * time.Second)
time.Sleep(throttlerRefreshIntervalWait)
// Restore valid replica
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
Expand Down
9 changes: 9 additions & 0 deletions go/timer/suspendable_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func (s *SuspendableTicker) Stop() {
s.ticker.Stop()
}

// TickNow generates a tick at this point in time. It may block
// if nothing consumes the tick.
func (s *SuspendableTicker) TickNow() {
if atomic.LoadInt64(&s.suspended) == 0 {
// not suspended
s.C <- time.Now()
}
}

func (s *SuspendableTicker) loop() {
for t := range s.ticker.C {
if atomic.LoadInt64(&s.suspended) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err

sm.ddle.Close()
sm.tableGC.Close()
sm.throttler.Close()
sm.messager.Close()
sm.tracker.Close()
sm.se.MakeNonMaster()
Expand All @@ -470,6 +469,7 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err
sm.te.AcceptReadOnly()
sm.rt.MakeNonMaster()
sm.watcher.Open()
sm.throttler.Open()
sm.setState(wantTabletType, StateServing)
return nil
}
Expand Down
40 changes: 20 additions & 20 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,18 @@ func TestStateManagerServeNonMaster(t *testing.T) {

verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.throttler, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonMaster)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonMaster)
verifySubcomponent(t, 11, sm.rt, testStateNonMaster)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonMaster)
verifySubcomponent(t, 10, sm.rt, testStateNonMaster)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand Down Expand Up @@ -292,18 +292,18 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) {

verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.throttler, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonMaster)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonMaster)
verifySubcomponent(t, 11, sm.rt, testStateNonMaster)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonMaster)
verifySubcomponent(t, 10, sm.rt, testStateNonMaster)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand Down
62 changes: 33 additions & 29 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,35 +1576,39 @@ func (tsv *TabletServer) registerMigrationStatusHandler() {
})
}

// registerThrottlerCheckHandler registers a throttler "check" request
func (tsv *TabletServer) registerThrottlerCheckHandler() {
tsv.exporter.HandleFunc("/throttler/check", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
remoteAddr := r.Header.Get("X-Forwarded-For")
if remoteAddr == "" {
remoteAddr = r.RemoteAddr
remoteAddr = strings.Split(remoteAddr, ":")[0]
}
appName := r.URL.Query().Get("app")
if appName == "" {
appName = throttle.DefaultAppName
}
flags := &throttle.CheckFlags{
LowPriority: (r.URL.Query().Get("p") == "low"),
}
checkResult := tsv.lagThrottler.Check(ctx, appName, remoteAddr, flags)
if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists {
checkResult.StatusCode = http.StatusOK // 200
}
// registerThrottlerCheckHandlers registers throttler "check" requests
func (tsv *TabletServer) registerThrottlerCheckHandlers() {
handle := func(path string, checkResultFunc func(ctx context.Context, appName string, remoteAddr string, flags *throttle.CheckFlags) *throttle.CheckResult) {
tsv.exporter.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
remoteAddr := r.Header.Get("X-Forwarded-For")
if remoteAddr == "" {
remoteAddr = r.RemoteAddr
remoteAddr = strings.Split(remoteAddr, ":")[0]
}
appName := r.URL.Query().Get("app")
if appName == "" {
appName = throttle.DefaultAppName
}
flags := &throttle.CheckFlags{
LowPriority: (r.URL.Query().Get("p") == "low"),
}
checkResult := checkResultFunc(ctx, appName, remoteAddr, flags)
if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists {
checkResult.StatusCode = http.StatusOK // 200
}

if r.Method == http.MethodGet {
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(checkResult.StatusCode)
if r.Method == http.MethodGet {
json.NewEncoder(w).Encode(checkResult)
}
})
if r.Method == http.MethodGet {
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(checkResult.StatusCode)
if r.Method == http.MethodGet {
json.NewEncoder(w).Encode(checkResult)
}
})
}
handle("/throttler/check", tsv.lagThrottler.Check)
handle("/throttler/check-self", tsv.lagThrottler.CheckSelf)
}

// registerThrottlerStatusHandler registers a throttler "status" request
Expand All @@ -1619,7 +1623,7 @@ func (tsv *TabletServer) registerThrottlerStatusHandler() {

// registerThrottlerHandlers registers all throttler handlers
func (tsv *TabletServer) registerThrottlerHandlers() {
tsv.registerThrottlerCheckHandler()
tsv.registerThrottlerCheckHandlers()
tsv.registerThrottlerStatusHandler()
}

Expand Down
Loading

0 comments on commit 1514987

Please sign in to comment.