Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: reduce GetStore in hot-region-scheduler (#3870) #3910

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()

{ // update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums[readLeader],
regionRead,
Expand All @@ -204,12 +206,14 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
{ // update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums[writeLeader],
regionWrite,
write, core.LeaderKind)

h.stLoadInfos[writePeer] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums[writePeer],
regionWrite,
Expand Down Expand Up @@ -254,6 +258,7 @@ func (h *hotScheduler) gcRegionPendings() {
// summaryStoresLoad Load information of all available stores.
// it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store
func summaryStoresLoad(
stores []*core.StoreInfo,
storesLoads map[uint64][]float64,
storePendings map[uint64]Influence,
storeHotPeers map[uint64][]*statistics.HotPeerStat,
Expand All @@ -266,7 +271,12 @@ func summaryStoresLoad(
allKeySum := 0.0
allCount := 0.0

for id, loads := range storesLoads {
for _, store := range stores {
id := store.GetID()
loads, ok := storesLoads[id]
if !ok {
continue
}
var byteRate, keyRate float64
switch rwTy {
case read:
Expand Down Expand Up @@ -316,6 +326,7 @@ func summaryStoresLoad(

// Construct store load info.
loadDetail[id] = &storeLoadDetail{
Store: store,
LoadPred: stLoadPred,
HotPeers: hotPeers,
}
Expand Down Expand Up @@ -563,10 +574,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
for id, detail := range bs.stLoadDetail {
if bs.cluster.GetStore(id) == nil {
log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
continue
}
if len(detail.HotPeers) == 0 {
continue
}
Expand Down Expand Up @@ -701,12 +708,9 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
var (
filters []filter.Filter
candidates []*core.StoreInfo
candidates []*storeLoadDetail
)
srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
if srcStore == nil {
return nil
}
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
switch bs.opTy {
case movePeer:
filters = []filter.Filter{
Expand All @@ -716,8 +720,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filter.NewPlacementSafeguard(bs.sche.GetName(), bs.cluster, bs.cur.region, srcStore),
}

for storeID := range bs.stLoadDetail {
candidates = append(candidates, bs.cluster.GetStore(storeID))
for _, detail := range bs.stLoadDetail {
candidates = append(candidates, detail)
}

case transferLeader:
Expand All @@ -729,9 +733,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filters = append(filters, leaderFilter)
}

for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
if _, ok := bs.stLoadDetail[store.GetID()]; ok {
candidates = append(candidates, store)
for _, peer := range bs.cur.region.GetFollowers() {
if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
candidates = append(candidates, detail)
}
}

Expand All @@ -741,15 +745,15 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
return bs.pickDstStores(filters, candidates)
}

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
for _, store := range candidates {
for _, detail := range candidates {
store := detail.Store
if filter.Target(bs.cluster.GetOpts(), store, filters) {
detail := bs.stLoadDetail[store.GetID()]
if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Expect.ByteRate &&
detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Expect.KeyRate {
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
ret[store.GetID()] = detail
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
}
hotSchedulerResultCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
Expand Down
3 changes: 3 additions & 0 deletions server/schedulers/shuffle_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,20 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op
}

func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()
switch typ {
case read:
s.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesLoads,
map[uint64]Influence{},
cluster.RegionReadStats(),
read, core.LeaderKind)
return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
case write:
s.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesLoads,
map[uint64]Influence{},
cluster.RegionWriteStats(),
Expand Down
1 change: 1 addition & 0 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ func maxLoad(a, b *storeLoad) *storeLoad {
}

type storeLoadDetail struct {
Store *core.StoreInfo
LoadPred *storeLoadPred
HotPeers []*statistics.HotPeerStat
}
Expand Down