scheduler: add some log and metrics (#3895)

* add some log and metrics

Signed-off-by: lhy1024 <[email protected]>

* fix test

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>

lhy1024 and ti-chi-bot authored Jul 27, 2021
1 parent 0bcadb6 commit d02f0a9
Showing 9 changed files with 54 additions and 28 deletions.

server/schedulers/balance_leader.go (1 addition, 1 deletion)

@@ -281,9 +281,9 @@ func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) []*operator.O
     }
     op.Counters = append(op.Counters,
         schedulerCounter.WithLabelValues(l.GetName(), "new-operator"),
-        balanceDirectionCounter.WithLabelValues(l.GetName(), plan.SourceMetricLabel(), plan.TargetMetricLabel()),
     )
     op.FinishedCounters = append(op.FinishedCounters,
+        balanceDirectionCounter.WithLabelValues(l.GetName(), plan.SourceMetricLabel(), plan.TargetMetricLabel()),
         l.counter.WithLabelValues("move-leader", plan.SourceMetricLabel()+"-out"),
         l.counter.WithLabelValues("move-leader", plan.TargetMetricLabel()+"-in"),
     )

server/schedulers/balance_region.go (1 addition, 3 deletions)

@@ -229,10 +229,8 @@ func (s *balanceRegionScheduler) transferPeer(plan *balancePlan) *operator.Opera
     }
     sourceLabel := strconv.FormatUint(sourceID, 10)
     targetLabel := strconv.FormatUint(targetID, 10)
-    op.Counters = append(op.Counters,
-        balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
-    )
     op.FinishedCounters = append(op.FinishedCounters,
+        balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
         s.counter.WithLabelValues("move-peer", sourceLabel+"-out"),
         s.counter.WithLabelValues("move-peer", targetLabel+"-in"),
     )

server/schedulers/balance_test.go (2 additions, 2 deletions)

@@ -544,8 +544,8 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestSingleRangeBalance(c *C) {
     ops := lb.Schedule(s.tc)
     c.Assert(ops, NotNil)
     c.Assert(ops, HasLen, 1)
-    c.Assert(ops[0].Counters, HasLen, 3)
-    c.Assert(ops[0].FinishedCounters, HasLen, 2)
+    c.Assert(ops[0].Counters, HasLen, 2)
+    c.Assert(ops[0].FinishedCounters, HasLen, 3)
     lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"}))
     c.Assert(err, IsNil)
     c.Assert(lb.Schedule(s.tc), IsNil)
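
Note on the three hunks above: balanceDirectionCounter moves from op.Counters to op.FinishedCounters, and the test's expected slice lengths shift by one accordingly. Judging by the field names, Counters fire when an operator is created while FinishedCounters fire only when it completes, so the direction metric stops counting operators that are cancelled or time out. A minimal sketch of that assumed lifecycle (simplified types; the real operator.Operator in PD carries much more state):

package sketch

import "github.com/prometheus/client_golang/prometheus"

// Operator keeps two counter slices that fire at different points in
// its lifecycle (assumed semantics, inferred from the field names).
type Operator struct {
	Counters         []prometheus.Counter // bumped when the operator is created
	FinishedCounters []prometheus.Counter // bumped only if it runs to completion
}

// onCreate records that the operator was scheduled at all.
func (op *Operator) onCreate() {
	for _, c := range op.Counters {
		c.Inc()
	}
}

// onFinish records the metrics that should only count successful
// completions, e.g. the balance-direction counter moved above.
func (op *Operator) onFinish() {
	for _, c := range op.FinishedCounters {
		c.Inc()
	}
}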

server/schedulers/hot_region.go (30 additions, 11 deletions)

@@ -25,7 +25,6 @@ import (

     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/log"
-    "github.com/prometheus/client_golang/prometheus"
     "github.com/tikv/pd/pkg/errs"
     "github.com/tikv/pd/pkg/slice"
     "github.com/tikv/pd/server/core"

@@ -406,6 +405,9 @@ type balanceSolver struct {
     maxSrc   *storeLoad
     minDst   *storeLoad
     rankStep *storeLoad
+
+    byteIsBetter bool
+    keyIsBetter  bool
 }

 type solution struct {

@@ -790,12 +792,16 @@ func (bs *balanceSolver) calcProgressiveRank() {
         case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
             // If belong to the case, both byte rate and key rate will be more balanced, the best choice.
             rank = -3
+            bs.byteIsBetter = true
+            bs.keyIsBetter = true
         case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio:
             // If belong to the case, byte rate will be not worsened, key rate will be more balanced.
             rank = -2
+            bs.keyIsBetter = true
         case byteHot && byteDecRatio <= greatDecRatio:
             // If belong to the case, byte rate will be more balanced, ignore the key rate.
             rank = -1
+            bs.byteIsBetter = true
         }
     }
     log.Debug("calcProgressiveRank",

@@ -974,15 +980,19 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
         return nil, nil
     }
     var (
-        counters []prometheus.Counter
-        err      error
+        err         error
+        typ         string
+        sourceLabel string
+        targetLabel string
     )

     switch bs.opTy {
     case movePeer:
         srcPeer := bs.cur.region.GetStorePeer(bs.cur.srcStoreID) // checked in getRegionAndSrcPeer
         dstPeer := &metapb.Peer{StoreId: bs.cur.dstStoreID, Role: srcPeer.Role}
-        typ := "move-peer"
+        sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10)
+        targetLabel = strconv.FormatUint(dstPeer.GetStoreId(), 10)

         if bs.rwTy == read && bs.cur.region.GetLeader().StoreId == bs.cur.srcStoreID { // move read leader
             op, err = operator.CreateMoveLeaderOperator(
                 "move-hot-read-leader",

@@ -994,6 +1004,7 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
             typ = "move-leader"
         } else {
             desc := "move-hot-" + bs.rwTy.String() + "-peer"
+            typ = "move-peer"
             op, err = operator.CreateMovePeerOperator(
                 desc,
                 bs.cluster,

@@ -1002,24 +1013,20 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
                 bs.cur.srcStoreID,
                 dstPeer)
         }
-        counters = append(counters,
-            hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), strconv.FormatUint(bs.cur.srcStoreID, 10), "out"),
-            hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), strconv.FormatUint(dstPeer.GetStoreId(), 10), "in"))
     case transferLeader:
         if bs.cur.region.GetStoreVoter(bs.cur.dstStoreID) == nil {
             return nil, nil
         }
         desc := "transfer-hot-" + bs.rwTy.String() + "-leader"
+        sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10)
+        targetLabel = strconv.FormatUint(bs.cur.dstStoreID, 10)
         op, err = operator.CreateTransferLeaderOperator(
             desc,
             bs.cluster,
             bs.cur.region,
             bs.cur.srcStoreID,
             bs.cur.dstStoreID,
             operator.OpHotRegion)
-        counters = append(counters,
-            hotDirectionCounter.WithLabelValues("transfer-leader", bs.rwTy.String(), strconv.FormatUint(bs.cur.srcStoreID, 10), "out"),
-            hotDirectionCounter.WithLabelValues("transfer-leader", bs.rwTy.String(), strconv.FormatUint(bs.cur.dstStoreID, 10), "in"))
     }

     if err != nil {

@@ -1028,8 +1035,20 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
         return nil, nil
     }

+    dim := ""
+    if bs.byteIsBetter && bs.keyIsBetter {
+        dim = "both"
+    } else if bs.byteIsBetter {
+        dim = "byte"
+    } else if bs.keyIsBetter {
+        dim = "key"
+    }
+
     op.SetPriorityLevel(core.HighPriority)
-    op.Counters = append(op.Counters, counters...)
+    op.FinishedCounters = append(op.FinishedCounters,
+        hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), sourceLabel, "out", dim),
+        hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), targetLabel, "in", dim),
+        balanceDirectionCounter.WithLabelValues(bs.sche.GetName(), sourceLabel, targetLabel))
     op.Counters = append(op.Counters,
         schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
         schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))
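
The new dim label records which load dimension the operator was expected to improve, derived from the two flags that calcProgressiveRank sets alongside the rank. A condensed restatement of that mapping as a helper (hypothetical function, not part of the diff):

package sketch

// dimLabel folds the two improvement flags into the label value used
// by hotDirectionCounter: rank -3 improves both byte and key rate,
// rank -2 only key rate, rank -1 only byte rate.
func dimLabel(byteIsBetter, keyIsBetter bool) string {
	switch {
	case byteIsBetter && keyIsBetter:
		return "both"
	case byteIsBetter:
		return "byte"
	case keyIsBetter:
		return "key"
	default:
		return "" // no rank assigned, nothing improved
	}
}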

server/schedulers/metrics.go (1 addition, 1 deletion)

@@ -93,7 +93,7 @@ var hotDirectionCounter = prometheus.NewCounterVec(
         Subsystem: "scheduler",
         Name:      "hot_region_direction",
         Help:      "Counter of hot region scheduler.",
-    }, []string{"type", "rw", "store", "direction"})
+    }, []string{"type", "rw", "store", "direction", "dim"})

 var scatterRangeLeaderCounter = prometheus.NewCounterVec(
     prometheus.CounterOpts{
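
Every WithLabelValues call on this vector must now pass exactly five values in declaration order — the prometheus client panics on a mismatched count — which is why the call sites in hot_region.go gain the trailing dim argument in the same commit. A self-contained sketch (the Namespace value is an assumption; it sits outside the quoted hunk):

package main

import "github.com/prometheus/client_golang/prometheus"

var hotDirectionCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd", // assumption: not shown in the hunk above
		Subsystem: "scheduler",
		Name:      "hot_region_direction",
		Help:      "Counter of hot region scheduler.",
	}, []string{"type", "rw", "store", "direction", "dim"})

func main() {
	prometheus.MustRegister(hotDirectionCounter)
	// Five label values, in declaration order. Each distinct "dim"
	// value creates a separate time series per existing combination.
	hotDirectionCounter.WithLabelValues("move-peer", "read", "1", "out", "byte").Inc()
}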

server/statistics/hot_cache.go (3 additions, 2 deletions)

@@ -15,6 +15,7 @@ package statistics

 import (
     "context"
+
     "github.com/tikv/pd/server/core"
 )

@@ -41,8 +42,8 @@ func NewHotCache(ctx context.Context, quit <-chan struct{}) *HotCache {
         quit:           quit,
         readFlowQueue:  make(chan FlowItemTask, queueCap),
         writeFlowQueue: make(chan FlowItemTask, queueCap),
-        writeFlow:      NewHotStoresStats(WriteFlow),
-        readFlow:       NewHotStoresStats(ReadFlow),
+        writeFlow:      NewHotPeerCache(WriteFlow),
+        readFlow:       NewHotPeerCache(ReadFlow),
     }
     go w.updateItems(w.readFlowQueue, w.runReadTask)
     go w.updateItems(w.writeFlowQueue, w.runWriteTask)

server/statistics/hot_peer_cache.go (2 additions, 2 deletions)

@@ -62,8 +62,8 @@ type hotPeerCache struct {
     reportIntervalSecs int
 }

-// NewHotStoresStats creates a HotStoresStats
-func NewHotStoresStats(kind FlowKind) *hotPeerCache {
+// NewHotPeerCache creates a hotPeerCache
+func NewHotPeerCache(kind FlowKind) *hotPeerCache {
     c := &hotPeerCache{
         kind:          kind,
         peersOfStore:  make(map[uint64]*TopN),

server/statistics/hot_peer_cache_test.go (5 additions, 5 deletions)

@@ -29,7 +29,7 @@ var _ = Suite(&testHotPeerCache{})
 type testHotPeerCache struct{}

 func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) {
-    cache := NewHotStoresStats(WriteFlow)
+    cache := NewHotPeerCache(WriteFlow)
     peers := newPeers(3,
         func(i int) uint64 { return uint64(10000 + i) },
         func(i int) uint64 { return uint64(i) })

@@ -92,7 +92,7 @@ func testCache(c *C, t *testCacheCase) {
         ReadFlow:  3, // all peers
         WriteFlow: 3, // all peers
     }
-    cache := NewHotStoresStats(t.kind)
+    cache := NewHotPeerCache(t.kind)
     region := buildRegion(nil, nil, t.kind)
     checkAndUpdate(c, cache, region, defaultSize[t.kind])
     checkHit(c, cache, region, t.kind, false) // all peers are new

@@ -226,7 +226,7 @@ func newPeers(n int, pid genID, sid genID) []*metapb.Peer {
 }

 func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) {
-    cache := NewHotStoresStats(ReadFlow)
+    cache := NewHotPeerCache(ReadFlow)

     // skip interval=0
     newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: ReadFlow}

@@ -289,7 +289,7 @@ func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) {
 }

 func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold float64) {
-    cache := NewHotStoresStats(ReadFlow)
+    cache := NewHotPeerCache(ReadFlow)
     storeID := uint64(1)
     c.Assert(byteRate, GreaterEqual, minHotThresholds[RegionReadBytes])
     for i := uint64(1); i < TopNN+10; i++ {

@@ -323,7 +323,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
 }

 func BenchmarkCheckRegionFlow(b *testing.B) {
-    cache := NewHotStoresStats(ReadFlow)
+    cache := NewHotPeerCache(ReadFlow)
     region := core.NewRegionInfo(&metapb.Region{
         Id: 1,
         Peers: []*metapb.Peer{

server/statistics/store.go (9 additions, 1 deletion)

@@ -156,7 +156,15 @@ func collect(records []*pdpb.RecordPair) float64 {
 func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
     statInterval := stats.GetInterval()
     interval := time.Duration(statInterval.GetEndTimestamp()-statInterval.GetStartTimestamp()) * time.Second
-    log.Debug("update store stats", zap.Uint64("key-write", stats.KeysWritten), zap.Uint64("bytes-write", stats.BytesWritten), zap.Duration("interval", interval), zap.Uint64("store-id", stats.GetStoreId()))
+    log.Debug("update store stats",
+        zap.Uint64("key-write", stats.KeysWritten),
+        zap.Uint64("bytes-write", stats.BytesWritten),
+        zap.Uint64("key-read", stats.KeysRead),
+        zap.Uint64("bytes-read", stats.BytesRead),
+        zap.Uint64("query-write", core.GetWriteQueryNum(stats.QueryStats)),
+        zap.Uint64("query-read", core.GetReadQueryNum(stats.QueryStats)),
+        zap.Duration("interval", interval),
+        zap.Uint64("store-id", stats.GetStoreId()))
     r.Lock()
     defer r.Unlock()
     readQueryNum, writeQueryNum := core.GetReadQueryNum(stats.QueryStats), core.GetWriteQueryNum(stats.QueryStats)
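
The debug line now reports read and query traffic alongside writes, so a single entry gives the full per-store picture for an interval. A runnable sketch of the same structured-logging style, assuming the pingcap/log wrapper around zap (the values are illustrative):

package main

import (
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

func main() {
	// Each dimension is a separate structured field, so log pipelines
	// can filter on it directly; visible once the level is set to debug.
	log.Debug("update store stats",
		zap.Uint64("key-write", 1200),
		zap.Uint64("bytes-write", 4<<20),
		zap.Uint64("key-read", 900),
		zap.Uint64("bytes-read", 2<<20),
		zap.Duration("interval", 10*time.Second),
		zap.Uint64("store-id", 1))
}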
