diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go
index 3cd21460d48..6b80ecf6fb9 100644
--- a/server/schedulers/balance_leader.go
+++ b/server/schedulers/balance_leader.go
@@ -281,9 +281,9 @@ func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) []*operator.O
 	}
 	op.Counters = append(op.Counters,
 		schedulerCounter.WithLabelValues(l.GetName(), "new-operator"),
-		balanceDirectionCounter.WithLabelValues(l.GetName(), plan.SourceMetricLabel(), plan.TargetMetricLabel()),
 	)
 	op.FinishedCounters = append(op.FinishedCounters,
+		balanceDirectionCounter.WithLabelValues(l.GetName(), plan.SourceMetricLabel(), plan.TargetMetricLabel()),
 		l.counter.WithLabelValues("move-leader", plan.SourceMetricLabel()+"-out"),
 		l.counter.WithLabelValues("move-leader", plan.TargetMetricLabel()+"-in"),
 	)
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index 06b119e18d5..27e039f8b21 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -229,10 +229,8 @@ func (s *balanceRegionScheduler) transferPeer(plan *balancePlan) *operator.Opera
 	}
 	sourceLabel := strconv.FormatUint(sourceID, 10)
 	targetLabel := strconv.FormatUint(targetID, 10)
-	op.Counters = append(op.Counters,
-		balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
-	)
 	op.FinishedCounters = append(op.FinishedCounters,
+		balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
 		s.counter.WithLabelValues("move-peer", sourceLabel+"-out"),
 		s.counter.WithLabelValues("move-peer", targetLabel+"-in"),
 	)
diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go
index b5b14c9de4e..62f1b93ccc9 100644
--- a/server/schedulers/balance_test.go
+++ b/server/schedulers/balance_test.go
@@ -544,8 +544,8 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestSingleRangeBalance(c *C) {
 	ops := lb.Schedule(s.tc)
 	c.Assert(ops, NotNil)
 	c.Assert(ops, HasLen, 1)
-	c.Assert(ops[0].Counters, HasLen, 3)
-	c.Assert(ops[0].FinishedCounters, HasLen, 2)
+	c.Assert(ops[0].Counters, HasLen, 2)
+	c.Assert(ops[0].FinishedCounters, HasLen, 3)
 	lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"}))
 	c.Assert(err, IsNil)
 	c.Assert(lb.Schedule(s.tc), IsNil)
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index cc6ef31ebcd..49e50aaaca6 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -25,7 +25,6 @@ import (
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/server/core"
@@ -406,6 +405,9 @@ type balanceSolver struct {
 	maxSrc   *storeLoad
 	minDst   *storeLoad
 	rankStep *storeLoad
+
+	byteIsBetter bool
+	keyIsBetter  bool
 }
 
 type solution struct {
@@ -790,12 +792,16 @@ func (bs *balanceSolver) calcProgressiveRank() {
 		case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
 			// If belong to the case, both byte rate and key rate will be more balanced, the best choice.
 			rank = -3
+			bs.byteIsBetter = true
+			bs.keyIsBetter = true
 		case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio:
 			// If belong to the case, byte rate will be not worsened, key rate will be more balanced.
 			rank = -2
+			bs.keyIsBetter = true
 		case byteHot && byteDecRatio <= greatDecRatio:
 			// If belong to the case, byte rate will be more balanced, ignore the key rate.
 			rank = -1
+			bs.byteIsBetter = true
 		}
 	}
 	log.Debug("calcProgressiveRank",
@@ -974,15 +980,19 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
 		return nil, nil
 	}
 	var (
-		counters []prometheus.Counter
-		err      error
+		err         error
+		typ         string
+		sourceLabel string
+		targetLabel string
 	)
 	switch bs.opTy {
 	case movePeer:
 		srcPeer := bs.cur.region.GetStorePeer(bs.cur.srcStoreID) // checked in getRegionAndSrcPeer
 		dstPeer := &metapb.Peer{StoreId: bs.cur.dstStoreID, Role: srcPeer.Role}
-		typ := "move-peer"
+		sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10)
+		targetLabel = strconv.FormatUint(dstPeer.GetStoreId(), 10)
+
 		if bs.rwTy == read && bs.cur.region.GetLeader().StoreId == bs.cur.srcStoreID { // move read leader
 			op, err = operator.CreateMoveLeaderOperator(
 				"move-hot-read-leader",
@@ -994,6 +1004,7 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
 			typ = "move-leader"
 		} else {
 			desc := "move-hot-" + bs.rwTy.String() + "-peer"
+			typ = "move-peer"
 			op, err = operator.CreateMovePeerOperator(
 				desc,
 				bs.cluster,
@@ -1002,14 +1013,13 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
 				bs.cur.srcStoreID,
 				dstPeer)
 		}
-		counters = append(counters,
-			hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), strconv.FormatUint(bs.cur.srcStoreID, 10), "out"),
-			hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), strconv.FormatUint(dstPeer.GetStoreId(), 10), "in"))
 	case transferLeader:
 		if bs.cur.region.GetStoreVoter(bs.cur.dstStoreID) == nil {
 			return nil, nil
 		}
 		desc := "transfer-hot-" + bs.rwTy.String() + "-leader"
