diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 7129676d578e..7bffd0748cbf 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -77,17 +77,18 @@ type hotScheduler struct { types []rwType r *rand.Rand - // states across multiple `Schedule` calls - pendings map[*pendingInfluence]struct{} - // regionPendings stores regionID -> Operator + // regionPendings stores regionID -> pendingInfluence // this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't // be selected if its owner region is tracked in this attribute. - regionPendings map[uint64]*operator.Operator + regionPendings map[uint64]*pendingInfluence + // store information, including pending Influence by resource type + // Every time Schedule will recalculate it. + stInfos map[uint64]*storeInfo // temporary states but exported to API or metrics + // Every time Schedule will recalculate it. stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail - // This stores the pending Influence for each store by resource type. - pendingSums map[uint64]*Influence + // config of hot scheduler conf *hotRegionSchedulerConfig } @@ -99,8 +100,7 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS BaseScheduler: base, types: []rwType{write, read}, r: rand.New(rand.NewSource(time.Now().UnixNano())), - pendings: map[*pendingInfluence]struct{}{}, - regionPendings: make(map[uint64]*operator.Operator), + regionPendings: make(map[uint64]*pendingInfluence), conf: conf, } for ty := resourceType(0); ty < resourceTypeLen; ty++ { @@ -159,23 +159,20 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for // each store func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { + h.stInfos = summaryStoreInfos(cluster) h.summaryPendingInfluence() - - stores := cluster.GetStores() storesLoads := cluster.GetStoresLoads() { // update read statistics regionRead := cluster.RegionReadStats() h.stLoadInfos[readLeader] = summaryStoresLoad( - stores, + h.stInfos, storesLoads, - h.pendingSums, regionRead, read, core.LeaderKind) h.stLoadInfos[readPeer] = summaryStoresLoad( - stores, + h.stInfos, storesLoads, - h.pendingSums, regionRead, read, core.RegionKind) } @@ -183,16 +180,13 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { { // update write statistics regionWrite := cluster.RegionWriteStats() h.stLoadInfos[writeLeader] = summaryStoresLoad( - stores, + h.stInfos, storesLoads, - h.pendingSums, regionWrite, write, core.LeaderKind) - h.stLoadInfos[writePeer] = summaryStoresLoad( - stores, + h.stInfos, storesLoads, - h.pendingSums, regionWrite, write, core.RegionKind) } @@ -200,143 +194,37 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { // summaryPendingInfluence calculate the summary of pending Influence for each store // and clean the region from regionInfluence if they have ended operator. +// It makes each key/byte rate or count become (1+w) times to the origin value while f is the function to provide w(weight). func (h *hotScheduler) summaryPendingInfluence() { - h.pendingSums = summaryPendingInfluence(h.pendings, h.calcPendingWeight) - h.gcRegionPendings() -} - -// gcRegionPendings check the region whether it need to be deleted from regionPendings depended on whether it have -// ended operator -func (h *hotScheduler) gcRegionPendings() { - for regionID, op := range h.regionPendings { - if op != nil && op.IsEnd() { - if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) { - log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration())) - schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec() - delete(h.regionPendings, regionID) - } + storeZombieDur := h.conf.GetStoreStatZombieDuration() + regionsZombieDur := h.conf.GetRegionsStatZombieDuration() + + for id, p := range h.regionPendings { + from := h.stInfos[p.from] + to := h.stInfos[p.to] + maxZombieDur := storeZombieDur + if (from != nil && from.IsTiFlash) || (to != nil && to.IsTiFlash) { + maxZombieDur = regionsZombieDur } - } -} - -// summaryStoresLoad Load information of all available stores. -// it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store -func summaryStoresLoad( - stores []*core.StoreInfo, - storesLoads map[uint64][]float64, - storePendings map[uint64]*Influence, - storeHotPeers map[uint64][]*statistics.HotPeerStat, - rwTy rwType, - kind core.ResourceKind, -) map[uint64]*storeLoadDetail { - // loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count) - loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads)) - allLoadSum := make([]float64, statistics.DimLen) - allCount := 0.0 - - // Stores without byte rate statistics is not available to schedule. - for _, store := range stores { - id := store.GetID() - storeLoads, ok := storesLoads[id] - if !ok { + weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur) + + if needGC { + delete(h.regionPendings, id) + log.Debug("gc pending influence in hot region scheduler", + zap.Uint64("region-id", id), + zap.Time("create", p.op.GetCreateTime()), + zap.Time("now", time.Now()), + zap.Duration("zombie", maxZombieDur)) continue } - loads := make([]float64, statistics.DimLen) - switch rwTy { - case read: - loads[statistics.ByteDim] = storeLoads[statistics.StoreReadBytes] - loads[statistics.KeyDim] = storeLoads[statistics.StoreReadKeys] - case write: - loads[statistics.ByteDim] = storeLoads[statistics.StoreWriteBytes] - loads[statistics.KeyDim] = storeLoads[statistics.StoreWriteKeys] - } - // Find all hot peers first - var hotPeers []*statistics.HotPeerStat - { - peerLoadSum := make([]float64, statistics.DimLen) - // TODO: To remove `filterHotPeers`, we need to: - // HotLeaders consider `Write{Bytes,Keys}`, so when we schedule `writeLeader`, all peers are leader. - for _, peer := range filterHotPeers(kind, storeHotPeers[id]) { - for i := range peerLoadSum { - peerLoadSum[i] += peer.GetLoad(getRegionStatKind(rwTy, i)) - } - hotPeers = append(hotPeers, peer.Clone()) - } - // Use sum of hot peers to estimate leader-only byte rate. - // For write requests, Write{Bytes, Keys} is applied to all Peers at the same time, while the Leader and Follower are under different loads (usually the Leader consumes more CPU). - // But none of the current dimension reflect this difference, so we create a new dimension to reflect it. - if kind == core.LeaderKind && rwTy == write { - loads = peerLoadSum - } - // Metric for debug. - { - ty := "byte-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.ByteDim]) - } - { - ty := "key-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.KeyDim]) - } - } - for i := range allLoadSum { - allLoadSum[i] += loads[i] - } - allCount += float64(len(hotPeers)) - - // Build store load prediction from current load and pending influence. - stLoadPred := (&storeLoad{ - Loads: loads, - Count: float64(len(hotPeers)), - }).ToLoadPred(rwTy, storePendings[id]) - - // Construct store load info. - loadDetail[id] = &storeLoadDetail{ - Store: store, - LoadPred: stLoadPred, - HotPeers: hotPeers, - } - } - storeLen := float64(len(storesLoads)) - // store expectation byte/key rate and count for each store-load detail. - for id, detail := range loadDetail { - expectLoads := make([]float64, len(allLoadSum)) - for i := range expectLoads { - expectLoads[i] = allLoadSum[i] / storeLen + if from != nil && weight > 0 { + from.addInfluence(&p.origin, -weight) } - expectCount := allCount / storeLen - detail.LoadPred.Expect.Loads = expectLoads - detail.LoadPred.Expect.Count = expectCount - // Debug - { - ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.ByteDim]) - } - { - ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.KeyDim]) - } - { - ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectCount) - } - } - return loadDetail -} - -// filterHotPeers filter the peer whose hot degree is less than minHotDegress -func filterHotPeers( - kind core.ResourceKind, - peers []*statistics.HotPeerStat, -) []*statistics.HotPeerStat { - ret := make([]*statistics.HotPeerStat, 0, len(peers)) - for _, peer := range peers { - if kind == core.LeaderKind && !peer.IsLeader() { - continue + if to != nil && weight > 0 { + to.addInfluence(&p.origin, weight) } - ret = append(ret, peer) } - return ret } func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence) bool { @@ -348,8 +236,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS } influence := newPendingInfluence(op, srcStore, dstStore, infl) - h.pendings[influence] = struct{}{} - h.regionPendings[regionID] = op + h.regionPendings[regionID] = influence schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc() return true @@ -538,13 +425,26 @@ func (bs *balanceSolver) solve() []*operator.Operator { // its expectation * ratio, the store would be selected as hot source store func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail) + confSrcToleranceRatio := bs.sche.conf.GetSrcToleranceRatio() + confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() for id, detail := range bs.stLoadDetail { + srcToleranceRatio := confSrcToleranceRatio + if detail.Info.IsTiFlash { + if !confEnableForTiFlash { + continue + } + if bs.rwTy != write || bs.opTy != movePeer { + continue + } + srcToleranceRatio = 0 + } if len(detail.HotPeers) == 0 { continue } + minLoad := detail.LoadPred.min() if slice.AllOf(minLoad.Loads, func(i int) bool { - return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*detail.LoadPred.Expect.Loads[i] + return minLoad.Loads[i] > srcToleranceRatio*detail.LoadPred.Expect.Loads[i] }) { ret[id] = detail hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() @@ -629,10 +529,11 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool { return false } - if op, ok := bs.sche.regionPendings[region.GetID()]; ok { + if influence, ok := bs.sche.regionPendings[region.GetID()]; ok { if bs.opTy == transferLeader { return false } + op := influence.op if op.Kind()&operator.OpRegion != 0 || (op.Kind()&operator.OpLeader != 0 && !op.IsEnd()) { return false @@ -684,7 +585,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { filters []filter.Filter candidates []*storeLoadDetail ) - srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store + srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Info.Store switch bs.opTy { case movePeer: filters = []filter.Filter{ @@ -721,9 +622,20 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail, len(candidates)) - dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() + confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() + confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() for _, detail := range candidates { - store := detail.Store + store := detail.Info.Store + dstToleranceRatio := confDstToleranceRatio + if detail.Info.IsTiFlash { + if !confEnableForTiFlash { + continue + } + if bs.rwTy != write || bs.opTy != movePeer { + continue + } + dstToleranceRatio = 0 + } if filter.Target(bs.cluster.GetOpts(), store, filters) { maxLoads := detail.LoadPred.max().Loads if slice.AllOf(maxLoads, func(i int) bool { @@ -1057,40 +969,52 @@ func (h *hotScheduler) GetHotStatus(typ string) *statistics.StoreHotPeersInfos { func (h *hotScheduler) GetPendingInfluence() map[uint64]*Influence { h.RLock() defer h.RUnlock() - ret := make(map[uint64]*Influence, len(h.pendingSums)) - for id, infl := range h.pendingSums { - ret[id] = infl.add(infl, 0) // copy + ret := make(map[uint64]*Influence, len(h.stInfos)) + for id, info := range h.stInfos { + if info.PendingSum != nil { + ret[id] = info.PendingSum + } } return ret } -// calcPendingWeight return the calculate weight of one Operator, the value will between [0,1] -func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 { - if op.CheckExpired() || op.CheckTimeout() { - return 0 - } +// calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1] +func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) { status := op.Status() + switch { + case op.CheckExpired(): + status = operator.EXPIRED + case op.CheckTimeout(): + status = operator.TIMEOUT + } + if !operator.IsEndStatus(status) { - return 1 + return 1, false + } + + zombieDur := time.Since(op.GetReachTimeOf(status)) + if zombieDur >= maxZombieDur { + weight = 0 + } else { + // TODO: use store statistics update time to make a more accurate estimation + // weight = float64(maxZombieDur-zombieDur) / float64(maxZombieDur) + weight = 1 } + switch status { case operator.SUCCESS: - zombieDur := time.Since(op.GetReachTimeOf(status)) - maxZombieDur := h.conf.GetMaxZombieDuration() - if zombieDur >= maxZombieDur { - return 0 - } - // TODO: use store statistics update time to make a more accurate estimation - return float64(maxZombieDur-zombieDur) / float64(maxZombieDur) + return weight, weight == 0 + case operator.CANCELED, operator.REPLACED, operator.TIMEOUT: + return 0, weight == 0 + case operator.EXPIRED: + fallthrough default: - return 0 + return 0, true } } func (h *hotScheduler) clearPendingInfluence() { - h.pendings = map[*pendingInfluence]struct{}{} - h.pendingSums = nil - h.regionPendings = make(map[uint64]*operator.Operator) + h.regionPendings = make(map[uint64]*pendingInfluence) } // rwType : the perspective of balance diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index afaa261cce2a..17bf1af3e525 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -36,14 +36,15 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { MinHotByteRate: 100, MinHotKeyRate: 10, MaxZombieRounds: 3, + MaxPeerNum: 1000, ByteRateRankStepRatio: 0.05, KeyRateRankStepRatio: 0.05, CountRankStepRatio: 0.01, GreatDecRatio: 0.95, MinorDecRatio: 0.99, - MaxPeerNum: 1000, SrcToleranceRatio: 1.05, // Tolerate 5% difference DstToleranceRatio: 1.05, // Tolerate 5% difference + EnableForTiFlash: true, } } @@ -65,6 +66,9 @@ type hotRegionSchedulerConfig struct { MinorDecRatio float64 `json:"minor-dec-ratio"` SrcToleranceRatio float64 `json:"src-tolerance-ratio"` DstToleranceRatio float64 `json:"dst-tolerance-ratio"` + + // Separately control whether to start hotspot scheduling for TiFlash + EnableForTiFlash bool `json:"enable-for-tiflash"` } func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) { @@ -73,12 +77,19 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) { return schedule.EncodeConfig(conf) } -func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration { +func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration { conf.RLock() defer conf.RUnlock() return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second } +func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration { + conf.RLock() + defer conf.RUnlock() + // Since RegionsStatsRollingWindowsSize is very large, multiply by 2. + return time.Duration(conf.MaxZombieRounds) * 2 * statistics.RegionHeartBeatReportInterval * time.Second +} + func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int { conf.RLock() defer conf.RUnlock() @@ -151,6 +162,12 @@ func (conf *hotRegionSchedulerConfig) GetMinHotByteRate() float64 { return conf.MinHotByteRate } +func (conf *hotRegionSchedulerConfig) GetEnableForTiFlash() bool { + conf.RLock() + defer conf.RUnlock() + return conf.EnableForTiFlash +} + func (conf *hotRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := mux.NewRouter() router.HandleFunc("/list", conf.handleGetConfig).Methods("GET") @@ -214,5 +231,4 @@ func (conf *hotRegionSchedulerConfig) persist() error { } return conf.storage.SaveScheduleConfig(HotRegionName, data) - } diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index 24448bc310f1..334c3210183a 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -82,7 +82,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { c.Assert(err, IsNil) hb := sche.(*hotScheduler) - notDoneOp := func(region *core.RegionInfo, ty opType) *operator.Operator { + notDoneOp := func(region *core.RegionInfo, ty opType) *pendingInfluence { var op *operator.Operator var err error switch ty { @@ -93,40 +93,40 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { } c.Assert(err, IsNil) c.Assert(op, NotNil) - return op + return newPendingInfluence(op, 2, 4, Influence{}) } - doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator { - op := notDoneOp(region, ty) - op.Cancel() - return op + doneOp := func(region *core.RegionInfo, ty opType) *pendingInfluence { + infl := notDoneOp(region, ty) + infl.op.Cancel() + return infl } - shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator { - op := doneOp(region, ty) - operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second)) - return op + shouldRemoveOp := func(region *core.RegionInfo, ty opType) *pendingInfluence { + infl := doneOp(region, ty) + operator.SetOperatorStatusReachTime(infl.op, operator.CANCELED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second)) + return infl } - opCreaters := [3]func(region *core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp} + opCreaters := [3]func(region *core.RegionInfo, ty opType) *pendingInfluence{shouldRemoveOp, notDoneOp, doneOp} typs := []opType{movePeer, transferLeader} - for i := 0; i < len(opCreaters); i++ { + for i, opCreator := range opCreaters { for j, typ := range typs { - regionID := uint64(i*len(opCreaters) + j + 1) + regionID := uint64(i*len(typs) + j + 1) region := newTestRegion(regionID) - hb.regionPendings[regionID] = opCreaters[i](region, typ) + hb.regionPendings[regionID] = opCreator(region, typ) } } - hb.gcRegionPendings() + hb.summaryPendingInfluence() // Calling this function will GC. - for i := 0; i < len(opCreaters); i++ { + for i := range opCreaters { for j, typ := range typs { - regionID := uint64(i*len(opCreaters) + j + 1) + regionID := uint64(i*len(typs) + j + 1) if i < 1 { // shouldRemoveOp c.Assert(hb.regionPendings, Not(HasKey), regionID) } else { // notDoneOp, doneOp c.Assert(hb.regionPendings, HasKey, regionID) - kind := hb.regionPendings[regionID].Kind() + kind := hb.regionPendings[regionID].op.Kind() switch typ { case transferLeader: c.Assert(kind&operator.OpLeader != 0, IsTrue) @@ -1373,15 +1373,15 @@ func (s *testInfluenceSerialSuite) TestInfluenceByRWType(c *C) { op := hb.Schedule(tc)[0] c.Assert(op, NotNil) hb.(*hotScheduler).summaryPendingInfluence() - pendingInfluence := hb.(*hotScheduler).pendingSums - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionWriteKeys], -0.5*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionWriteBytes], -0.5*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[4].Loads[statistics.RegionWriteKeys], 0.5*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[4].Loads[statistics.RegionWriteBytes], 0.5*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionReadKeys], -0.5*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionReadBytes], -0.5*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[4].Loads[statistics.RegionReadKeys], 0.5*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[4].Loads[statistics.RegionReadBytes], 0.5*MB), IsTrue) + stInfos := hb.(*hotScheduler).stInfos + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionWriteKeys], -0.5*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionWriteBytes], -0.5*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[4].PendingSum.Loads[statistics.RegionWriteKeys], 0.5*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[4].PendingSum.Loads[statistics.RegionWriteBytes], 0.5*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionReadKeys], -0.5*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionReadBytes], -0.5*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[4].PendingSum.Loads[statistics.RegionReadKeys], 0.5*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[4].PendingSum.Loads[statistics.RegionReadBytes], 0.5*MB), IsTrue) addRegionInfo(tc, write, []testRegionInfo{ {2, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB}, @@ -1399,16 +1399,16 @@ func (s *testInfluenceSerialSuite) TestInfluenceByRWType(c *C) { op = hb.Schedule(tc)[0] c.Assert(op, NotNil) hb.(*hotScheduler).summaryPendingInfluence() - pendingInfluence = hb.(*hotScheduler).pendingSums + stInfos = hb.(*hotScheduler).stInfos // assert read/write influence is the sum of write peer and write leader - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionWriteKeys], -1.2*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionWriteBytes], -1.2*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[3].Loads[statistics.RegionWriteKeys], 0.7*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[3].Loads[statistics.RegionWriteBytes], 0.7*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionReadKeys], -1.2*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[1].Loads[statistics.RegionReadBytes], -1.2*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[3].Loads[statistics.RegionReadKeys], 0.7*MB), IsTrue) - c.Assert(nearlyAbout(pendingInfluence[3].Loads[statistics.RegionReadBytes], 0.7*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionWriteKeys], -1.2*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionWriteBytes], -1.2*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[3].PendingSum.Loads[statistics.RegionWriteKeys], 0.7*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[3].PendingSum.Loads[statistics.RegionWriteBytes], 0.7*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionReadKeys], -1.2*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionReadBytes], -1.2*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[3].PendingSum.Loads[statistics.RegionReadKeys], 0.7*MB), IsTrue) + c.Assert(nearlyAbout(stInfos[3].PendingSum.Loads[statistics.RegionReadBytes], 0.7*MB), IsTrue) } func nearlyAbout(f1, f2 float64) bool { diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index 8d7b749ba029..7f997a6b2964 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -132,22 +132,20 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op } func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator { - stores := cluster.GetStores() + storeInfos := summaryStoreInfos(cluster) storesLoads := cluster.GetStoresLoads() switch typ { case read: s.stLoadInfos[readLeader] = summaryStoresLoad( - stores, + storeInfos, storesLoads, - map[uint64]*Influence{}, cluster.RegionReadStats(), read, core.LeaderKind) return s.randomSchedule(cluster, s.stLoadInfos[readLeader]) case write: s.stLoadInfos[writeLeader] = summaryStoresLoad( - stores, + storeInfos, storesLoads, - map[uint64]*Influence{}, cluster.RegionWriteStats(), write, core.LeaderKind) return s.randomSchedule(cluster, s.stLoadInfos[writeLeader]) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index bb9d3a7654dd..e2a76ecd0fe4 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -14,6 +14,7 @@ package schedulers import ( + "fmt" "math" "net/url" "strconv" @@ -207,16 +208,6 @@ type Influence struct { Count float64 } -func (lhs *Influence) add(rhs *Influence, w float64) *Influence { - var infl Influence - for i := range lhs.Loads { - infl.Loads = append(infl.Loads, lhs.Loads[i]+rhs.Loads[i]*w) - } - infl.Count = infl.Count + rhs.Count*w - return &infl -} - -// TODO: merge it into OperatorInfluence. type pendingInfluence struct { op *operator.Operator from, to uint64 @@ -232,31 +223,7 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) } } -// summaryPendingInfluence calculate the summary pending Influence for each store and return storeID -> Influence -// It makes each key/byte rate or count become (1+w) times to the origin value while f is the function to provide w(weight) -func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]*Influence { - ret := make(map[uint64]*Influence) - for p := range pendings { - w := f(p.op) - if w == 0 { - delete(pendings, p) - } - if _, ok := ret[p.to]; !ok { - ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))} - } - ret[p.to] = ret[p.to].add(&p.origin, w) - if _, ok := ret[p.from]; !ok { - ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))} - } - ret[p.from] = ret[p.from].add(&p.origin, -w) - } - return ret -} - -type storeLoad struct { - Loads []float64 - Count float64 -} +type storeLoad Influence func (load storeLoad) ToLoadPred(rwTy rwType, infl *Influence) *storeLoadPred { future := storeLoad{ @@ -407,8 +374,44 @@ func maxLoad(a, b *storeLoad) *storeLoad { } } +type storeInfo struct { + Store *core.StoreInfo + IsTiFlash bool + PendingSum *Influence +} + +func summaryStoreInfos(cluster opt.Cluster) map[uint64]*storeInfo { + stores := cluster.GetStores() + infos := make(map[uint64]*storeInfo, len(stores)) + for _, store := range stores { + info := &storeInfo{ + Store: store, + IsTiFlash: core.IsTiFlashStore(store.GetMeta()), + PendingSum: nil, + } + infos[store.GetID()] = info + } + return infos +} + +func (s *storeInfo) addInfluence(infl *Influence, w float64) { + if infl == nil || w == 0 { + return + } + if s.PendingSum == nil { + s.PendingSum = &Influence{ + Loads: make([]float64, len(infl.Loads)), + Count: 0, + } + } + for i, load := range infl.Loads { + s.PendingSum.Loads[i] += load * w + } + s.PendingSum.Count += infl.Count * w +} + type storeLoadDetail struct { - Store *core.StoreInfo + Info *storeInfo LoadPred *storeLoadPred HotPeers []*statistics.HotPeerStat } @@ -466,3 +469,134 @@ func toHotPeerStatShow(p *statistics.HotPeerStat, kind rwType) statistics.HotPee LastUpdateTime: p.LastUpdateTime, } } + +// summaryStoresLoad Load information of all available stores. +// it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store +func summaryStoresLoad( + storeInfos map[uint64]*storeInfo, + storesLoads map[uint64][]float64, + storeHotPeers map[uint64][]*statistics.HotPeerStat, + rwTy rwType, + kind core.ResourceKind, +) map[uint64]*storeLoadDetail { + // loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count) + loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads)) + allLoadSum := make([]float64, statistics.DimLen) + allCount := 0.0 + + // Stores without byte rate statistics is not available to schedule. + for _, info := range storeInfos { + store := info.Store + id := store.GetID() + storeLoads, ok := storesLoads[id] + if !ok { + continue + } + isTiFlash := core.IsTiFlashStore(store.GetMeta()) + loads := make([]float64, statistics.DimLen) + switch rwTy { + case read: + loads[statistics.ByteDim] = storeLoads[statistics.StoreReadBytes] + loads[statistics.KeyDim] = storeLoads[statistics.StoreReadKeys] + case write: + if isTiFlash { + loads[statistics.ByteDim] = storeLoads[statistics.StoreRegionsWriteBytes] + loads[statistics.KeyDim] = storeLoads[statistics.StoreRegionsWriteKeys] + } else { + loads[statistics.ByteDim] = storeLoads[statistics.StoreWriteBytes] + loads[statistics.KeyDim] = storeLoads[statistics.StoreWriteKeys] + } + } + + // Find all hot peers first + var hotPeers []*statistics.HotPeerStat + { + peerLoadSum := make([]float64, statistics.DimLen) + // TODO: To remove `filterHotPeers`, we need to: + // HotLeaders consider `Write{Bytes,Keys}`, so when we schedule `writeLeader`, all peers are leader. + for _, peer := range filterHotPeers(kind, storeHotPeers[id]) { + for i := range peerLoadSum { + peerLoadSum[i] += peer.GetLoad(getRegionStatKind(rwTy, i)) + } + hotPeers = append(hotPeers, peer.Clone()) + } + // Use sum of hot peers to estimate leader-only byte rate. + // For write requests, Write{Bytes, Keys} is applied to all Peers at the same time, while the Leader and Follower are under different loads (usually the Leader consumes more CPU). + // But none of the current dimension reflect this difference, so we create a new dimension to reflect it. + if kind == core.LeaderKind && rwTy == write { + loads = peerLoadSum + } + + // Metric for debug. + { + ty := "byte-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.ByteDim]) + } + { + ty := "key-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.KeyDim]) + } + } + + if !isTiFlash { + // The TiFlash flow is isolated from TiKV, so it is not counted in the sum. + for i := range allLoadSum { + allLoadSum[i] += loads[i] + } + } + allCount += float64(len(hotPeers)) + + // Build store load prediction from current load and pending influence. + stLoadPred := (&storeLoad{ + Loads: loads, + Count: float64(len(hotPeers)), + }).ToLoadPred(rwTy, info.PendingSum) + + // Construct store load info. + loadDetail[id] = &storeLoadDetail{ + Info: info, + LoadPred: stLoadPred, + HotPeers: hotPeers, + } + } + storeLen := float64(len(storesLoads)) + // store expectation byte/key rate and count for each store-load detail. + for id, detail := range loadDetail { + expectLoads := make([]float64, len(allLoadSum)) + for i := range expectLoads { + expectLoads[i] = allLoadSum[i] / storeLen + } + expectCount := allCount / storeLen + detail.LoadPred.Expect.Loads = expectLoads + detail.LoadPred.Expect.Count = expectCount + // Debug + { + ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.ByteDim]) + } + { + ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.KeyDim]) + } + { + ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectCount) + } + } + return loadDetail +} + +// filterHotPeers filter the peer whose hot degree is less than minHotDegress +func filterHotPeers( + kind core.ResourceKind, + peers []*statistics.HotPeerStat, +) []*statistics.HotPeerStat { + ret := make([]*statistics.HotPeerStat, 0, len(peers)) + for _, peer := range peers { + if kind == core.LeaderKind && !peer.IsLeader() { + continue + } + ret = append(ret, peer) + } + return ret +} diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 805cdfa6ce65..629b5c3aa61b 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -268,6 +268,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { "minor-dec-ratio": 0.99, "src-tolerance-ratio": 1.05, "dst-tolerance-ratio": 1.05, + "enable-for-tiflash": true, } c.Assert(conf, DeepEquals, expected1) mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil)