diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 9f1b05678ae..ef29f6c71a6 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -573,20 +573,25 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { // processRegionHeartbeat updates the region information. func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.RLock() - origin, err := c.core.PreCheckPutRegion(region) + hotStat := c.hotStat + storage := c.storage + coreCluster := c.core + c.RUnlock() + + origin, err := coreCluster.PreCheckPutRegion(region) if err != nil { - c.RUnlock() return err } - expiredStats := c.hotStat.ExpiredItems(region) + + expiredStats := hotStat.ExpiredItems(region) // Put expiredStats into read/write queue to update stats if len(expiredStats) > 0 { for _, stat := range expiredStats { item := statistics.NewExpiredStatItem(stat) if stat.Kind == statistics.WriteFlow { - c.hotStat.CheckWriteAsync(item) + hotStat.CheckWriteAsync(item) } else { - c.hotStat.CheckReadAsync(item) + hotStat.CheckReadAsync(item) } } } @@ -595,9 +600,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { for _, peer := range region.GetPeers() { peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) item := statistics.NewPeerInfoItem(peerInfo, region) - c.hotStat.CheckWriteAsync(item) + hotStat.CheckWriteAsync(item) } - c.RUnlock() // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. @@ -659,8 +663,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } // Once flow has changed, will update the cache. // Because keys and bytes are strongly related, only bytes are judged. - if c.traceRegionFlow && (region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || - region.GetRoundBytesRead() != origin.GetRoundBytesRead()) { + if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || + region.GetRoundBytesRead() != origin.GetRoundBytesRead() { saveCache, needSync = true, true } @@ -679,6 +683,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { time.Sleep(500 * time.Millisecond) }) + var overlaps []*core.RegionInfo c.Lock() if saveCache { // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, @@ -689,17 +694,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.Unlock() return err } - overlaps := c.core.PutRegion(region) - if c.storage != nil { - for _, item := range overlaps { - if err := c.storage.DeleteRegion(item.GetMeta()); err != nil { - log.Error("failed to delete region from storage", - zap.Uint64("region-id", item.GetID()), - logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())), - errs.ZapError(err)) - } - } - } + overlaps = c.core.PutRegion(region) for _, item := range overlaps { if c.regionStats != nil { c.regionStats.ClearDefunctRegion(item.GetID()) @@ -730,24 +725,38 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if c.regionStats != nil { c.regionStats.Observe(region, c.getRegionStoresLocked(region)) } + + changedRegions := c.changedRegions + c.Unlock() - // If there are concurrent heartbeats from the same region, the last write will win even if - // writes to storage in the critical area. So don't use mutex to protect it. 
- if saveKV && c.storage != nil { - if err := c.storage.SaveRegion(region.GetMeta()); err != nil { - // Not successfully saved to storage is not fatal, it only leads to longer warm-up - // after restart. Here we only log the error then go on updating cache. - log.Error("failed to save region to storage", - zap.Uint64("region-id", region.GetID()), - logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())), - errs.ZapError(err)) + if storage != nil { + // If there are concurrent heartbeats from the same region, the last write will win even if + // writes to storage in the critical area. So don't use mutex to protect it. + // Not successfully saved to storage is not fatal, it only leads to longer warm-up + // after restart. Here we only log the error then go on updating cache. + for _, item := range overlaps { + if err := storage.DeleteRegion(item.GetMeta()); err != nil { + log.Error("failed to delete region from storage", + zap.Uint64("region-id", item.GetID()), + logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())), + errs.ZapError(err)) + } + } + if saveKV { + if err := storage.SaveRegion(region.GetMeta()); err != nil { + log.Error("failed to save region to storage", + zap.Uint64("region-id", region.GetID()), + logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())), + errs.ZapError(err)) + } + regionEventCounter.WithLabelValues("update_kv").Inc() } - regionEventCounter.WithLabelValues("update_kv").Inc() } + if saveKV || needSync { select { - case c.changedRegions <- region: + case changedRegions <- region: default: } } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index ed785d08609..d63a8370ae3 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -622,13 +622,6 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) { processRegions(regions) newRegion := cluster.GetRegion(region.GetID()) c.Assert(newRegion.GetBytesRead(), Equals, uint64(1000)) - - // do not trace the flow changes - cluster.traceRegionFlow = false - processRegions([]*core.RegionInfo{region}) - newRegion = cluster.GetRegion(region.GetID()) - c.Assert(region.GetBytesRead(), Equals, uint64(0)) - c.Assert(newRegion.GetBytesRead(), Not(Equals), uint64(0)) } func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) { @@ -914,7 +907,7 @@ func (s *testRegionsInfoSuite) Test(c *C) { c.Assert(cache.SearchRegion(regionKey), IsNil) checkRegions(c, cache, regions[0:i]) - cache.AddRegion(region) + cache.SetRegion(region) checkRegion(c, cache.GetRegion(i), region) checkRegion(c, cache.SearchRegion(regionKey), region) checkRegions(c, cache, regions[0:(i+1)]) @@ -940,7 +933,7 @@ func (s *testRegionsInfoSuite) Test(c *C) { // Reset leader to peer 0. 
newRegion = region.Clone(core.WithLeader(region.GetPeers()[0])) regions[i] = newRegion - cache.AddRegion(newRegion) + cache.SetRegion(newRegion) checkRegion(c, cache.GetRegion(i), newRegion) checkRegions(c, cache, regions[0:(i+1)]) checkRegion(c, cache.SearchRegion(regionKey), newRegion) @@ -959,7 +952,7 @@ func (s *testRegionsInfoSuite) Test(c *C) { // check overlaps // clone it otherwise there are two items with the same key in the tree overlapRegion := regions[n-1].Clone(core.WithStartKey(regions[n-2].GetStartKey())) - cache.AddRegion(overlapRegion) + cache.SetRegion(overlapRegion) c.Assert(cache.GetRegion(n-2), IsNil) c.Assert(cache.GetRegion(n-1), NotNil) diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index f5fbdd8bee0..4cdcab1bf94 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -424,8 +424,7 @@ func (c *coordinator) stop() { // Hack to retrieve info from scheduler. // TODO: remove it. type hasHotStatus interface { - GetHotReadStatus() *statistics.StoreHotPeersInfos - GetHotWriteStatus() *statistics.StoreHotPeersInfos + GetHotStatus(typ string) *statistics.StoreHotPeersInfos GetPendingInfluence() map[uint64]*schedulers.Influence } @@ -437,7 +436,7 @@ func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos { return nil } if h, ok := s.Scheduler.(hasHotStatus); ok { - return h.GetHotWriteStatus() + return h.GetHotStatus(schedulers.HotWriteRegionType) } return nil } @@ -450,7 +449,7 @@ func (c *coordinator) getHotReadRegions() *statistics.StoreHotPeersInfos { return nil } if h, ok := s.Scheduler.(hasHotStatus); ok { - return h.GetHotReadStatus() + return h.GetHotStatus(schedulers.HotReadRegionType) } return nil } @@ -503,74 +502,65 @@ func (c *coordinator) collectHotSpotMetrics() { } c.RUnlock() stores := c.cluster.GetStores() - status := s.Scheduler.(hasHotStatus).GetHotWriteStatus() - pendings := s.Scheduler.(hasHotStatus).GetPendingInfluence() - for _, s := range stores { - storeAddress := s.GetAddress() - storeID := s.GetID() - storeLabel := fmt.Sprintf("%d", storeID) - stat, ok := status.AsPeer[storeID] - if ok { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalLoads[statistics.RegionWriteBytes]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalLoads[statistics.RegionWriteKeys]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count)) - } else { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(0) - } + // Collects hot write region metrics. + collectHotMetrics(s, stores, schedulers.HotWriteRegionType) + // Collects hot read region metrics. + collectHotMetrics(s, stores, schedulers.HotReadRegionType) + // Collects pending influence. 
+ collectPendingInfluence(s, stores) +} - stat, ok = status.AsLeader[storeID] - if ok { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionWriteBytes]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalLoads[statistics.RegionWriteKeys]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count)) - } else { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0) - } +func collectHotMetrics(s *scheduleController, stores []*core.StoreInfo, typ string) { + status := s.Scheduler.(hasHotStatus).GetHotStatus(typ) + var ( + kind string + byteTyp, keyTyp statistics.RegionStatKind + ) - // TODO: add to tidb-ansible after merging pending influence into operator influence. - if infl := pendings[storeID]; infl != nil { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count) - } + switch typ { + case schedulers.HotReadRegionType: + kind, byteTyp, keyTyp = "read", statistics.RegionReadBytes, statistics.RegionReadKeys + case schedulers.HotWriteRegionType: + kind, byteTyp, keyTyp = "write", statistics.RegionWriteBytes, statistics.RegionWriteKeys } - - // Collects hot read region metrics. 
- status = s.Scheduler.(hasHotStatus).GetHotReadStatus() for _, s := range stores { storeAddress := s.GetAddress() storeID := s.GetID() storeLabel := fmt.Sprintf("%d", storeID) stat, ok := status.AsLeader[storeID] if ok { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionReadBytes]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalLoads[statistics.RegionReadKeys]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count)) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_leader").Set(stat.TotalLoads[byteTyp]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_leader").Set(stat.TotalLoads[keyTyp]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_leader").Set(float64(stat.Count)) } else { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_leader").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_leader").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_leader").Set(0) } stat, ok = status.AsPeer[storeID] if ok { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_peer").Set(stat.TotalLoads[statistics.RegionReadBytes]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_peer").Set(stat.TotalLoads[statistics.RegionReadKeys]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_peer").Set(float64(stat.Count)) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_peer").Set(stat.TotalLoads[byteTyp]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_peer").Set(stat.TotalLoads[keyTyp]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_peer").Set(float64(stat.Count)) } else { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_peer").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_peer").Set(0) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_peer").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_peer").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_peer").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_peer").Set(0) } + } +} +func collectPendingInfluence(s *scheduleController, stores []*core.StoreInfo) { + pendings := s.Scheduler.(hasHotStatus).GetPendingInfluence() + for _, s := range stores { + storeAddress := s.GetAddress() + storeID := s.GetID() + storeLabel := fmt.Sprintf("%d", storeID) if infl := pendings[storeID]; infl != nil { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, 
"read_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim]) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "pending_influence_count").Set(infl.Count) } } } diff --git a/server/config/config.go b/server/config/config.go index d83486f534c..cf063ab8262 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -19,6 +19,7 @@ import ( "encoding/json" "flag" "fmt" + "math" "net/url" "os" "path/filepath" @@ -1076,9 +1077,9 @@ type PDServerConfig struct { MetricStorage string `toml:"metric-storage" json:"metric-storage"` // There are some values supported: "auto", "none", or a specific address, default: "auto" DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"` - // TraceRegionFlow the option to update flow information of regions - // TODO: deprecate - TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"` + // TraceRegionFlow the option to update flow information of regions. + // WARN: TraceRegionFlow is deprecated. + TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string,omitempty"` // FlowRoundByDigit used to discretization processing flow information. FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"` } @@ -1103,9 +1104,35 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error { if !meta.IsDefined("flow-round-by-digit") { adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit) } + c.migrateConfigurationFromFile(meta) return c.Validate() } +func (c *PDServerConfig) migrateConfigurationFromFile(meta *configMetaData) error { + oldName, newName := "trace-region-flow", "flow-round-by-digit" + defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName) + switch { + case defineOld && defineNew: + if c.TraceRegionFlow && (c.FlowRoundByDigit == defaultFlowRoundByDigit) { + return errors.Errorf("config item %s and %s(deprecated) are conflict", newName, oldName) + } + case defineOld && !defineNew: + if !c.TraceRegionFlow { + c.FlowRoundByDigit = math.MaxInt8 + } + } + return nil +} + +// MigrateDeprecatedFlags updates new flags according to deprecated flags. +func (c *PDServerConfig) MigrateDeprecatedFlags() { + if !c.TraceRegionFlow { + c.FlowRoundByDigit = math.MaxInt8 + } + // json omity the false. next time will not persist to the kv. + c.TraceRegionFlow = false +} + // Clone returns a cloned PD server config. func (c *PDServerConfig) Clone() *PDServerConfig { runtimeServices := append(c.RuntimeServices[:0:0], c.RuntimeServices...) 
diff --git a/server/config/config_test.go b/server/config/config_test.go index be4fc887f00..2f36dcd244f 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -16,6 +16,7 @@ package config import ( "encoding/json" "fmt" + "math" "os" "path" "strings" @@ -284,6 +285,8 @@ func (s *testConfigSuite) TestMigrateFlags(c *C) { return cfg, err } cfg, err := load(` +[pd-server] +trace-region-flow = false [schedule] disable-remove-down-replica = true enable-make-up-replica = false @@ -291,6 +294,7 @@ disable-remove-extra-replica = true enable-remove-extra-replica = false `) c.Assert(err, IsNil) + c.Assert(cfg.PDServerCfg.FlowRoundByDigit, Equals, math.MaxInt8) c.Assert(cfg.Schedule.EnableReplaceOfflineReplica, IsTrue) c.Assert(cfg.Schedule.EnableRemoveDownReplica, IsFalse) c.Assert(cfg.Schedule.EnableMakeUpReplica, IsFalse) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 63a1b75f8c3..ca2660f620a 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -580,6 +580,7 @@ func (o *PersistOptions) Reload(storage *core.Storage) error { return err } o.adjustScheduleCfg(&cfg.Schedule) + cfg.PDServerCfg.MigrateDeprecatedFlags() if isExist { o.schedule.Store(&cfg.Schedule) o.replication.Store(&cfg.Replication) diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 8086405fcb7..5bc22d04eb5 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -282,30 +282,34 @@ func (bc *BasicCluster) DeleteStore(store *StoreInfo) { bc.Stores.DeleteStore(store) } -// PreCheckPutRegion checks if the region is valid to put. -func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { +func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { bc.RLock() - origin := bc.Regions.GetRegion(region.GetID()) + defer bc.RUnlock() + origin = bc.Regions.GetRegion(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { - for _, item := range bc.Regions.GetOverlaps(region) { - if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { - bc.RUnlock() - return nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) - } + overlaps = bc.Regions.GetOverlaps(region) + } + return +} + +// PreCheckPutRegion checks if the region is valid to put. +func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { + origin, overlaps := bc.getRelevantRegions(region) + for _, item := range overlaps { + if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { + return nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) } } - bc.RUnlock() if origin == nil { return nil, nil } + r := region.GetRegionEpoch() o := origin.GetRegionEpoch() - // TiKV reports term after v3.0 isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() - // Region meta is stale, return an error. 
- if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() || isTermBehind { + if isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { return origin, errRegionIsStale(region.GetMeta(), origin.GetMeta()) } diff --git a/server/core/region.go b/server/core/region.go index 81ed7a4bb64..2bab9bf3fcb 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -458,23 +458,28 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio return r.replicationStatus } -// regionMap wraps a map[uint64]*core.RegionInfo and supports randomly pick a region. -type regionMap map[uint64]*RegionInfo +// regionMap wraps a map[uint64]*regionItem and supports randomly pick a region. They are the leaves of regionTree. +type regionMap map[uint64]*regionItem func newRegionMap() regionMap { - return make(map[uint64]*RegionInfo) + return make(map[uint64]*regionItem) } func (rm regionMap) Len() int { return len(rm) } -func (rm regionMap) Get(id uint64) *RegionInfo { +func (rm regionMap) Get(id uint64) *regionItem { return rm[id] } -func (rm regionMap) Put(region *RegionInfo) { - rm[region.GetID()] = region +// AddNew uses RegionInfo to generate a new regionItem. +// If the regionItem already exists, it will be overwritten. +// Note: Do not use this function when you only need to update the RegionInfo and do not need a new regionItem. +func (rm regionMap) AddNew(region *RegionInfo) *regionItem { + item := ®ionItem{region: region} + rm[region.GetID()] = item + return item } func (rm regionMap) Delete(id uint64) { @@ -505,24 +510,109 @@ func NewRegionsInfo() *RegionsInfo { // GetRegion returns the RegionInfo with regionID func (r *RegionsInfo) GetRegion(regionID uint64) *RegionInfo { - region := r.regions.Get(regionID) - if region == nil { - return nil + if item := r.regions.Get(regionID); item != nil { + return item.region } - return region + return nil } -// SetRegion sets the RegionInfo with regionID -func (r *RegionsInfo) SetRegion(region *RegionInfo) []*RegionInfo { - if origin := r.regions.Get(region.GetID()); origin != nil { - if !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { - r.removeRegionFromTreeAndMap(origin) +// SetRegion sets the RegionInfo to regionTree and regionMap, also update leaders and followers by region peers +// overlaps: Other regions that overlap with the specified region, excluding itself. +func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) { + var item *regionItem // Pointer to the *RegionInfo of this ID. + var origin *RegionInfo // This is the original region information of this ID. + var rangeChanged bool // This Region is new, or its range has changed. + var peersChanged bool // This Region is new, or its peers have changed, including leader-change/pending/down. + + if item = r.regions.Get(region.GetID()); item != nil { + // If this ID already exists, use the existing regionItem and pick out the origin. + origin = item.region + rangeChanged = !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || + !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) + if rangeChanged { + // Delete itself in regionTree so that overlaps will not contain itself. + // Because the regionItem is reused, there is no need to delete it in the regionMap. + r.tree.remove(origin) + // A change in the range is equivalent to a change in all peers. 
+ peersChanged = true + } else { + peersChanged = r.shouldRemoveFromSubTree(region, origin) } - if r.shouldRemoveFromSubTree(region, origin) { + // If the peers have changed, the sub regionTree needs to be cleaned up. + if peersChanged { + // TODO: Improve performance by deleting only the different peers. r.removeRegionFromSubTree(origin) } + // Update the RegionInfo in the regionItem. + item.region = region + } else { + // If this ID does not exist, generate a new regionItem and save it in the regionMap. + rangeChanged = true + peersChanged = true + item = r.regions.AddNew(region) + } + + if !rangeChanged { + // If the range is not changed, only the statistical on the regionTree needs to be updated. + r.tree.updateStat(origin, region) + } else { + // It has been removed and all information needs to be updated again. + overlaps = r.tree.update(item) + for _, old := range overlaps { + r.RemoveRegion(r.GetRegion(old.GetID())) + } + } + + if !peersChanged { + // If the peers are not changed, only the statistical on the sub regionTree needs to be updated. + r.updateSubTreeStat(origin, region) + } else { + // It has been removed and all information needs to be updated again. + + // Add to leaders and followers. + for _, peer := range region.GetVoters() { + storeID := peer.GetStoreId() + if peer.GetId() == region.leader.GetId() { + // Add leader peer to leaders. + store, ok := r.leaders[storeID] + if !ok { + store = newRegionTree() + r.leaders[storeID] = store + } + store.update(item) + } else { + // Add follower peer to followers. + store, ok := r.followers[storeID] + if !ok { + store = newRegionTree() + r.followers[storeID] = store + } + store.update(item) + } + } + // Add to learners. + for _, peer := range region.GetLearners() { + storeID := peer.GetStoreId() + store, ok := r.learners[storeID] + if !ok { + store = newRegionTree() + r.learners[storeID] = store + } + store.update(item) + } + // Add to PendingPeers + for _, peer := range region.GetPendingPeers() { + storeID := peer.GetStoreId() + store, ok := r.pendingPeers[storeID] + if !ok { + store = newRegionTree() + r.pendingPeers[storeID] = store + } + store.update(item) + } } - return r.AddRegion(region) + + return } // Len returns the RegionsInfo length @@ -535,98 +625,43 @@ func (r *RegionsInfo) TreeLen() int { return r.tree.length() } -// GetOverlaps returns the regions which are overlapped with the specified region range. -func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*RegionInfo { - return r.tree.getOverlaps(region) -} - -// AddRegion adds RegionInfo to regionTree and regionMap, also update leaders and followers by region peers -func (r *RegionsInfo) AddRegion(region *RegionInfo) []*RegionInfo { - // the regions which are overlapped with the specified region range. - var overlaps []*RegionInfo - // when the value is true, add the region to the tree. otherwise use the region replace the origin region in the tree. - treeNeedAdd := true - if origin := r.GetRegion(region.GetID()); origin != nil { - if regionOld := r.tree.find(region); regionOld != nil { - // Update to tree. - if bytes.Equal(regionOld.region.GetStartKey(), region.GetStartKey()) && - bytes.Equal(regionOld.region.GetEndKey(), region.GetEndKey()) && - regionOld.region.GetID() == region.GetID() { - r.tree.updateStat(regionOld.region, region) - regionOld.region = region - treeNeedAdd = false - } - } - } - if treeNeedAdd { - // Add to tree. 
- overlaps = r.tree.update(region) - for _, item := range overlaps { - r.RemoveRegion(r.GetRegion(item.GetID())) - } - } - // Add to regions. - r.regions.Put(region) - - // Add to leaders and followers. +func (r *RegionsInfo) updateSubTreeStat(origin *RegionInfo, region *RegionInfo) { for _, peer := range region.GetVoters() { storeID := peer.GetStoreId() if peer.GetId() == region.leader.GetId() { - // Add leader peer to leaders. - store, ok := r.leaders[storeID] - if !ok { - store = newRegionTree() - r.leaders[storeID] = store + if tree, ok := r.leaders[storeID]; ok { + tree.updateStat(origin, region) } - store.update(region) } else { - // Add follower peer to followers. - store, ok := r.followers[storeID] - if !ok { - store = newRegionTree() - r.followers[storeID] = store + if tree, ok := r.followers[storeID]; ok { + tree.updateStat(origin, region) } - store.update(region) } } - - // Add to learners. for _, peer := range region.GetLearners() { - storeID := peer.GetStoreId() - store, ok := r.learners[storeID] - if !ok { - store = newRegionTree() - r.learners[storeID] = store + if tree, ok := r.learners[peer.GetStoreId()]; ok { + tree.updateStat(origin, region) } - store.update(region) } - - for _, peer := range region.pendingPeers { - storeID := peer.GetStoreId() - store, ok := r.pendingPeers[storeID] - if !ok { - store = newRegionTree() - r.pendingPeers[storeID] = store + for _, peer := range region.GetPendingPeers() { + if tree, ok := r.pendingPeers[peer.GetStoreId()]; ok { + tree.updateStat(origin, region) } - store.update(region) } +} - return overlaps +// GetOverlaps returns the regions which are overlapped with the specified region range. +func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*RegionInfo { + return r.tree.getOverlaps(region) } // RemoveRegion removes RegionInfo from regionTree and regionMap func (r *RegionsInfo) RemoveRegion(region *RegionInfo) { - // Remove from tree and regions. - r.removeRegionFromTreeAndMap(region) - // Remove from leaders and followers. - r.removeRegionFromSubTree(region) -} - -// removeRegionFromTreeAndMap removes RegionInfo from regionTree and regionMap -func (r *RegionsInfo) removeRegionFromTreeAndMap(region *RegionInfo) { // Remove from tree and regions. r.tree.remove(region) r.regions.Delete(region.GetID()) + // Remove from leaders and followers. 
+ r.removeRegionFromSubTree(region) } // removeRegionFromSubTree removes RegionInfo from regionSubTrees @@ -724,8 +759,8 @@ func (r *RegionsInfo) SearchPrevRegion(regionKey []byte) *RegionInfo { // GetRegions gets all RegionInfo from regionMap func (r *RegionsInfo) GetRegions() []*RegionInfo { regions := make([]*RegionInfo, 0, r.regions.Len()) - for _, region := range r.regions { - regions = append(regions, region) + for _, item := range r.regions { + regions = append(regions, item.region) } return regions } @@ -768,8 +803,8 @@ func (r *RegionsInfo) GetStoreRegionSize(storeID uint64) int64 { // GetMetaRegions gets a set of metapb.Region from regionMap func (r *RegionsInfo) GetMetaRegions() []*metapb.Region { regions := make([]*metapb.Region, 0, r.regions.Len()) - for _, region := range r.regions { - regions = append(regions, proto.Clone(region.meta).(*metapb.Region)) + for _, item := range r.regions { + regions = append(regions, proto.Clone(item.region.meta).(*metapb.Region)) } return regions } diff --git a/server/core/region_test.go b/server/core/region_test.go index 7219bb36702..29560605d3f 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -186,14 +186,14 @@ type testRegionMapSuite struct{} func (s *testRegionMapSuite) TestRegionMap(c *C) { rm := newRegionMap() s.check(c, rm) - rm.Put(s.regionInfo(1)) + rm.AddNew(s.regionInfo(1)) s.check(c, rm, 1) - rm.Put(s.regionInfo(2)) - rm.Put(s.regionInfo(3)) + rm.AddNew(s.regionInfo(2)) + rm.AddNew(s.regionInfo(3)) s.check(c, rm, 1, 2, 3) - rm.Put(s.regionInfo(3)) + rm.AddNew(s.regionInfo(3)) rm.Delete(4) s.check(c, rm, 1, 2, 3) @@ -201,7 +201,7 @@ func (s *testRegionMapSuite) TestRegionMap(c *C) { rm.Delete(1) s.check(c, rm, 2) - rm.Put(s.regionInfo(3)) + rm.AddNew(s.regionInfo(3)) s.check(c, rm, 2, 3) } @@ -218,7 +218,7 @@ func (s *testRegionMapSuite) regionInfo(id uint64) *RegionInfo { func (s *testRegionMapSuite) check(c *C, rm regionMap, ids ...uint64) { // Check Get. for _, id := range ids { - c.Assert(rm.Get(id).GetID(), Equals, id) + c.Assert(rm.Get(id).region.GetID(), Equals, id) } // Check Len. c.Assert(rm.Len(), Equals, len(ids)) @@ -229,7 +229,7 @@ func (s *testRegionMapSuite) check(c *C, rm regionMap, ids ...uint64) { } set1 := make(map[uint64]struct{}) for _, r := range rm { - set1[r.GetID()] = struct{}{} + set1[r.region.GetID()] = struct{}{} } c.Assert(set1, DeepEquals, expect) } @@ -439,7 +439,7 @@ func BenchmarkRandomRegion(b *testing.B) { StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: []byte(fmt.Sprintf("%20d", i+1)), }, peer) - regions.AddRegion(region) + regions.SetRegion(region) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -492,6 +492,6 @@ func BenchmarkAddRegion(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - regions.AddRegion(items[i]) + regions.SetRegion(items[i]) } } diff --git a/server/core/region_tree.go b/server/core/region_tree.go index 2e582bf034d..4fcc9ee5334 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -97,10 +97,11 @@ func (t *regionTree) getOverlaps(region *RegionInfo) []*RegionInfo { // update updates the tree with the region. // It finds and deletes all the overlapped regions first, and then // insert the region. 
-func (t *regionTree) update(region *RegionInfo) []*RegionInfo { +func (t *regionTree) update(item *regionItem) []*RegionInfo { + region := item.region t.totalSize += region.approximateSize - overlaps := t.getOverlaps(region) + for _, old := range overlaps { log.Debug("overlapping region", zap.Uint64("region-id", old.GetID()), @@ -110,8 +111,7 @@ func (t *regionTree) update(region *RegionInfo) []*RegionInfo { t.totalSize -= old.approximateSize } - t.tree.ReplaceOrInsert(®ionItem{region: region}) - + t.tree.ReplaceOrInsert(item) return overlaps } diff --git a/server/core/region_tree_test.go b/server/core/region_tree_test.go index 2a3a8ed20ae..201605b1ea4 100644 --- a/server/core/region_tree_test.go +++ b/server/core/region_tree_test.go @@ -123,11 +123,11 @@ func (s *testRegionSuite) newRegionWithStat(start, end string, size, keys int64) func (s *testRegionSuite) TestRegionTreeStat(c *C) { tree := newRegionTree() c.Assert(tree.totalSize, Equals, int64(0)) - tree.update(s.newRegionWithStat("a", "b", 1, 2)) + updateNewItem(tree, s.newRegionWithStat("a", "b", 1, 2)) c.Assert(tree.totalSize, Equals, int64(1)) - tree.update(s.newRegionWithStat("b", "c", 3, 4)) + updateNewItem(tree, s.newRegionWithStat("b", "c", 3, 4)) c.Assert(tree.totalSize, Equals, int64(4)) - tree.update(s.newRegionWithStat("b", "e", 5, 6)) + updateNewItem(tree, s.newRegionWithStat("b", "e", 5, 6)) c.Assert(tree.totalSize, Equals, int64(6)) tree.remove(s.newRegionWithStat("a", "b", 1, 2)) c.Assert(tree.totalSize, Equals, int64(5)) @@ -137,10 +137,10 @@ func (s *testRegionSuite) TestRegionTreeStat(c *C) { func (s *testRegionSuite) TestRegionTreeMerge(c *C) { tree := newRegionTree() - tree.update(s.newRegionWithStat("a", "b", 1, 2)) - tree.update(s.newRegionWithStat("b", "c", 3, 4)) + updateNewItem(tree, s.newRegionWithStat("a", "b", 1, 2)) + updateNewItem(tree, s.newRegionWithStat("b", "c", 3, 4)) c.Assert(tree.totalSize, Equals, int64(4)) - tree.update(s.newRegionWithStat("a", "c", 5, 5)) + updateNewItem(tree, s.newRegionWithStat("a", "c", 5, 5)) c.Assert(tree.totalSize, Equals, int64(5)) } @@ -154,8 +154,8 @@ func (s *testRegionSuite) TestRegionTree(c *C) { regionC := NewTestRegionInfo([]byte("c"), []byte("d")) regionD := NewTestRegionInfo([]byte("d"), []byte{}) - tree.update(regionA) - tree.update(regionC) + updateNewItem(tree, regionA) + updateNewItem(tree, regionC) c.Assert(tree.search([]byte{}), IsNil) c.Assert(tree.search([]byte("a")), Equals, regionA) c.Assert(tree.search([]byte("b")), IsNil) @@ -167,13 +167,13 @@ func (s *testRegionSuite) TestRegionTree(c *C) { c.Assert(tree.searchPrev([]byte("b")), IsNil) c.Assert(tree.searchPrev([]byte("c")), IsNil) - tree.update(regionB) + updateNewItem(tree, regionB) // search previous region c.Assert(tree.searchPrev([]byte("c")), Equals, regionB) c.Assert(tree.searchPrev([]byte("b")), Equals, regionA) tree.remove(regionC) - tree.update(regionD) + updateNewItem(tree, regionD) c.Assert(tree.search([]byte{}), IsNil) c.Assert(tree.search([]byte("a")), Equals, regionA) c.Assert(tree.search([]byte("b")), Equals, regionB) @@ -196,7 +196,7 @@ func (s *testRegionSuite) TestRegionTree(c *C) { // region with the same range and different region id will not be delete. 
region0 := newRegionItem([]byte{}, []byte("a")).region - tree.update(region0) + updateNewItem(tree, region0) c.Assert(tree.search([]byte{}), Equals, region0) anotherRegion0 := newRegionItem([]byte{}, []byte("a")).region anotherRegion0.meta.Id = 123 @@ -205,7 +205,7 @@ func (s *testRegionSuite) TestRegionTree(c *C) { // overlaps with 0, A, B, C. region0D := newRegionItem([]byte(""), []byte("d")).region - tree.update(region0D) + updateNewItem(tree, region0D) c.Assert(tree.search([]byte{}), Equals, region0D) c.Assert(tree.search([]byte("a")), Equals, region0D) c.Assert(tree.search([]byte("b")), Equals, region0D) @@ -214,7 +214,7 @@ func (s *testRegionSuite) TestRegionTree(c *C) { // overlaps with D. regionE := newRegionItem([]byte("e"), []byte{}).region - tree.update(regionE) + updateNewItem(tree, regionE) c.Assert(tree.search([]byte{}), Equals, region0D) c.Assert(tree.search([]byte("a")), Equals, region0D) c.Assert(tree.search([]byte("b")), Equals, region0D) @@ -225,7 +225,7 @@ func (s *testRegionSuite) TestRegionTree(c *C) { func updateRegions(c *C, tree *regionTree, regions []*RegionInfo) { for _, region := range regions { - tree.update(region) + updateNewItem(tree, region) c.Assert(tree.search(region.GetStartKey()), Equals, region) if len(region.GetEndKey()) > 0 { end := region.GetEndKey()[0] @@ -271,16 +271,16 @@ func (s *testRegionSuite) TestRandomRegion(c *C) { c.Assert(r, IsNil) regionA := NewTestRegionInfo([]byte(""), []byte("g")) - tree.update(regionA) + updateNewItem(tree, regionA) ra := tree.RandomRegion([]KeyRange{NewKeyRange("", "")}) c.Assert(ra, DeepEquals, regionA) regionB := NewTestRegionInfo([]byte("g"), []byte("n")) regionC := NewTestRegionInfo([]byte("n"), []byte("t")) regionD := NewTestRegionInfo([]byte("t"), []byte("")) - tree.update(regionB) - tree.update(regionC) - tree.update(regionD) + updateNewItem(tree, regionB) + updateNewItem(tree, regionC) + updateNewItem(tree, regionD) rb := tree.RandomRegion([]KeyRange{NewKeyRange("g", "n")}) c.Assert(rb, DeepEquals, regionB) @@ -312,7 +312,7 @@ func (s *testRegionSuite) TestRandomRegionDiscontinuous(c *C) { // test for single region regionA := NewTestRegionInfo([]byte("c"), []byte("f")) - tree.update(regionA) + updateNewItem(tree, regionA) ra := tree.RandomRegion([]KeyRange{NewKeyRange("c", "e")}) c.Assert(ra, IsNil) ra = tree.RandomRegion([]KeyRange{NewKeyRange("c", "f")}) @@ -327,7 +327,7 @@ func (s *testRegionSuite) TestRandomRegionDiscontinuous(c *C) { c.Assert(ra, DeepEquals, regionA) regionB := NewTestRegionInfo([]byte("n"), []byte("x")) - tree.update(regionB) + updateNewItem(tree, regionB) rb := tree.RandomRegion([]KeyRange{NewKeyRange("g", "x")}) c.Assert(rb, DeepEquals, regionB) rb = tree.RandomRegion([]KeyRange{NewKeyRange("g", "y")}) @@ -338,17 +338,22 @@ func (s *testRegionSuite) TestRandomRegionDiscontinuous(c *C) { c.Assert(rb, IsNil) regionC := NewTestRegionInfo([]byte("z"), []byte("")) - tree.update(regionC) + updateNewItem(tree, regionC) rc := tree.RandomRegion([]KeyRange{NewKeyRange("y", "")}) c.Assert(rc, DeepEquals, regionC) regionD := NewTestRegionInfo([]byte(""), []byte("a")) - tree.update(regionD) + updateNewItem(tree, regionD) rd := tree.RandomRegion([]KeyRange{NewKeyRange("", "b")}) c.Assert(rd, DeepEquals, regionD) checkRandomRegion(c, tree, []*RegionInfo{regionA, regionB, regionC, regionD}, []KeyRange{NewKeyRange("", "")}) } +func updateNewItem(tree *regionTree, region *RegionInfo) { + item := ®ionItem{region: region} + tree.update(item) +} + func checkRandomRegion(c *C, tree *regionTree, regions 
[]*RegionInfo, ranges []KeyRange) { keys := make(map[string]struct{}) for i := 0; i < 10000 && len(keys) < len(regions); i++ { @@ -376,7 +381,7 @@ func BenchmarkRegionTreeUpdate(b *testing.B) { tree := newRegionTree() for i := 0; i < b.N; i++ { item := &RegionInfo{meta: &metapb.Region{StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: []byte(fmt.Sprintf("%20d", i+1))}} - tree.update(item) + updateNewItem(tree, item) } } @@ -401,6 +406,6 @@ func BenchmarkRegionTreeUpdateUnordered(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - tree.update(items[i]) + updateNewItem(tree, items[i]) } } diff --git a/server/schedule/checker/rule_checker.go b/server/schedule/checker/rule_checker.go index fafaa8b376c..3b1b95137d4 100644 --- a/server/schedule/checker/rule_checker.go +++ b/server/schedule/checker/rule_checker.go @@ -14,6 +14,9 @@ package checker import ( + "math" + "time" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -34,6 +37,7 @@ type RuleChecker struct { ruleManager *placement.RuleManager name string regionWaitingList cache.Cache + record *recorder } // NewRuleChecker creates a checker instance. @@ -43,6 +47,7 @@ func NewRuleChecker(cluster opt.Cluster, ruleManager *placement.RuleManager, reg ruleManager: ruleManager, name: "rule-checker", regionWaitingList: regionWaitingList, + record: newRecord(), } } @@ -55,7 +60,7 @@ func (c *RuleChecker) GetType() string { // fix it. func (c *RuleChecker) Check(region *core.RegionInfo) *operator.Operator { checkerCounter.WithLabelValues("rule_checker", "check").Inc() - + c.record.refresh(c.cluster) fit := c.cluster.FitRegion(region) if len(fit.RuleFits) == 0 { checkerCounter.WithLabelValues("rule_checker", "fix-range").Inc() @@ -105,11 +110,11 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region for _, peer := range rf.Peers { if c.isDownPeer(region, peer) { checkerCounter.WithLabelValues("rule_checker", "replace-down").Inc() - return c.replaceRulePeer(region, rf, peer, downStatus) + return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus) } if c.isOfflinePeer(peer) { checkerCounter.WithLabelValues("rule_checker", "replace-offline").Inc() - return c.replaceRulePeer(region, rf, peer, offlineStatus) + return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus) } } // fix loose matched peers. @@ -143,7 +148,8 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit return op, nil } -func (c *RuleChecker) replaceRulePeer(region *core.RegionInfo, rf *placement.RuleFit, peer *metapb.Peer, status string) (*operator.Operator, error) { +// The peer's store may in Offline or Down, need to be replace. +func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { ruleStores := c.getRuleFitStores(rf) store := c.strategy(region, rf.Rule).SelectStoreToFix(ruleStores, peer.GetStoreId()) if store == 0 { @@ -152,10 +158,41 @@ func (c *RuleChecker) replaceRulePeer(region *core.RegionInfo, rf *placement.Rul return nil, errors.New("no store to replace peer") } newPeer := &metapb.Peer{StoreId: store, Role: rf.Rule.Role.MetaPeerRole()} - op, err := operator.CreateMovePeerOperator("replace-rule-"+status+"-peer", c.cluster, region, operator.OpReplica, peer.StoreId, newPeer) + // pick the smallest leader store to avoid the Offline store be snapshot generator bottleneck. 
+ var newLeader *metapb.Peer + if region.GetLeader().GetId() == peer.GetId() { + minCount := uint64(math.MaxUint64) + for _, p := range region.GetPeers() { + count := c.record.getOfflineLeaderCount(p.GetStoreId()) + checkPeerhealth := func() bool { + if p.GetId() == peer.GetId() { + return true + } + if region.GetDownPeer(p.GetId()) != nil || region.GetPendingPeer(p.GetId()) != nil { + return false + } + return c.allowLeader(fit, p) + } + if minCount > count && checkPeerhealth() { + minCount = count + newLeader = p + } + } + } + + createOp := func() (*operator.Operator, error) { + if newLeader != nil && newLeader.GetId() != peer.GetId() { + return operator.CreateReplaceLeaderPeerOperator("replace-rule-"+status+"-leader-peer", c.cluster, region, operator.OpReplica, peer.StoreId, newPeer, newLeader) + } + return operator.CreateMovePeerOperator("replace-rule-"+status+"-peer", c.cluster, region, operator.OpReplica, peer.StoreId, newPeer) + } + op, err := createOp() if err != nil { return nil, err } + if newLeader != nil { + c.record.incOfflineLeaderCount(newLeader.GetStoreId()) + } op.SetPriorityLevel(core.HighPriority) return op, nil } @@ -295,3 +332,44 @@ func (c *RuleChecker) getRuleFitStores(rf *placement.RuleFit) []*core.StoreInfo } return stores } + +type recorder struct { + offlineLeaderCounter map[uint64]uint64 + lastUpdateTime time.Time +} + +func newRecord() *recorder { + return &recorder{ + offlineLeaderCounter: make(map[uint64]uint64), + lastUpdateTime: time.Now(), + } +} + +func (o *recorder) getOfflineLeaderCount(storeID uint64) uint64 { + return o.offlineLeaderCounter[storeID] +} + +func (o *recorder) incOfflineLeaderCount(storeID uint64) { + o.offlineLeaderCounter[storeID] += 1 + o.lastUpdateTime = time.Now() +} + +// Offline is triggered manually and only appears when the node makes some adjustments. here is an operator timeout / 2. +var offlineCounterTTL = 5 * time.Minute + +func (o *recorder) refresh(cluster opt.Cluster) { + // re-count the offlineLeaderCounter if the store is already tombstone or store is gone. 
+ if len(o.offlineLeaderCounter) > 0 && time.Since(o.lastUpdateTime) > offlineCounterTTL { + needClean := false + for _, storeID := range o.offlineLeaderCounter { + store := cluster.GetStore(storeID) + if store == nil || store.IsTombstone() { + needClean = true + break + } + } + if needClean { + o.offlineLeaderCounter = make(map[uint64]uint64) + } + } +} diff --git a/server/schedule/checker/rule_checker_test.go b/server/schedule/checker/rule_checker_test.go index 9dfb04b695e..9893508fb76 100644 --- a/server/schedule/checker/rule_checker_test.go +++ b/server/schedule/checker/rule_checker_test.go @@ -149,6 +149,24 @@ func (s *testRuleCheckerSuite) TestFixPeer(c *C) { c.Assert(op.Desc(), Equals, "replace-rule-offline-peer") c.Assert(op.GetPriorityLevel(), Equals, core.HighPriority) c.Assert(op.Step(0), FitsTypeOf, add) + + s.cluster.SetStoreUp(2) + // leader store offline + s.cluster.SetStoreOffline(1) + r1 := s.cluster.GetRegion(1) + nr1 := r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetStorePeer(3)})) + s.cluster.PutRegion(nr1) + hasTransferLeader := false + for i := 0; i < 100; i++ { + op = s.rc.Check(s.cluster.GetRegion(1)) + c.Assert(op, NotNil) + if step, ok := op.Step(0).(operator.TransferLeader); ok { + c.Assert(step.FromStore, Equals, uint64(1)) + c.Assert(step.ToStore, Not(Equals), uint64(3)) + hasTransferLeader = true + } + } + c.Assert(hasTransferLeader, IsTrue) } func (s *testRuleCheckerSuite) TestFixOrphanPeers(c *C) { diff --git a/server/schedule/operator/create_operator.go b/server/schedule/operator/create_operator.go index dfc4b6d137f..35479392d5f 100644 --- a/server/schedule/operator/create_operator.go +++ b/server/schedule/operator/create_operator.go @@ -87,6 +87,15 @@ func CreateMovePeerOperator(desc string, cluster opt.Cluster, region *core.Regio Build(kind) } +// CreateReplaceLeaderPeerOperator creates an operator that replaces an old peer with a new peer, and move leader from old store firstly. +func CreateReplaceLeaderPeerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer, leader *metapb.Peer) (*Operator, error) { + return NewBuilder(desc, cluster, region). + RemovePeer(oldStore). + AddPeer(peer). + SetLeader(leader.GetStoreId()). + Build(kind) +} + // CreateMoveLeaderOperator creates an operator that replaces an old leader with a new leader. func CreateMoveLeaderOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) { return NewBuilder(desc, cluster, region). 
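To make the rule-checker change above easier to follow: when the leader's store goes offline, the new code picks a transfer-leader target among the healthy peers, preferring the store that has so far received the fewest leaders from offline stores. The snippet below is a simplified stand-in for that selection loop, not the real implementation; `pickLeaderTarget`, `candidates`, and `counts` (playing the role of recorder.offlineLeaderCounter) are illustrative names.

package main

import (
	"fmt"
	"math"
)

// pickLeaderTarget chooses, among the candidate stores, the one with the smallest
// offline-leader count so that no single store becomes the snapshot-generation
// bottleneck while an offline store is being drained.
func pickLeaderTarget(candidates []uint64, counts map[uint64]uint64) (target uint64, ok bool) {
	best := uint64(math.MaxUint64)
	for _, storeID := range candidates {
		if c := counts[storeID]; c < best {
			best, target, ok = c, storeID, true
		}
	}
	return
}

func main() {
	counts := map[uint64]uint64{1: 2, 2: 0, 4: 1}
	if target, ok := pickLeaderTarget([]uint64{2, 3, 4}, counts); ok {
		fmt.Println("transfer leader to store", target) // store 2: lowest count among candidates
	}
}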
diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index 061359b6eb5..9d51b3e4a20 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -32,7 +32,7 @@ type RangeCluster struct { func GenRangeCluster(cluster opt.Cluster, startKey, endKey []byte) *RangeCluster { subCluster := core.NewBasicCluster() for _, r := range cluster.ScanRegions(startKey, endKey, -1) { - subCluster.Regions.AddRegion(r) + subCluster.Regions.SetRegion(r) } return &RangeCluster{ Cluster: cluster, diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 9686a5adf33..b6e3b35ac92 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -629,6 +629,15 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat { return nret } + union := bs.sortHotPeers(ret, maxPeerNum) + ret = make([]*statistics.HotPeerStat, 0, len(union)) + for peer := range union { + ret = appendItem(ret, peer) + } + return ret +} + +func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum int) map[*statistics.HotPeerStat]struct{} { byteSort := make([]*statistics.HotPeerStat, len(ret)) copy(byteSort, ret) sort.Slice(byteSort, func(i, j int) bool { @@ -639,7 +648,7 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat { copy(keySort, ret) sort.Slice(keySort, func(i, j int) bool { k := getRegionStatKind(bs.rwTy, statistics.KeyDim) - return byteSort[i].GetLoad(k) > byteSort[j].GetLoad(k) + return keySort[i].GetLoad(k) > keySort[j].GetLoad(k) }) union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum) @@ -652,7 +661,7 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat { break } } - for len(keySort) > 0 { + for len(union) < maxPeerNum && len(keySort) > 0 { peer := keySort[0] keySort = keySort[1:] if _, ok := union[peer]; !ok { @@ -661,11 +670,7 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat { } } } - ret = make([]*statistics.HotPeerStat, 0, len(union)) - for peer := range union { - ret = appendItem(ret, peer) - } - return ret + return union } // isRegionAvailable checks whether the given region is not available to schedule. 
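The hot_region.go hunk above extracts the byte/key ranking out of filterHotPeers into sortHotPeers and fixes the key ranking, which previously compared byteSort elements. The sketch below mirrors that selection on plain data; hotPeer and topUnion are illustrative names, and the alternating pick order is an assumption about how the two rankings are merged, chosen so that the result matches the TestSortHotPeer expectations further down.

package main

import (
	"fmt"
	"sort"
)

// hotPeer is an illustrative stand-in for statistics.HotPeerStat, keeping only the two
// load dimensions the selection cares about.
type hotPeer struct {
	regionID    uint64
	bytes, keys float64
}

// topUnion ranks the peers by byte rate and by key rate independently, then takes
// entries from the top of both rankings in turn until maxPeerNum distinct peers are
// collected.
func topUnion(peers []hotPeer, maxPeerNum int) map[uint64]struct{} {
	byBytes := append([]hotPeer(nil), peers...)
	sort.Slice(byBytes, func(i, j int) bool { return byBytes[i].bytes > byBytes[j].bytes })
	byKeys := append([]hotPeer(nil), peers...)
	sort.Slice(byKeys, func(i, j int) bool { return byKeys[i].keys > byKeys[j].keys })

	union := make(map[uint64]struct{}, maxPeerNum)
	for len(union) < maxPeerNum && (len(byBytes) > 0 || len(byKeys) > 0) {
		if len(byBytes) > 0 {
			union[byBytes[0].regionID] = struct{}{}
			byBytes = byBytes[1:]
		}
		if len(union) < maxPeerNum && len(byKeys) > 0 {
			union[byKeys[0].regionID] = struct{}{}
			byKeys = byKeys[1:]
		}
	}
	return union
}

func main() {
	peers := []hotPeer{{1, 10, 1}, {2, 1, 10}, {3, 5, 6}}
	fmt.Println(topUnion(peers, 1)) // map[1:{}]      -- the top byte-rate peer
	fmt.Println(topUnion(peers, 2)) // map[1:{} 2:{}] -- top byte-rate plus top key-rate
}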
@@ -1083,32 +1088,22 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) { return []*operator.Operator{op}, []Influence{infl} } -func (h *hotScheduler) GetHotReadStatus() *statistics.StoreHotPeersInfos { +func (h *hotScheduler) GetHotStatus(typ string) *statistics.StoreHotPeersInfos { h.RLock() defer h.RUnlock() - asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[readLeader])) - asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[readPeer])) - for id, detail := range h.stLoadInfos[readLeader] { - asLeader[id] = detail.toHotPeersStat() - } - for id, detail := range h.stLoadInfos[readPeer] { - asPeer[id] = detail.toHotPeersStat() - } - return &statistics.StoreHotPeersInfos{ - AsLeader: asLeader, - AsPeer: asPeer, - } -} - -func (h *hotScheduler) GetHotWriteStatus() *statistics.StoreHotPeersInfos { - h.RLock() - defer h.RUnlock() - asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[writeLeader])) - asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[writePeer])) - for id, detail := range h.stLoadInfos[writeLeader] { + var leaderTyp, peerTyp resourceType + switch typ { + case HotReadRegionType: + leaderTyp, peerTyp = readLeader, readPeer + case HotWriteRegionType: + leaderTyp, peerTyp = writeLeader, writePeer + } + asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[leaderTyp])) + asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[peerTyp])) + for id, detail := range h.stLoadInfos[leaderTyp] { asLeader[id] = detail.toHotPeersStat() } - for id, detail := range h.stLoadInfos[writePeer] { + for id, detail := range h.stLoadInfos[peerTyp] { asPeer[id] = detail.toHotPeersStat() } return &statistics.StoreHotPeersInfos{ diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index 32b9ca32b3e..b40478676d8 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -1267,6 +1267,59 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) { } } +func (s *testHotCacheSuite) TestSortHotPeer(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + opt := config.NewTestOptions() + tc := mockcluster.NewCluster(ctx, opt) + tc.SetMaxReplicas(3) + tc.DisableFeature(versioninfo.JointConsensus) + sche, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigJSONDecoder([]byte("null"))) + c.Assert(err, IsNil) + hb := sche.(*hotScheduler) + leaderSolver := newBalanceSolver(hb, tc, read, transferLeader) + + hotPeers := []*statistics.HotPeerStat{{ + RegionID: 1, + Loads: []float64{ + statistics.RegionReadBytes: 10, + statistics.RegionReadKeys: 1, + }, + }, { + RegionID: 2, + Loads: []float64{ + statistics.RegionReadBytes: 1, + statistics.RegionReadKeys: 10, + }, + }, { + RegionID: 3, + Loads: []float64{ + statistics.RegionReadBytes: 5, + statistics.RegionReadKeys: 6, + }, + }} + + u := leaderSolver.sortHotPeers(hotPeers, 1) + checkSortResult(c, []uint64{1}, u) + + u = leaderSolver.sortHotPeers(hotPeers, 2) + checkSortResult(c, []uint64{1, 2}, u) +} + +func checkSortResult(c *C, regions []uint64, hotPeers map[*statistics.HotPeerStat]struct{}) { + c.Assert(len(regions), Equals, len(hotPeers)) + for _, region := range regions { + in := false + for hotPeer := range hotPeers { + if hotPeer.RegionID == region { + in = true + break + } + } + c.Assert(in, IsTrue) + } +} + func (s *testInfluenceSerialSuite) TestInfluenceByRWType(c *C) { originValue 
:= schedulePeerPr defer func() { diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 16c58e828ed..7d8e88e389d 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -129,7 +129,7 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) { break } } - tc.Regions.AddRegion(region) + tc.Regions.SetRegion(region) op = sl.Schedule(tc) testutil.CheckTransferLeader(c, op[0], operator.OpLeader, 1, 2) }
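Closing note on the central change in server/core/region.go: regionMap now stores *regionItem and shares those leaves with regionTree, so a heartbeat that changes neither the key range nor the peer set only swaps item.region in place instead of re-inserting into the btree. The following is a minimal, self-contained illustration of that idea under simplified types; regionInfo, regionItem, and regionsIndex here are stand-ins, not the real core API, and the real code additionally maintains size statistics and the leader/follower/learner/pending sub-trees.

package main

import (
	"bytes"
	"fmt"
)

// regionInfo and regionItem are simplified stand-ins for core.RegionInfo and the
// regionItem leaf shared between the ID map and the range tree.
type regionInfo struct {
	id               uint64
	startKey, endKey []byte
	leaderStore      uint64
}

type regionItem struct{ region *regionInfo }

type regionsIndex struct {
	byID map[uint64]*regionItem
	// a real index also keeps a btree keyed by range; omitted in this sketch
}

func (idx *regionsIndex) set(region *regionInfo) {
	if item, ok := idx.byID[region.id]; ok &&
		bytes.Equal(item.region.startKey, region.startKey) &&
		bytes.Equal(item.region.endKey, region.endKey) {
		item.region = region // cheap path: the tree observes the update through the shared item
		return
	}
	// new region or range change: allocate a fresh item and (re)insert it everywhere
	idx.byID[region.id] = &regionItem{region: region}
}

func main() {
	idx := &regionsIndex{byID: make(map[uint64]*regionItem)}
	idx.set(&regionInfo{id: 1, startKey: []byte("a"), endKey: []byte("b"), leaderStore: 1})
	shared := idx.byID[1]
	idx.set(&regionInfo{id: 1, startKey: []byte("a"), endKey: []byte("b"), leaderStore: 2})
	fmt.Println(shared == idx.byID[1], shared.region.leaderStore) // true 2: same item, new info
}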