Skip to content

Commit

Permalink
scheduler: reduce GetStore in hot-region-scheduler (#3870) (#3909)
Browse files Browse the repository at this point in the history
Signed-off-by: HunDunDM <[email protected]>

Co-authored-by: HunDunDM <[email protected]>
  • Loading branch information
ti-chi-bot and HunDunDM authored Sep 6, 2021
1 parent f94e000 commit 38aaf9a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 20 deletions.
43 changes: 23 additions & 20 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,16 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesStat := cluster.GetStoresStats()

minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
{ // update read statistics
regionRead := cluster.RegionReadStats()
storeByte := storesStat.GetStoresBytesReadStat()
storeKey := storesStat.GetStoresKeysReadStat()
hotRegionThreshold := getHotRegionThreshold(storesStat, read)
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storeByte,
storeKey,
h.pendingSums[readLeader],
Expand All @@ -213,6 +214,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
storeKey := storesStat.GetStoresKeysWriteStat()
hotRegionThreshold := getHotRegionThreshold(storesStat, write)
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storeByte,
storeKey,
h.pendingSums[writeLeader],
Expand All @@ -222,6 +224,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
write, core.LeaderKind, mixed)

h.stLoadInfos[writePeer] = summaryStoresLoad(
stores,
storeByte,
storeKey,
h.pendingSums[writePeer],
Expand Down Expand Up @@ -291,6 +294,7 @@ func (h *hotScheduler) gcRegionPendings() {

// Load information of all available stores.
func summaryStoresLoad(
stores []*core.StoreInfo,
storeByteRate map[uint64]float64,
storeKeyRate map[uint64]float64,
pendings map[uint64]Influence,
Expand All @@ -307,7 +311,12 @@ func summaryStoresLoad(
allCount := 0.0

// Stores without byte rate statistics is not available to schedule.
for id, byteRate := range storeByteRate {
for _, store := range stores {
id := store.GetID()
byteRate, ok := storeByteRate[id]
if !ok {
continue
}
keyRate := storeKeyRate[id]

// Find all hot peers first
Expand Down Expand Up @@ -349,6 +358,7 @@ func summaryStoresLoad(

// Construct store load info.
loadDetail[id] = &storeLoadDetail{
Store: store,
LoadPred: stLoadPred,
HotPeers: hotPeers,
}
Expand Down Expand Up @@ -611,10 +621,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
for id, detail := range bs.stLoadDetail {
if bs.cluster.GetStore(id) == nil {
log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
continue
}
if len(detail.HotPeers) == 0 {
continue
}
Expand Down Expand Up @@ -744,18 +750,15 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
var (
filters []filter.Filter
candidates []*core.StoreInfo
candidates []*storeLoadDetail
)
switch bs.opTy {
case movePeer:
var scoreGuard filter.Filter
if bs.cluster.IsPlacementRulesEnabled() {
scoreGuard = filter.NewRuleFitFilter(bs.sche.GetName(), bs.cluster, bs.cur.region, bs.cur.srcStoreID)
} else {
srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
if srcStore == nil {
return nil
}
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
scoreGuard = filter.NewDistinctScoreFilter(bs.sche.GetName(), bs.cluster.GetLocationLabels(), bs.cluster.GetRegionStores(bs.cur.region), srcStore)
}

Expand All @@ -767,8 +770,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
scoreGuard,
}

for storeID := range bs.stLoadDetail {
candidates = append(candidates, bs.cluster.GetStore(storeID))
for _, detail := range bs.stLoadDetail {
candidates = append(candidates, detail)
}

case transferLeader:
Expand All @@ -778,9 +781,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion),
}

for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
if _, ok := bs.stLoadDetail[store.GetID()]; ok {
candidates = append(candidates, store)
for _, peer := range bs.cur.region.GetFollowers() {
if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
candidates = append(candidates, detail)
}
}

Expand All @@ -790,15 +793,15 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
return bs.pickDstStores(filters, candidates)
}

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
for _, store := range candidates {
for _, detail := range candidates {
store := detail.Store
if filter.Target(bs.cluster, store, filters) {
detail := bs.stLoadDetail[store.GetID()]
if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Future.ExpByteRate &&
detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Future.ExpKeyRate {
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
ret[store.GetID()] = detail
balanceHotRegionCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc()
}
balanceHotRegionCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc()
Expand Down
3 changes: 3 additions & 0 deletions server/schedulers/shuffle_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,14 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op
}

func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
stores := cluster.GetStores()
storesStats := cluster.GetStoresStats()
minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
switch typ {
case read:
hotRegionThreshold := getHotRegionThreshold(storesStats, read)
s.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesStats.GetStoresBytesReadStat(),
storesStats.GetStoresKeysReadStat(),
map[uint64]Influence{},
Expand All @@ -149,6 +151,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []
case write:
hotRegionThreshold := getHotRegionThreshold(storesStats, write)
s.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesStats.GetStoresBytesWriteStat(),
storesStats.GetStoresKeysWriteStat(),
map[uint64]Influence{},
Expand Down
1 change: 1 addition & 0 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func maxLoad(a, b *storeLoad) *storeLoad {
}

type storeLoadDetail struct {
Store *core.StoreInfo
LoadPred *storeLoadPred
HotPeers []*statistics.HotPeerStat
}
Expand Down

0 comments on commit 38aaf9a

Please sign in to comment.