+		sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10)
+		targetLabel = strconv.FormatUint(bs.cur.dstStoreID, 10)
 		op, err = operator.CreateTransferLeaderOperator(
 			desc,
 			bs.cluster,
@@ -1017,9 +1027,6 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
 			bs.cur.srcStoreID,
 			bs.cur.dstStoreID,
 			operator.OpHotRegion)
-		counters = append(counters,
-			hotDirectionCounter.WithLabelValues("transfer-leader", bs.rwTy.String(), strconv.FormatUint(bs.cur.srcStoreID, 10), "out"),
-			hotDirectionCounter.WithLabelValues("transfer-leader", bs.rwTy.String(), strconv.FormatUint(bs.cur.dstStoreID, 10), "in"))
 	}
 
 	if err != nil {
@@ -1028,8 +1035,20 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
 		return nil, nil
 	}
 
+	dim := ""
+	if bs.byteIsBetter && bs.keyIsBetter {
+		dim = "both"
+	} else if bs.byteIsBetter {
+		dim = "byte"
+	} else if bs.keyIsBetter {
+		dim = "key"
+	}
+
 	op.SetPriorityLevel(core.HighPriority)
-	op.Counters = append(op.Counters, counters...)
+	op.FinishedCounters = append(op.FinishedCounters,
+		hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), sourceLabel, "out", dim),
+		hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), targetLabel, "in", dim),
+		balanceDirectionCounter.WithLabelValues(bs.sche.GetName(), sourceLabel, targetLabel))
 	op.Counters = append(op.Counters,
 		schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
 		schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))
