scheduler: balance-hot-region-scheduler supports TiFlash hot-write #3900

Merged: 19 commits, Aug 7, 2021
Changes from 4 commits
56 changes: 36 additions & 20 deletions pkg/mock/mockcluster/mockcluster.go
@@ -85,14 +85,15 @@ func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.Regio
}

// LoadRegion puts region info without leader
- func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {
+ func (mc *Cluster) LoadRegion(regionID uint64, peerStoreIDs ...uint64) {
// regions load from etcd will have no leader
- r := mc.newMockRegionInfo(regionID, 0, followerIds...).Clone(core.WithLeader(nil))
+ r := mc.newMockRegionInfo(regionID, 0, peerStoreIDs...).Clone(core.WithLeader(nil))
mc.PutRegion(r)
}

// GetStoresLoads gets stores load statistics.
func (mc *Cluster) GetStoresLoads() map[uint64][]float64 {
+ mc.HotStat.FilterUnhealthyStore(mc)
return mc.HotStat.GetStoresLoads()
}
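
With the FilterUnhealthyStore call added above, load entries belonging to stores that are down or removed are dropped before the map is returned, so mock-based scheduler tests no longer see stale per-store statistics. A minimal hedged sketch of the intended effect (the helpers are existing mockcluster methods; the exact filtering outcome is an assumption, not asserted anywhere in this diff):

mc.AddRegionStore(5, 10)
mc.SetStoreDown(5)           // mark store 5 unhealthy
loads := mc.GetStoresLoads() // FilterUnhealthyStore now runs before the map is returned
// after this change, loads is expected to carry no entry for store 5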

@@ -304,8 +305,8 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
}

// AddLeaderRegion adds region with specified leader and followers.
- func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
- origin := mc.newMockRegionInfo(regionID, leaderStoreID, followerStoreIDs...)
+ func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
+ origin := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
region := origin.Clone(core.SetApproximateSize(defaultRegionSize/mb), core.SetApproximateKeys(10))
mc.PutRegion(region)
return region
@@ -320,8 +321,8 @@ func (mc *Cluster) AddRegionWithLearner(regionID uint64, leaderStoreID uint64, f
}

// AddLeaderRegionWithRange adds region with specified leader, followers and key range.
- func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderID uint64, followerIds ...uint64) {
- o := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, LeaderStoreID uint64, otherPeerStoreIDs ...uint64) {
+ o := mc.newMockRegionInfo(regionID, LeaderStoreID, otherPeerStoreIDs...)
r := o.Clone(
core.WithStartKey([]byte(startKey)),
core.WithEndKey([]byte(endKey)),
@@ -331,11 +332,11 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en

// AddRegionWithReadInfo adds region with specified leader, followers and read info.
func (mc *Cluster) AddRegionWithReadInfo(
- regionID uint64, leaderID uint64,
+ regionID uint64, LeaderStoreID uint64,
readBytes, readKeys uint64,
reportInterval uint64,
- followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
- r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+ otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+ r := mc.newMockRegionInfo(regionID, LeaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes))
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
@@ -356,9 +357,9 @@ func (mc *Cluster) AddRegionWithReadInfo(
}

// AddRegionWithPeerReadInfo adds region with specified peer read info.
- func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderID, targetStoreID, readBytes, readKeys, reportInterval uint64,
- followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
- r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, LeaderStoreID, targetStoreID, readBytes, readKeys, reportInterval uint64,
+ otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+ r := mc.newMockRegionInfo(regionID, LeaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes), core.SetReadKeys(readKeys), core.SetReportInterval(reportInterval))
filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
if len(filledNums) > 0 {
@@ -379,11 +380,11 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderID, targetStoreID,

// AddRegionLeaderWithReadInfo add region leader read info
func (mc *Cluster) AddRegionLeaderWithReadInfo(
- regionID uint64, leaderID uint64,
+ regionID uint64, LeaderStoreID uint64,
readBytes, readKeys uint64,
reportInterval uint64,
- followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
- r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+ otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+ r := mc.newMockRegionInfo(regionID, LeaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes))
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
@@ -405,11 +406,11 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo(

// AddLeaderRegionWithWriteInfo adds region with specified leader and peers write info.
func (mc *Cluster) AddLeaderRegionWithWriteInfo(
- regionID uint64, leaderID uint64,
+ regionID uint64, LeaderStoreID uint64,
writtenBytes, writtenKeys uint64,
reportInterval uint64,
- followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
- r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+ otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+ r := mc.newMockRegionInfo(regionID, LeaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetWrittenBytes(writtenBytes))
r = r.Clone(core.SetWrittenKeys(writtenKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
@@ -623,8 +624,17 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
mc.PutStore(newStore)
}

- func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
- return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, []uint64{}, nil)
+ func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
+ var followerStoreIDs []uint64
+ var learnerStoreIDs []uint64
+ for _, storeID := range otherPeerStoreIDs {
+ if store := mc.GetStore(storeID); store != nil && core.IsTiFlashStore(store.GetMeta()) {
+ learnerStoreIDs = append(learnerStoreIDs, storeID)
+ } else {
+ followerStoreIDs = append(followerStoreIDs, storeID)
+ }
+ }
+ return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, learnerStoreIDs, nil)
}

// CheckLabelProperty checks label property.
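
The rewritten newMockRegionInfo is the mock-side core of this PR: any store in otherPeerStoreIDs whose metadata carries the TiFlash engine label now receives a learner peer instead of a follower, matching how TiFlash replicas exist in a real cluster. A hedged sketch of how a scheduler test might rely on this (constructor, options, and the engine label key follow the existing mockcluster tests; they are assumptions, not part of this diff):

opt := config.NewTestOptions()
mc := mockcluster.NewCluster(opt)
mc.AddRegionStore(1, 10)
mc.AddRegionStore(2, 10)
mc.AddRegionStore(3, 10)
mc.AddLabelsStore(4, 10, map[string]string{"engine": "tiflash"}) // TiFlash store
region := mc.AddLeaderRegion(1, 1, 2, 3, 4)
// stores 2 and 3 hold follower peers, while store 4 is expected to hold a
// learner, because newMockRegionInfo now routes TiFlash stores into learnerStoreIDs
_ = region.GetLearners()
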
@@ -797,3 +807,9 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.
}
return items
}

+ // ObserveRegionsStats records the current stores stats from region stats.
+ func (mc *Cluster) ObserveRegionsStats() {
+ storeIDs, writeBytesRates, writeKeysRates := mc.BasicCluster.GetStoresWriteRate()
+ mc.HotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates)
+ }
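
ObserveRegionsStats gives tests a way to refresh the per-store write rates that are aggregated from region statistics, which is the load source the scheduler uses for TiFlash stores. A hedged usage sketch (the byte/key rates and interval are made-up test values; store 8 is assumed to be a TiFlash store created with AddLabelsStore):

mc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*10, 1000*10, 10, []uint64{2, 3, 8})
mc.ObserveRegionsStats()     // record region-derived write rates per store
loads := mc.GetStoresLoads() // the TiFlash store's load now reflects the region write flow
_ = loads
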
12 changes: 8 additions & 4 deletions server/schedulers/hot_region.go
@@ -86,7 +86,7 @@ type hotScheduler struct {
// Every time Schedule will recalculate it.
stInfos map[uint64]*storeSummaryInfo
// temporary states but exported to API or metrics
- // Every time Schedule will recalculate it.
+ // Every time `Schedule()` will recalculate it.
stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail

// config of hot scheduler
@@ -488,14 +488,18 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
if bs.best == nil || len(bs.ops) == 0 {
return false
}
+ if bs.best.srcDetail.Info.IsTiFlash != bs.best.dstDetail.Info.IsTiFlash {
+ schedulerCounter.WithLabelValues(bs.sche.GetName(), "not-same-engine").Inc()
+ return false
+ }
// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
switch {
case bs.rwTy == write && bs.opTy == transferLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
case bs.rwTy == write && bs.opTy == movePeer:
- if bs.best.srcDetail.Info.IsTiFlash || bs.best.dstDetail.Info.IsTiFlash {
+ if bs.best.srcDetail.Info.IsTiFlash {
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
} else {
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
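
Two behaviours change here: an operator whose source and destination sit on different engines (TiKV vs. TiFlash) is rejected outright with the not-same-engine counter, and the zombie duration of the pending influence now depends only on whether the source store's load is derived from summed region statistics. A standalone sketch of the duration choice, using the getters visible in this diff (the default branch is an assumption, since the remaining cases of the original switch are not shown):

func maxZombieDuration(rw rwType, op opType, srcIsTiFlash bool, conf *hotRegionSchedulerConfig) time.Duration {
    switch {
    case rw == write && op == transferLeader:
        // leader transfers are judged against region-level statistics
        return conf.GetRegionsStatZombieDuration()
    case rw == write && op == movePeer:
        if srcIsTiFlash {
            // TiFlash load comes from summed region stats, which refresh more
            // slowly, so the pending influence is kept alive longer
            return conf.GetRegionsStatZombieDuration()
        }
        return conf.GetStoreStatZombieDuration()
    default:
        return conf.GetStoreStatZombieDuration()
    }
}
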
@@ -734,9 +738,9 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
id := store.GetID()
if bs.checkDstByPriorityAndTolerance(detail.LoadPred.max(), &detail.LoadPred.Expect, dstToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc()
}
}
}
6 changes: 6 additions & 0 deletions server/schedulers/hot_region_config.go
@@ -197,6 +197,12 @@ func (conf *hotRegionSchedulerConfig) GetEnableForTiFlash() bool {
return conf.EnableForTiFlash
}

+ func (conf *hotRegionSchedulerConfig) SetEnableForTiFlash(enable bool) {
+ conf.RLock()
+ defer conf.RUnlock()
+ conf.EnableForTiFlash = enable
+ }

func (conf *hotRegionSchedulerConfig) GetMinHotQueryRate() float64 {
conf.RLock()
defer conf.RUnlock()
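
SetEnableForTiFlash is the write-side counterpart of GetEnableForTiFlash above, presumably so tests can toggle whether the hot-write scheduler considers TiFlash replicas at all. A hedged usage sketch (hb is a *hotScheduler built the way the existing hot-region tests build one; the surrounding test is assumed, not part of this diff):

hb.conf.SetEnableForTiFlash(false)      // skip TiFlash peers for this case
defer hb.conf.SetEnableForTiFlash(true) // restore the default for later cases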