Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttle: support "/throttle/check-self" available on all tablets #7319

Merged
Merged
16 changes: 16 additions & 0 deletions go/sqltypes/named_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ func (r RowNamedValues) AsUint64(fieldName string, def uint64) uint64 {
return def
}

// AsFloat64 looks up the named field and returns it as a float64. If the
// lookup or the conversion reports an error, def is returned instead.
func (r RowNamedValues) AsFloat64(fieldName string, def float64) float64 {
	value, err := r.ToFloat64(fieldName)
	if err != nil {
		return def
	}
	return value
}

// ToFloat64 returns the named field converted to float64. When the row has
// no entry for fieldName it returns 0 and ErrNoSuchField; otherwise the
// result (and any error) of the value's own ToFloat64 conversion is returned.
func (r RowNamedValues) ToFloat64(fieldName string) (float64, error) {
	value, found := r[fieldName]
	if !found {
		return 0, ErrNoSuchField
	}
	return value.ToFloat64()
}

// ToBool returns the named field as bool
func (r RowNamedValues) ToBool(fieldName string) (bool, error) {
if v, ok := r[fieldName]; ok {
Expand Down
88 changes: 66 additions & 22 deletions go/test/endtoend/tabletmanager/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (

var (
clusterInstance *cluster.LocalProcessCluster
masterTablet cluster.Vttablet
replicaTablet cluster.Vttablet
primaryTablet *cluster.Vttablet
replicaTablet *cluster.Vttablet
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
Expand Down Expand Up @@ -65,8 +65,9 @@ var (
}
}`

httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
checkSelfAPIPath = "throttler/check-self"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -110,9 +111,9 @@ func TestMain(m *testing.M) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
for _, tablet := range tablets {
if tablet.Type == "master" {
masterTablet = *tablet
primaryTablet = tablet
} else if tablet.Type != "rdonly" {
replicaTablet = *tablet
replicaTablet = tablet
}
}

Expand All @@ -121,18 +122,24 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func throttleCheck() (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", masterTablet.HTTPPort, checkAPIPath))
// throttleCheck sends an HTTP HEAD request to the checkAPIPath endpoint of
// the given tablet, addressed via localhost and the tablet's HTTP port, and
// returns the response and error exactly as produced by the HTTP client.
func throttleCheck(tablet *cluster.Vttablet) (*http.Response, error) {
	endpoint := fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkAPIPath)
	return httpClient.Head(endpoint)
}

// throttleCheckSelf sends an HTTP HEAD request to the checkSelfAPIPath
// endpoint of the given tablet, addressed via localhost and the tablet's
// HTTP port, and returns the response and error as produced by the HTTP client.
func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
	endpoint := fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)
	return httpClient.Head(endpoint)
}

func TestThrottlerBeforeMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

// Immediately after startup, we expect this response:
// {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}
}

func TestThrottlerAfterMetricsCollected(t *testing.T) {
Expand All @@ -141,9 +148,21 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) {
time.Sleep(10 * time.Second)
// By this time metrics will have been collected. We expect no lag, and something like:
// {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}

func TestLag(t *testing.T) {
Expand All @@ -156,19 +175,44 @@ func TestLag(t *testing.T) {
time.Sleep(2 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these durations used in the tests (2 seconds, 10 seconds, 5 seconds) based on values configured elsewhere? Just wondering if these can become flaky if a configuration is changed elsewhere.

Copy link
Contributor Author

@shlomi-noach shlomi-noach Jan 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent question. There are some hard coded intervals here:

leaderCheckInterval = 5 * time.Second
mysqlCollectInterval = 100 * time.Millisecond
mysqlDormantCollectInterval = 5 * time.Second
mysqlRefreshInterval = 10 * time.Second
mysqlAggregateInterval = 100 * time.Millisecond

Basically:

  • after a cluster starts, it takes ~5sec for throttler to know about all replicas and collect data from those replicas and aggregate that data -- meaning an API /throttler/check can provide a reliable answer. Hence waiting for 10sec (can reduce that to e.g. 7s, but I feel 10sec is much safer against flakiness).
  • user apps will cache throttler results for some 250ms (e.g. see in the vreplication PRs), so a 1sec sleep can ensure the cache is cleared
  • The default lag threshold is 1s, hence the 2 * time.Second sleep after StopReplication(), to make sure when we next check for lag, there is a throttle-grade lag. Having said that, you are right that this is overridable -- so I just added
			"-throttle_threshold", "1s",

to this test's VtTabletExtraArgs to ensure the threshold is 1s.

  • The sleep for 5 * time.Second after StartReplication is just a heuristic to allow the replication to catch up, and does not depend on the throttler configuration. Again, catch up is likely to happen in less than 1s but I feel 5s is great against flakiness.
  • The sleep for 10 * time.Second after ChangeTabletType is because it will take that amount of time for the throttler to identify the new roster. It's hard coded in mysqlRefreshInterval = 10 * time.Second, and now I've actually upped the test to sleep for 12 * time.Second to avoid flakiness.

I've moreover now made these numbers constants in the test. Now the waits are named, and it's clearer what each wait means.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new code:

const (
	throttlerInitWait        = 10 * time.Second
	accumulateLagWait        = 2 * time.Second
	mysqlRefreshIntervalWait = 12 * time.Second
	replicationCatchUpWait   = 5 * time.Second
)

...

		time.Sleep(mysqlRefreshIntervalWait)

// Lag will have accumulated
// {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(5 * time.Second)
// Restore
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}
}

Expand All @@ -181,7 +225,7 @@ func TestNoReplicas(t *testing.T) {
time.Sleep(10 * time.Second)
// This makes no REPLICA servers available. We expect something like:
// {"StatusCode":200,"Value":0,"Threshold":1,"Message":""}
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
Expand All @@ -191,7 +235,7 @@ func TestNoReplicas(t *testing.T) {

time.Sleep(10 * time.Second)
// Restore valid replica
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
Expand Down
9 changes: 9 additions & 0 deletions go/timer/suspendable_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func (s *SuspendableTicker) Stop() {
s.ticker.Stop()
}

// TickNow sends the current time on the ticker's channel C, unless the
// ticker is suspended, in which case it does nothing. The send may block
// if nothing consumes the tick.
func (s *SuspendableTicker) TickNow() {
	if atomic.LoadInt64(&s.suspended) != 0 {
		// suspended: emit nothing
		return
	}
	s.C <- time.Now()
}

func (s *SuspendableTicker) loop() {
for t := range s.ticker.C {
if atomic.LoadInt64(&s.suspended) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err

sm.ddle.Close()
sm.tableGC.Close()
sm.throttler.Close()
sm.messager.Close()
sm.tracker.Close()
sm.se.MakeNonMaster()
Expand All @@ -470,6 +469,7 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err
sm.te.AcceptReadOnly()
sm.rt.MakeNonMaster()
sm.watcher.Open()
sm.throttler.Open()
sm.setState(wantTabletType, StateServing)
return nil
}
Expand Down
40 changes: 20 additions & 20 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,18 @@ func TestStateManagerServeNonMaster(t *testing.T) {

verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.throttler, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonMaster)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonMaster)
verifySubcomponent(t, 11, sm.rt, testStateNonMaster)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonMaster)
verifySubcomponent(t, 10, sm.rt, testStateNonMaster)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand Down Expand Up @@ -292,18 +292,18 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) {

verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.throttler, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonMaster)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonMaster)
verifySubcomponent(t, 11, sm.rt, testStateNonMaster)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonMaster)
verifySubcomponent(t, 10, sm.rt, testStateNonMaster)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand Down
62 changes: 33 additions & 29 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,35 +1576,39 @@ func (tsv *TabletServer) registerMigrationStatusHandler() {
})
}

// registerThrottlerCheckHandler registers a throttler "check" request
func (tsv *TabletServer) registerThrottlerCheckHandler() {
tsv.exporter.HandleFunc("/throttler/check", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
remoteAddr := r.Header.Get("X-Forwarded-For")
if remoteAddr == "" {
remoteAddr = r.RemoteAddr
remoteAddr = strings.Split(remoteAddr, ":")[0]
}
appName := r.URL.Query().Get("app")
if appName == "" {
appName = throttle.DefaultAppName
}
flags := &throttle.CheckFlags{
LowPriority: (r.URL.Query().Get("p") == "low"),
}
checkResult := tsv.lagThrottler.Check(ctx, appName, remoteAddr, flags)
if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists {
checkResult.StatusCode = http.StatusOK // 200
}
// registerThrottlerCheckHandlers registers throttler "check" requests
func (tsv *TabletServer) registerThrottlerCheckHandlers() {
handle := func(path string, checkResultFunc func(ctx context.Context, appName string, remoteAddr string, flags *throttle.CheckFlags) *throttle.CheckResult) {
tsv.exporter.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
remoteAddr := r.Header.Get("X-Forwarded-For")
if remoteAddr == "" {
remoteAddr = r.RemoteAddr
remoteAddr = strings.Split(remoteAddr, ":")[0]
}
appName := r.URL.Query().Get("app")
if appName == "" {
appName = throttle.DefaultAppName
}
flags := &throttle.CheckFlags{
LowPriority: (r.URL.Query().Get("p") == "low"),
}
checkResult := checkResultFunc(ctx, appName, remoteAddr, flags)
if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists {
checkResult.StatusCode = http.StatusOK // 200
}

if r.Method == http.MethodGet {
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(checkResult.StatusCode)
if r.Method == http.MethodGet {
json.NewEncoder(w).Encode(checkResult)
}
})
if r.Method == http.MethodGet {
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(checkResult.StatusCode)
if r.Method == http.MethodGet {
json.NewEncoder(w).Encode(checkResult)
}
})
}
handle("/throttler/check", tsv.lagThrottler.Check)
handle("/throttler/check-self", tsv.lagThrottler.CheckSelf)
}

// registerThrottlerStatusHandler registers a throttler "status" request
Expand All @@ -1619,7 +1623,7 @@ func (tsv *TabletServer) registerThrottlerStatusHandler() {

// registerThrottlerHandlers registers all throttler handlers
func (tsv *TabletServer) registerThrottlerHandlers() {
tsv.registerThrottlerCheckHandler()
tsv.registerThrottlerCheckHandlers()
tsv.registerThrottlerStatusHandler()
}

Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type InstanceKey struct {
Port int
}

// SelfInstanceKey is a special indicator for "this instance", e.g. denoting the MySQL server associated with the local tablet.
// The values of this key are immaterial and are intentionally descriptive. The hostname is non-empty
// and the port is positive, so the key satisfies IsValid.
var SelfInstanceKey = &InstanceKey{Hostname: "(self)", Port: 1}

// newRawInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
// It expects such format and returns with error if input differs in format
func newRawInstanceKey(hostPort string) (*InstanceKey, error) {
Expand Down Expand Up @@ -70,6 +74,14 @@ func (i *InstanceKey) IsValid() bool {
return len(i.Hostname) > 0 && i.Port > 0
}

// IsSelf reports whether this key is the special "self" instance key: either
// the very same pointer as SelfInstanceKey, or a key that SelfInstanceKey
// considers equal according to Equals.
func (i *InstanceKey) IsSelf() bool {
	return i == SelfInstanceKey || SelfInstanceKey.Equals(i)
}

// StringCode returns an official string representation of this key
func (i *InstanceKey) StringCode() string {
return fmt.Sprintf("%s:%d", i.Hostname, i.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (metric *MySQLThrottleMetric) Get() (float64, error) {
return metric.Value, metric.Err
}

// ReadThrottleMetric returns replication lag for a given connection config; either by explicit query
// ReadThrottleMetric returns a metric for the given probe, either by explicit query
shlomi-noach marked this conversation as resolved.
Show resolved Hide resolved
// or via SHOW SLAVE STATUS
func ReadThrottleMetric(probe *Probe, clusterName string) (mySQLThrottleMetric *MySQLThrottleMetric) {
func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc func() *MySQLThrottleMetric) (mySQLThrottleMetric *MySQLThrottleMetric) {
if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil {
return mySQLThrottleMetric
// On cached results we avoid taking latency metrics
Expand All @@ -90,6 +90,11 @@ func ReadThrottleMetric(probe *Probe, clusterName string) (mySQLThrottleMetric *
}()
}(mySQLThrottleMetric, started)

if overrideGetMetricFunc != nil {
mySQLThrottleMetric = overrideGetMetricFunc()
return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric)
}

dbURI := probe.GetDBUri("information_schema")
db, fromCache, err := sqlutils.GetDB(dbURI)

Expand Down
Loading