diff --git a/server/schedulers/metrics.go b/server/schedulers/metrics.go
index f98769a0cd2..1f8ce0c308b 100644
--- a/server/schedulers/metrics.go
+++ b/server/schedulers/metrics.go
@@ -93,7 +93,7 @@ var hotDirectionCounter = prometheus.NewCounterVec(
 		Subsystem: "scheduler",
 		Name:      "hot_region_direction",
 		Help:      "Counter of hot region scheduler.",
-	}, []string{"type", "rw", "store", "direction"})
+	}, []string{"type", "rw", "store", "direction", "dim"})
 
 var scatterRangeLeaderCounter = prometheus.NewCounterVec(
 	prometheus.CounterOpts{
diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go
index f085257ae1f..2f614a54eae 100644
--- a/server/statistics/hot_cache.go
+++ b/server/statistics/hot_cache.go
@@ -15,6 +15,7 @@ package statistics
 
 import (
 	"context"
+
 	"github.com/tikv/pd/server/core"
 )
 
@@ -41,8 +42,8 @@ func NewHotCache(ctx context.Context, quit <-chan struct{}) *HotCache {
 		quit:           quit,
 		readFlowQueue:  make(chan FlowItemTask, queueCap),
 		writeFlowQueue: make(chan FlowItemTask, queueCap),
-		writeFlow:      NewHotStoresStats(WriteFlow),
-		readFlow:       NewHotStoresStats(ReadFlow),
+		writeFlow:      NewHotPeerCache(WriteFlow),
+		readFlow:       NewHotPeerCache(ReadFlow),
 	}
 	go w.updateItems(w.readFlowQueue, w.runReadTask)
 	go w.updateItems(w.writeFlowQueue, w.runWriteTask)
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index 4f806e97730..47886050d2b 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -62,8 +62,8 @@ type hotPeerCache struct {
 	reportIntervalSecs int
 }
 
-// NewHotStoresStats creates a HotStoresStats
-func NewHotStoresStats(kind FlowKind) *hotPeerCache {
+// NewHotPeerCache creates a hotPeerCache
+func NewHotPeerCache(kind FlowKind) *hotPeerCache {
 	c := &hotPeerCache{
 		kind:         kind,
 		peersOfStore: make(map[uint64]*TopN),
diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go
index 74e4d177135..1f8c826d397 100644
--- a/server/statistics/hot_peer_cache_test.go
+++ b/server/statistics/hot_peer_cache_test.go
@@ -29,7 +29,7 @@ var _ = Suite(&testHotPeerCache{})
 type testHotPeerCache struct{}
 
 func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) {
-	cache := NewHotStoresStats(WriteFlow)
+	cache := NewHotPeerCache(WriteFlow)
 	peers := newPeers(3,
 		func(i int) uint64 { return uint64(10000 + i) },
 		func(i int) uint64 { return uint64(i) })
@@ -92,7 +92,7 @@ func testCache(c *C, t *testCacheCase) {
 		ReadFlow:  3, // all peers
 		WriteFlow: 3, // all peers
 	}
-	cache := NewHotStoresStats(t.kind)
+	cache := NewHotPeerCache(t.kind)
 	region := buildRegion(nil, nil, t.kind)
 	checkAndUpdate(c, cache, region, defaultSize[t.kind])
 	checkHit(c, cache, region, t.kind, false) // all peers are new
@@ -226,7 +226,7 @@ func newPeers(n int, pid genID, sid genID) []*metapb.Peer {
 }
 
 func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) {
-	cache := NewHotStoresStats(ReadFlow)
+	cache := NewHotPeerCache(ReadFlow)
 	// skip interval=0
 	newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: ReadFlow}
@@ -289,7 +289,7 @@ func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) {
 }
 
 func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold float64) {
-	cache := NewHotStoresStats(ReadFlow)
+	cache := NewHotPeerCache(ReadFlow)
 	storeID := uint64(1)
 	c.Assert(byteRate, GreaterEqual, minHotThresholds[RegionReadBytes])
 	for i := uint64(1); i < TopNN+10; i++ {
@@ -323,7 +323,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
 }
 
 func BenchmarkCheckRegionFlow(b *testing.B) {
-	cache := NewHotStoresStats(ReadFlow)
+	cache := NewHotPeerCache(ReadFlow)
 	region := core.NewRegionInfo(&metapb.Region{
 		Id: 1,
 		Peers: []*metapb.Peer{
diff --git a/server/statistics/store.go b/server/statistics/store.go
index 24591c65824..a8f8c18bb62 100644
--- a/server/statistics/store.go
+++ b/server/statistics/store.go
@@ -156,7 +156,15 @@ func collect(records []*pdpb.RecordPair) float64 {
 func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
 	statInterval := stats.GetInterval()
 	interval := time.Duration(statInterval.GetEndTimestamp()-statInterval.GetStartTimestamp()) * time.Second
-	log.Debug("update store stats", zap.Uint64("key-write", stats.KeysWritten), zap.Uint64("bytes-write", stats.BytesWritten), zap.Duration("interval", interval), zap.Uint64("store-id", stats.GetStoreId()))
+	log.Debug("update store stats",
+		zap.Uint64("key-write", stats.KeysWritten),
+		zap.Uint64("bytes-write", stats.BytesWritten),
+		zap.Uint64("key-read", stats.KeysRead),
+		zap.Uint64("bytes-read", stats.BytesRead),
+		zap.Uint64("query-write", core.GetWriteQueryNum(stats.QueryStats)),
+		zap.Uint64("query-read", core.GetReadQueryNum(stats.QueryStats)),
+		zap.Duration("interval", interval),
+		zap.Uint64("store-id", stats.GetStoreId()))
 	r.Lock()
 	defer r.Unlock()
 	readQueryNum, writeQueryNum := core.GetReadQueryNum(stats.QueryStats), core.GetWriteQueryNum(stats.QueryStats)