diff --git a/go/sqltypes/named_result.go b/go/sqltypes/named_result.go index 7a67b5d4489..6d1621a8f1a 100644 --- a/go/sqltypes/named_result.go +++ b/go/sqltypes/named_result.go @@ -78,6 +78,22 @@ func (r RowNamedValues) AsUint64(fieldName string, def uint64) uint64 { return def } +// AsFloat64 returns the named field as float64, or default value if nonexistent/error +func (r RowNamedValues) AsFloat64(fieldName string, def float64) float64 { + if v, err := r.ToFloat64(fieldName); err == nil { + return v + } + return def +} + +// ToFloat64 returns the named field as float64 +func (r RowNamedValues) ToFloat64(fieldName string) (float64, error) { + if v, ok := r[fieldName]; ok { + return v.ToFloat64() + } + return 0, ErrNoSuchField +} + // ToBool returns the named field as bool func (r RowNamedValues) ToBool(fieldName string) (bool, error) { if v, ok := r[fieldName]; ok { diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go index cc4cfb7612d..5a5697e615b 100644 --- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -32,8 +32,8 @@ import ( var ( clusterInstance *cluster.LocalProcessCluster - masterTablet cluster.Vttablet - replicaTablet cluster.Vttablet + primaryTablet *cluster.Vttablet + replicaTablet *cluster.Vttablet hostname = "localhost" keyspaceName = "ks" cell = "zone1" @@ -65,8 +65,16 @@ var ( } }` - httpClient = base.SetupHTTPClient(time.Second) - checkAPIPath = "throttler/check" + httpClient = base.SetupHTTPClient(time.Second) + checkAPIPath = "throttler/check" + checkSelfAPIPath = "throttler/check-self" +) + +const ( + throttlerInitWait = 10 * time.Second + accumulateLagWait = 2 * time.Second + throttlerRefreshIntervalWait = 12 * time.Second + replicationCatchUpWait = 5 * time.Second ) func TestMain(m *testing.M) { @@ -89,6 +97,7 @@ func TestMain(m *testing.M) { "-watch_replication_stream", "-enable_replication_reporter", "-enable-lag-throttler", + "-throttle_threshold", "1s", "-heartbeat_enable", "-heartbeat_interval", "250ms", } @@ -110,9 +119,9 @@ func TestMain(m *testing.M) { tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets for _, tablet := range tablets { if tablet.Type == "master" { - masterTablet = *tablet + primaryTablet = tablet } else if tablet.Type != "rdonly" { - replicaTablet = *tablet + replicaTablet = tablet } } @@ -121,8 +130,12 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } -func throttleCheck() (*http.Response, error) { - return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", masterTablet.HTTPPort, checkAPIPath)) +func throttleCheck(tablet *cluster.Vttablet) (*http.Response, error) { + return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkAPIPath)) +} + +func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { + return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)) } func TestThrottlerBeforeMetricsCollected(t *testing.T) { @@ -130,20 +143,34 @@ func TestThrottlerBeforeMetricsCollected(t *testing.T) { // Immediately after startup, we expect this response: // {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"} - resp, err := throttleCheck() - assert.NoError(t, err) - assert.Equal(t, http.StatusNotFound, resp.StatusCode) + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) + } } func TestThrottlerAfterMetricsCollected(t *testing.T) { defer cluster.PanicHandler(t) - time.Sleep(10 * time.Second) + time.Sleep(throttlerInitWait) // By this time metrics will have been collected. We expect no lag, and something like: // {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""} - resp, err := throttleCheck() - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(replicaTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } } func TestLag(t *testing.T) { @@ -153,22 +180,47 @@ func TestLag(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) assert.NoError(t, err) - time.Sleep(2 * time.Second) + time.Sleep(accumulateLagWait) // Lag will have accumulated // {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"} - resp, err := throttleCheck() - assert.NoError(t, err) - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(primaryTablet) + assert.NoError(t, err) + // self (on primary) is unaffected by replication lag + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(replicaTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + } } { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias) assert.NoError(t, err) - time.Sleep(5 * time.Second) + time.Sleep(replicationCatchUpWait) // Restore - resp, err := throttleCheck() - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(replicaTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } } } @@ -178,10 +230,10 @@ func TestNoReplicas(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "RDONLY") assert.NoError(t, err) - time.Sleep(10 * time.Second) + time.Sleep(throttlerRefreshIntervalWait) // This makes no REPLICA servers available. We expect something like: // {"StatusCode":200,"Value":0,"Threshold":1,"Message":""} - resp, err := throttleCheck() + resp, err := throttleCheck(primaryTablet) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) } @@ -189,9 +241,9 @@ func TestNoReplicas(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA") assert.NoError(t, err) - time.Sleep(10 * time.Second) + time.Sleep(throttlerRefreshIntervalWait) // Restore valid replica - resp, err := throttleCheck() + resp, err := throttleCheck(primaryTablet) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) } diff --git a/go/timer/suspendable_ticker.go b/go/timer/suspendable_ticker.go index 2d971c69eb0..5257626b85f 100644 --- a/go/timer/suspendable_ticker.go +++ b/go/timer/suspendable_ticker.go @@ -61,6 +61,15 @@ func (s *SuspendableTicker) Stop() { s.ticker.Stop() } +// TickNow generates a tick at this point in time. It may block +// if nothing consumes the tick. +func (s *SuspendableTicker) TickNow() { + if atomic.LoadInt64(&s.suspended) == 0 { + // not suspended + s.C <- time.Now() + } +} + func (s *SuspendableTicker) loop() { for t := range s.ticker.C { if atomic.LoadInt64(&s.suspended) == 0 { diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 7aef0bbd083..c1498e43f44 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -458,7 +458,6 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err sm.ddle.Close() sm.tableGC.Close() - sm.throttler.Close() sm.messager.Close() sm.tracker.Close() sm.se.MakeNonMaster() @@ -470,6 +469,7 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err sm.te.AcceptReadOnly() sm.rt.MakeNonMaster() sm.watcher.Open() + sm.throttler.Open() sm.setState(wantTabletType, StateServing) return nil } diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 4349212ffec..577dbc8f6ad 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -101,18 +101,18 @@ func TestStateManagerServeNonMaster(t *testing.T) { verifySubcomponent(t, 1, sm.ddle, testStateClosed) verifySubcomponent(t, 2, sm.tableGC, testStateClosed) - verifySubcomponent(t, 3, sm.throttler, testStateClosed) - verifySubcomponent(t, 4, sm.messager, testStateClosed) - verifySubcomponent(t, 5, sm.tracker, testStateClosed) + verifySubcomponent(t, 3, sm.messager, testStateClosed) + verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonMaster) - verifySubcomponent(t, 6, sm.se, testStateOpen) - verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 8, sm.qe, testStateOpen) - verifySubcomponent(t, 9, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 10, sm.te, testStateNonMaster) - verifySubcomponent(t, 11, sm.rt, testStateNonMaster) - verifySubcomponent(t, 12, sm.watcher, testStateOpen) + verifySubcomponent(t, 5, sm.se, testStateOpen) + verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 7, sm.qe, testStateOpen) + verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 9, sm.te, testStateNonMaster) + verifySubcomponent(t, 10, sm.rt, testStateNonMaster) + verifySubcomponent(t, 11, sm.watcher, testStateOpen) + verifySubcomponent(t, 12, sm.throttler, testStateOpen) assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType) assert.Equal(t, StateServing, sm.state) @@ -292,18 +292,18 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) { verifySubcomponent(t, 1, sm.ddle, testStateClosed) verifySubcomponent(t, 2, sm.tableGC, testStateClosed) - verifySubcomponent(t, 3, sm.throttler, testStateClosed) - verifySubcomponent(t, 4, sm.messager, testStateClosed) - verifySubcomponent(t, 5, sm.tracker, testStateClosed) + verifySubcomponent(t, 3, sm.messager, testStateClosed) + verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonMaster) - verifySubcomponent(t, 6, sm.se, testStateOpen) - verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 8, sm.qe, testStateOpen) - verifySubcomponent(t, 9, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 10, sm.te, testStateNonMaster) - verifySubcomponent(t, 11, sm.rt, testStateNonMaster) - verifySubcomponent(t, 12, sm.watcher, testStateOpen) + verifySubcomponent(t, 5, sm.se, testStateOpen) + verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 7, sm.qe, testStateOpen) + verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 9, sm.te, testStateNonMaster) + verifySubcomponent(t, 10, sm.rt, testStateNonMaster) + verifySubcomponent(t, 11, sm.watcher, testStateOpen) + verifySubcomponent(t, 12, sm.throttler, testStateOpen) assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType) assert.Equal(t, StateServing, sm.state) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ccb412bd183..be1d56b79d5 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1576,35 +1576,39 @@ func (tsv *TabletServer) registerMigrationStatusHandler() { }) } -// registerThrottlerCheckHandler registers a throttler "check" request -func (tsv *TabletServer) registerThrottlerCheckHandler() { - tsv.exporter.HandleFunc("/throttler/check", func(w http.ResponseWriter, r *http.Request) { - ctx := tabletenv.LocalContext() - remoteAddr := r.Header.Get("X-Forwarded-For") - if remoteAddr == "" { - remoteAddr = r.RemoteAddr - remoteAddr = strings.Split(remoteAddr, ":")[0] - } - appName := r.URL.Query().Get("app") - if appName == "" { - appName = throttle.DefaultAppName - } - flags := &throttle.CheckFlags{ - LowPriority: (r.URL.Query().Get("p") == "low"), - } - checkResult := tsv.lagThrottler.Check(ctx, appName, remoteAddr, flags) - if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists { - checkResult.StatusCode = http.StatusOK // 200 - } +// registerThrottlerCheckHandlers registers throttler "check" requests +func (tsv *TabletServer) registerThrottlerCheckHandlers() { + handle := func(path string, checkResultFunc func(ctx context.Context, appName string, remoteAddr string, flags *throttle.CheckFlags) *throttle.CheckResult) { + tsv.exporter.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + ctx := tabletenv.LocalContext() + remoteAddr := r.Header.Get("X-Forwarded-For") + if remoteAddr == "" { + remoteAddr = r.RemoteAddr + remoteAddr = strings.Split(remoteAddr, ":")[0] + } + appName := r.URL.Query().Get("app") + if appName == "" { + appName = throttle.DefaultAppName + } + flags := &throttle.CheckFlags{ + LowPriority: (r.URL.Query().Get("p") == "low"), + } + checkResult := checkResultFunc(ctx, appName, remoteAddr, flags) + if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists { + checkResult.StatusCode = http.StatusOK // 200 + } - if r.Method == http.MethodGet { - w.Header().Set("Content-Type", "application/json") - } - w.WriteHeader(checkResult.StatusCode) - if r.Method == http.MethodGet { - json.NewEncoder(w).Encode(checkResult) - } - }) + if r.Method == http.MethodGet { + w.Header().Set("Content-Type", "application/json") + } + w.WriteHeader(checkResult.StatusCode) + if r.Method == http.MethodGet { + json.NewEncoder(w).Encode(checkResult) + } + }) + } + handle("/throttler/check", tsv.lagThrottler.Check) + handle("/throttler/check-self", tsv.lagThrottler.CheckSelf) } // registerThrottlerStatusHandler registers a throttler "status" request @@ -1619,7 +1623,7 @@ func (tsv *TabletServer) registerThrottlerStatusHandler() { // registerThrottlerHandlers registers all throttler handlers func (tsv *TabletServer) registerThrottlerHandlers() { - tsv.registerThrottlerCheckHandler() + tsv.registerThrottlerCheckHandlers() tsv.registerThrottlerStatusHandler() } diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go b/go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go index d86d1317606..adcd6f422fb 100644 --- a/go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go +++ b/go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go @@ -17,6 +17,10 @@ type InstanceKey struct { Port int } +// SelfInstanceKey is a special indicator for "this instance", e.g. denoting the MySQL server associated with local tablet +// The values of this key are immaterial and are intentionally descriptive +var SelfInstanceKey = &InstanceKey{Hostname: "(self)", Port: 1} + // newRawInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 // It expects such format and returns with error if input differs in format func newRawInstanceKey(hostPort string) (*InstanceKey, error) { @@ -70,6 +74,14 @@ func (i *InstanceKey) IsValid() bool { return len(i.Hostname) > 0 && i.Port > 0 } +// IsSelf checks if this is the special "self" instance key +func (i *InstanceKey) IsSelf() bool { + if SelfInstanceKey == i { + return true + } + return SelfInstanceKey.Equals(i) +} + // StringCode returns an official string representation of this key func (i *InstanceKey) StringCode() string { return fmt.Sprintf("%s:%d", i.Hostname, i.Port) diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go index 99f130efadd..3e02c2dd589 100644 --- a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go @@ -67,9 +67,9 @@ func (metric *MySQLThrottleMetric) Get() (float64, error) { return metric.Value, metric.Err } -// ReadThrottleMetric returns replication lag for a given connection config; either by explicit query +// ReadThrottleMetric returns a metric for the given probe. Either by explicit query // or via SHOW SLAVE STATUS -func ReadThrottleMetric(probe *Probe, clusterName string) (mySQLThrottleMetric *MySQLThrottleMetric) { +func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc func() *MySQLThrottleMetric) (mySQLThrottleMetric *MySQLThrottleMetric) { if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil { return mySQLThrottleMetric // On cached results we avoid taking latency metrics @@ -90,6 +90,11 @@ func ReadThrottleMetric(probe *Probe, clusterName string) (mySQLThrottleMetric * }() }(mySQLThrottleMetric, started) + if overrideGetMetricFunc != nil { + mySQLThrottleMetric = overrideGetMetricFunc() + return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) + } + dbURI := probe.GetDBUri("information_schema") db, fromCache, err := sqlutils.GetDB(dbURI) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 123ea09d23e..6bf22c33cf1 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -54,7 +54,8 @@ const ( maxPasswordLength = 32 - localStoreName = "local" + shardStoreName = "shard" + selfStoreName = "self" ) var throttleThreshold = flag.Duration("throttle_threshold", 1*time.Second, "Replication lag threshold for throttling") @@ -71,7 +72,7 @@ var ( sqlGrantThrottlerUser = []string{ `GRANT SELECT ON _vt.heartbeat TO %s`, } - replicationLagQuery = `select unix_timestamp(now(6))-max(ts/1000000000) from _vt.heartbeat` + replicationLagQuery = `select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from _vt.heartbeat` ) func init() { @@ -165,10 +166,14 @@ func NewThrottler(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topo throttler.initThrottleTabletTypes() throttler.ThrottleApp("abusing-app", time.Now().Add(time.Hour*24*365*10), defaultThrottleRatio) throttler.check = NewThrottlerCheck(throttler) + throttler.initConfig("") + throttler.check.SelfChecks(context.Background()) return throttler } +// initThrottleTabletTypes reads the user supplied throttle_tablet_types and sets these +// for the duration of this tablet's lifetime func (throttler *Throttler) initThrottleTabletTypes() { throttler.throttleTabletTypesMap = make(map[topodatapb.TabletType]bool) @@ -199,18 +204,26 @@ func (throttler *Throttler) initConfig(password string) { Stores: config.StoresSettings{ MySQL: config.MySQLConfigurationSettings{ IgnoreDialTCPErrors: true, - Clusters: map[string](*config.MySQLClusterConfigurationSettings){ - localStoreName: &config.MySQLClusterConfigurationSettings{ - User: throttlerUser, - Password: password, - ThrottleThreshold: throttleThreshold.Seconds(), - MetricQuery: replicationLagQuery, - IgnoreHostsCount: 0, - }, - }, + Clusters: map[string](*config.MySQLClusterConfigurationSettings){}, }, }, } + config.Instance.Stores.MySQL.Clusters[selfStoreName] = &config.MySQLClusterConfigurationSettings{ + User: "", // running on local tablet server, will use vttablet DBA user + Password: "", // running on local tablet server, will use vttablet DBA user + ThrottleThreshold: throttleThreshold.Seconds(), + MetricQuery: replicationLagQuery, + IgnoreHostsCount: 0, + } + if password != "" { + config.Instance.Stores.MySQL.Clusters[shardStoreName] = &config.MySQLClusterConfigurationSettings{ + User: throttlerUser, + Password: password, + ThrottleThreshold: throttleThreshold.Seconds(), + MetricQuery: replicationLagQuery, + IgnoreHostsCount: 0, + } + } } // Open opens database pool and initializes the schema @@ -227,6 +240,8 @@ func (throttler *Throttler) Open() error { for _, t := range throttler.tickers { t.Resume() + // since we just resume now, speed up the tickers by forcng an immediate tick + go t.TickNow() } return nil @@ -308,6 +323,37 @@ func (throttler *Throttler) createThrottlerUser(ctx context.Context) (password s return password, nil } +// readSelfMySQLThrottleMetric reads the mysql metric from thi very tablet's backend mysql. +func (throttler *Throttler) readSelfMySQLThrottleMetric() *mysql.MySQLThrottleMetric { + metric := &mysql.MySQLThrottleMetric{ + ClusterName: selfStoreName, + Key: *mysql.SelfInstanceKey, + Value: 3.14, + Err: nil, + } + ctx := context.Background() + conn, err := throttler.pool.Get(ctx) + if err != nil { + metric.Err = err + return metric + } + defer conn.Recycle() + + tm, err := conn.Exec(ctx, replicationLagQuery, 1, true) + if err != nil { + metric.Err = err + return metric + } + row := tm.Named().Row() + if row == nil { + metric.Err = fmt.Errorf("no results for ReadSelfMySQLThrottleMetric") + return metric + } + metric.Value, metric.Err = row.ToFloat64("replication_lag") + + return metric +} + // ThrottledAppsSnapshot returns a snapshot (a copy) of current throttled apps func (throttler *Throttler) ThrottledAppsSnapshot() map[string]cache.Item { return throttler.throttledApps.Items() @@ -371,6 +417,8 @@ func (throttler *Throttler) Operate(ctx context.Context) { if err == nil { throttler.initConfig(password) shouldCreateThrottlerUser = false + // transitioned into leadership, let's speed up the next 'refresh' and 'collect' ticks + go mysqlRefreshTicker.TickNow() } else { log.Errorf("Error creating throttler account: %+v", err) } @@ -379,7 +427,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case <-mysqlCollectTicker.C: { - if atomic.LoadInt64(&throttler.isLeader) > 0 { + if atomic.LoadInt64(&throttler.isOpen) > 0 { // frequent if !throttler.isDormant() { throttler.collectMySQLMetrics(ctx) @@ -388,7 +436,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case <-mysqlDormantCollectTicker.C: { - if atomic.LoadInt64(&throttler.isLeader) > 0 { + if atomic.LoadInt64(&throttler.isOpen) > 0 { // infrequent if throttler.isDormant() { throttler.collectMySQLMetrics(ctx) @@ -403,7 +451,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { case <-mysqlRefreshTicker.C: { // sparse - if atomic.LoadInt64(&throttler.isLeader) > 0 { + if atomic.LoadInt64(&throttler.isOpen) > 0 { go throttler.refreshMySQLInventory(ctx) } } @@ -414,13 +462,13 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case <-mysqlAggregateTicker.C: { - if atomic.LoadInt64(&throttler.isLeader) > 0 { + if atomic.LoadInt64(&throttler.isOpen) > 0 { throttler.aggregateMySQLMetrics(ctx) } } case <-throttledAppsTicker.C: { - if atomic.LoadInt64(&throttler.isLeader) > 0 { + if atomic.LoadInt64(&throttler.isOpen) > 0 { go throttler.expireThrottledApps() } } @@ -445,7 +493,14 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error { return } defer atomic.StoreInt64(&probe.QueryInProgress, 0) - throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName) + + // Apply an override to metrics read, if this is the special "self" cluster + // (where we incidentally know there's a single probe) + overrideGetMySQLThrottleMetricFunc := throttler.readSelfMySQLThrottleMetric + if clusterName != selfStoreName { + overrideGetMySQLThrottleMetricFunc = nil + } + throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, overrideGetMySQLThrottleMetricFunc) throttler.mysqlThrottleMetricChan <- throttleMetrics }() } @@ -454,8 +509,7 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error { return nil } -// refreshMySQLInventory will re-structure the inventory based on reading config settings, and potentially -// re-querying dynamic data such as HAProxy list of hosts +// refreshMySQLInventory will re-structure the inventory based on reading config settings func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { log.Infof("refreshing MySQL inventory") @@ -466,7 +520,7 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { return } } - if !key.IsValid() { + if !key.IsValid() && !key.IsSelf() { log.Infof("Throttler: read invalid instance key: [%+v] for cluster %+v", key, clusterName) return } @@ -488,18 +542,30 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { // config may dynamically change, but internal structure (config.Settings().Stores.MySQL.Clusters in our case) // is immutable and can only be _replaced_. Hence, it's safe to read in a goroutine: go func() { - err := func() error { - throttler.mysqlClusterThresholds.Set(clusterName, clusterSettings.ThrottleThreshold, cache.DefaultExpiration) + throttler.mysqlClusterThresholds.Set(clusterName, clusterSettings.ThrottleThreshold, cache.DefaultExpiration) + clusterProbes := &mysql.ClusterProbes{ + ClusterName: clusterName, + IgnoreHostsCount: clusterSettings.IgnoreHostsCount, + InstanceProbes: mysql.NewProbes(), + } + if clusterName == selfStoreName { + // special case: just looking at this tablet's MySQL server + // We will probe this "cluster" (of one server) is a special way. + addInstanceKey(mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes) + throttler.mysqlClusterProbesChan <- clusterProbes + return + } + if atomic.LoadInt64(&throttler.isLeader) == 0 { + // not the leader (primary tablet)? Then no more work for us. + return + } + // The primary tablet is also in charge of collecting the shard's metrics + err := func() error { tabletAliases, err := throttler.ts.FindAllTabletAliasesInShard(ctx, throttler.keyspace, throttler.shard) if err != nil { return err } - clusterProbes := &mysql.ClusterProbes{ - ClusterName: clusterName, - IgnoreHostsCount: clusterSettings.IgnoreHostsCount, - InstanceProbes: mysql.NewProbes(), - } for _, tabletAlias := range tabletAliases { tablet, err := throttler.ts.GetTablet(ctx, tabletAlias) if err != nil { @@ -707,12 +773,22 @@ func (throttler *Throttler) AppRequestMetricResult(ctx context.Context, appName return metricResultFunc() } -// Check is the main serving function of the throttler, and returns a check result for this cluster's lag -func (throttler *Throttler) Check(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { +// checkStore checks the aggregated value of given MySQL store +func (throttler *Throttler) checkStore(ctx context.Context, appName string, storeName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { if !throttler.env.Config().EnableLagThrottler { return okMetricCheckResult } - return throttler.check.Check(ctx, appName, "mysql", localStoreName, remoteAddr, flags) + return throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) +} + +// Check is the main serving function of the throttler, and returns a check result for this cluster's lag; it is only applicable on a Primary tablet. +func (throttler *Throttler) Check(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { + return throttler.checkStore(ctx, appName, shardStoreName, remoteAddr, flags) +} + +// CheckSelf is checks the mysql/self metric, and is available on each tablet +func (throttler *Throttler) CheckSelf(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { + return throttler.checkStore(ctx, appName, selfStoreName, remoteAddr, flags) } // Status exports a status breakdown