From 632742fd49030a55b0c50b9497137c8d5dc2c718 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 30 Dec 2021 16:05:50 +0800 Subject: [PATCH] *: move some functions from `opt` to `schedule` (#4517) Signed-off-by: Ryan Leung --- plugin/scheduler_example/evict_leader.go | 2 +- server/api/region.go | 4 +- server/cluster/cluster_test.go | 12 +- server/cluster/coordinator.go | 5 +- server/handler.go | 5 +- .../{ => checker}/checker_controller.go | 62 +++--- server/schedule/checker/merge_checker.go | 7 +- server/schedule/checker/merge_checker_test.go | 182 +++++++++--------- server/schedule/{opt => }/healthy.go | 20 +- server/schedule/{opt => }/healthy_test.go | 7 +- server/schedule/operator_controller_test.go | 69 ------- server/schedule/region_scatterer.go | 2 +- server/schedule/region_splitter.go | 2 +- server/schedulers/balance_leader.go | 4 +- server/schedulers/balance_region.go | 22 ++- server/schedulers/evict_leader.go | 2 +- server/schedulers/grant_leader.go | 2 +- server/schedulers/hot_region.go | 4 +- server/schedulers/random_merge.go | 6 +- server/schedulers/shuffle_leader.go | 2 +- server/schedulers/shuffle_region.go | 6 +- 21 files changed, 179 insertions(+), 248 deletions(-) rename server/schedule/{ => checker}/checker_controller.go (70%) rename server/schedule/{opt => }/healthy.go (67%) rename server/schedule/{opt => }/healthy_test.go (97%) diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index bc6e5d9bd77..40ad78a7944 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -217,7 +217,7 @@ func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operato s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id, ranges := range s.conf.StoreIDWitRanges { - region := cluster.RandLeaderRegion(id, ranges, opt.IsRegionHealthy) + region := cluster.RandLeaderRegion(id, ranges, schedule.IsRegionHealthy) if region == nil { continue } diff --git a/server/api/region.go b/server/api/region.go index bcf57ac3ac5..3c50ea2dbef 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -32,8 +32,8 @@ import ( "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/operator" - "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/statistics" "github.com/unrolled/render" "go.uber.org/zap" @@ -264,7 +264,7 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R regions := rc.ScanRegions(startKey, endKey, -1) replicated := true for _, region := range regions { - if !opt.IsRegionReplicated(rc, region) { + if !schedule.IsRegionReplicated(rc, region) { replicated = false break } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index e10f7878c5a..d2ffea4931e 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -33,8 +33,8 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/id" "github.com/tikv/pd/server/kv" + "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/labeler" - "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/schedule/placement" "github.com/tikv/pd/server/statistics" "github.com/tikv/pd/server/versioninfo" @@ -1036,10 +1036,10 @@ func (s *testRegionsInfoSuite) Test(c *C) { } for i := 
uint64(0); i < n; i++ { - region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy) + region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy) c.Assert(region.GetLeader().GetStoreId(), Equals, i) - region = tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy) + region = tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy) c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i) c.Assert(region.GetStorePeer(i), NotNil) @@ -1055,14 +1055,14 @@ func (s *testRegionsInfoSuite) Test(c *C) { // All regions will be filtered out if they have pending peers. for i := uint64(0); i < n; i++ { for j := 0; j < cache.GetStoreLeaderCount(i); j++ { - region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy) + region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy) newRegion := region.Clone(core.WithPendingPeers(region.GetPeers())) cache.SetRegion(newRegion) } - c.Assert(tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy), IsNil) + c.Assert(tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy), IsNil) } for i := uint64(0); i < n; i++ { - c.Assert(tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy), IsNil) + c.Assert(tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy), IsNil) } } diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index 23275336d16..1ee15584a90 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/kv" "github.com/tikv/pd/server/schedule" + "github.com/tikv/pd/server/schedule/checker" "github.com/tikv/pd/server/schedule/hbstream" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedulers" @@ -62,7 +63,7 @@ type coordinator struct { ctx context.Context cancel context.CancelFunc cluster *RaftCluster - checkers *schedule.CheckerController + checkers *checker.Controller regionScatterer *schedule.RegionScatterer regionSplitter *schedule.RegionSplitter schedulers map[string]*scheduleController @@ -79,7 +80,7 @@ func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstre ctx: ctx, cancel: cancel, cluster: cluster, - checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, cluster.regionLabeler, opController), + checkers: checker.NewController(ctx, cluster, cluster.ruleManager, cluster.regionLabeler, opController), regionScatterer: schedule.NewRegionScatterer(ctx, cluster), regionSplitter: schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)), schedulers: make(map[string]*scheduleController), diff --git a/server/handler.go b/server/handler.go index 46fc5c98455..4b50e4b4b5c 100644 --- a/server/handler.go +++ b/server/handler.go @@ -37,7 +37,6 @@ import ( "github.com/tikv/pd/server/core/storelimit" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/operator" - "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/schedule/placement" "github.com/tikv/pd/server/schedulers" "github.com/tikv/pd/server/statistics" @@ -717,11 +716,11 @@ 
func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error return ErrRegionNotFound(targetID) } - if !opt.IsRegionHealthy(region) || !opt.IsRegionReplicated(c, region) { + if !schedule.IsRegionHealthy(region) || !schedule.IsRegionReplicated(c, region) { return ErrRegionAbnormalPeer(regionID) } - if !opt.IsRegionHealthy(target) || !opt.IsRegionReplicated(c, target) { + if !schedule.IsRegionHealthy(target) || !schedule.IsRegionReplicated(c, target) { return ErrRegionAbnormalPeer(targetID) } diff --git a/server/schedule/checker_controller.go b/server/schedule/checker/checker_controller.go similarity index 70% rename from server/schedule/checker_controller.go rename to server/schedule/checker/checker_controller.go index cde06dbde29..1cffdb60b7e 100644 --- a/server/schedule/checker_controller.go +++ b/server/schedule/checker/checker_controller.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package checker import ( "context" @@ -21,7 +21,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" - "github.com/tikv/pd/server/schedule/checker" + "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/labeler" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/opt" @@ -31,42 +31,42 @@ import ( // DefaultCacheSize is the default length of waiting list. const DefaultCacheSize = 1000 -// CheckerController is used to manage all checkers. -type CheckerController struct { +// Controller is used to manage all checkers. +type Controller struct { cluster opt.Cluster opts *config.PersistOptions - opController *OperatorController - learnerChecker *checker.LearnerChecker - replicaChecker *checker.ReplicaChecker - ruleChecker *checker.RuleChecker - splitChecker *checker.SplitChecker - mergeChecker *checker.MergeChecker - jointStateChecker *checker.JointStateChecker - priorityInspector *checker.PriorityInspector + opController *schedule.OperatorController + learnerChecker *LearnerChecker + replicaChecker *ReplicaChecker + ruleChecker *RuleChecker + splitChecker *SplitChecker + mergeChecker *MergeChecker + jointStateChecker *JointStateChecker + priorityInspector *PriorityInspector regionWaitingList cache.Cache } -// NewCheckerController create a new CheckerController. +// NewController create a new Controller. // TODO: isSupportMerge should be removed. 
-func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *OperatorController) *CheckerController { +func NewController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller { regionWaitingList := cache.NewDefaultCache(DefaultCacheSize) - return &CheckerController{ + return &Controller{ cluster: cluster, opts: cluster.GetOpts(), opController: opController, - learnerChecker: checker.NewLearnerChecker(cluster), - replicaChecker: checker.NewReplicaChecker(cluster, regionWaitingList), - ruleChecker: checker.NewRuleChecker(cluster, ruleManager, regionWaitingList), - splitChecker: checker.NewSplitChecker(cluster, ruleManager, labeler), - mergeChecker: checker.NewMergeChecker(ctx, cluster), - jointStateChecker: checker.NewJointStateChecker(cluster), - priorityInspector: checker.NewPriorityInspector(cluster), + learnerChecker: NewLearnerChecker(cluster), + replicaChecker: NewReplicaChecker(cluster, regionWaitingList), + ruleChecker: NewRuleChecker(cluster, ruleManager, regionWaitingList), + splitChecker: NewSplitChecker(cluster, ruleManager, labeler), + mergeChecker: NewMergeChecker(ctx, cluster), + jointStateChecker: NewJointStateChecker(cluster), + priorityInspector: NewPriorityInspector(cluster), regionWaitingList: regionWaitingList, } } // CheckRegion will check the region and add a new operator if needed. -func (c *CheckerController) CheckRegion(region *core.RegionInfo) []*operator.Operator { +func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { // If PD has restarted, it need to check learners added before and promote them. // Don't check isRaftLearnerEnabled cause it maybe disable learner feature but there are still some learners to promote. opController := c.opController @@ -114,42 +114,42 @@ func (c *CheckerController) CheckRegion(region *core.RegionInfo) []*operator.Ope } // GetMergeChecker returns the merge checker. -func (c *CheckerController) GetMergeChecker() *checker.MergeChecker { +func (c *Controller) GetMergeChecker() *MergeChecker { return c.mergeChecker } // GetRuleChecker returns the rule checker. -func (c *CheckerController) GetRuleChecker() *checker.RuleChecker { +func (c *Controller) GetRuleChecker() *RuleChecker { return c.ruleChecker } // GetWaitingRegions returns the regions in the waiting list. -func (c *CheckerController) GetWaitingRegions() []*cache.Item { +func (c *Controller) GetWaitingRegions() []*cache.Item { return c.regionWaitingList.Elems() } // AddWaitingRegion returns the regions in the waiting list. -func (c *CheckerController) AddWaitingRegion(region *core.RegionInfo) { +func (c *Controller) AddWaitingRegion(region *core.RegionInfo) { c.regionWaitingList.Put(region.GetID(), nil) } // RemoveWaitingRegion removes the region from the waiting list. 
-func (c *CheckerController) RemoveWaitingRegion(id uint64) { +func (c *Controller) RemoveWaitingRegion(id uint64) { c.regionWaitingList.Remove(id) } // GetPriorityRegions returns the region in priority queue -func (c *CheckerController) GetPriorityRegions() []uint64 { +func (c *Controller) GetPriorityRegions() []uint64 { return c.priorityInspector.GetPriorityRegions() } // RemovePriorityRegions removes priority region from priority queue -func (c *CheckerController) RemovePriorityRegions(id uint64) { +func (c *Controller) RemovePriorityRegions(id uint64) { c.priorityInspector.RemovePriorityRegion(id) } // GetPauseController returns pause controller of the checker -func (c *CheckerController) GetPauseController(name string) (*checker.PauseController, error) { +func (c *Controller) GetPauseController(name string) (*PauseController, error) { switch name { case "learner": return &c.learnerChecker.PauseController, nil diff --git a/server/schedule/checker/merge_checker.go b/server/schedule/checker/merge_checker.go index 15175b2c538..fdd287eb231 100644 --- a/server/schedule/checker/merge_checker.go +++ b/server/schedule/checker/merge_checker.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/labeler" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/opt" @@ -112,12 +113,12 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { } // skip region has down peers or pending peers - if !opt.IsRegionHealthy(region) { + if !schedule.IsRegionHealthy(region) { checkerCounter.WithLabelValues("merge_checker", "special-peer").Inc() return nil } - if !opt.IsRegionReplicated(m.cluster, region) { + if !schedule.IsRegionReplicated(m.cluster, region) { checkerCounter.WithLabelValues("merge_checker", "abnormal-replica").Inc() return nil } @@ -169,7 +170,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { return adjacent != nil && !m.splitCache.Exists(adjacent.GetID()) && !m.cluster.IsRegionHot(adjacent) && AllowMerge(m.cluster, region, adjacent) && checkPeerStore(m.cluster, region, adjacent) && - opt.IsRegionHealthy(adjacent) && opt.IsRegionReplicated(m.cluster, adjacent) + schedule.IsRegionHealthy(adjacent) && schedule.IsRegionReplicated(m.cluster, adjacent) } // AllowMerge returns true if two regions can be merged according to the key type. diff --git a/server/schedule/checker/merge_checker_test.go b/server/schedule/checker/merge_checker_test.go index 30dac3a6703..d461dd36815 100644 --- a/server/schedule/checker/merge_checker_test.go +++ b/server/schedule/checker/merge_checker_test.go @@ -26,6 +26,9 @@ import ( "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/core/storelimit" + "github.com/tikv/pd/server/schedule" + "github.com/tikv/pd/server/schedule/hbstream" "github.com/tikv/pd/server/schedule/labeler" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/placement" @@ -77,63 +80,10 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) { s.cluster.PutStoreWithLabels(storeID, labels...) 
} s.regions = []*core.RegionInfo{ - core.NewRegionInfo( - &metapb.Region{ - Id: 1, - StartKey: []byte(""), - EndKey: []byte("a"), - Peers: []*metapb.Peer{ - {Id: 101, StoreId: 1}, - {Id: 102, StoreId: 2}, - }, - }, - &metapb.Peer{Id: 101, StoreId: 1}, - core.SetApproximateSize(1), - core.SetApproximateKeys(1), - ), - core.NewRegionInfo( - &metapb.Region{ - Id: 2, - StartKey: []byte("a"), - EndKey: []byte("t"), - Peers: []*metapb.Peer{ - {Id: 103, StoreId: 1}, - {Id: 104, StoreId: 4}, - {Id: 105, StoreId: 5}, - }, - }, - &metapb.Peer{Id: 104, StoreId: 4}, - core.SetApproximateSize(200), - core.SetApproximateKeys(200), - ), - core.NewRegionInfo( - &metapb.Region{ - Id: 3, - StartKey: []byte("t"), - EndKey: []byte("x"), - Peers: []*metapb.Peer{ - {Id: 106, StoreId: 2}, - {Id: 107, StoreId: 5}, - {Id: 108, StoreId: 6}, - }, - }, - &metapb.Peer{Id: 108, StoreId: 6}, - core.SetApproximateSize(1), - core.SetApproximateKeys(1), - ), - core.NewRegionInfo( - &metapb.Region{ - Id: 4, - StartKey: []byte("x"), - EndKey: []byte(""), - Peers: []*metapb.Peer{ - {Id: 109, StoreId: 4}, - }, - }, - &metapb.Peer{Id: 109, StoreId: 4}, - core.SetApproximateSize(1), - core.SetApproximateKeys(1), - ), + newRegionInfo(1, "", "a", 1, 1, []uint64{101, 1}, []uint64{101, 1}, []uint64{102, 2}), + newRegionInfo(2, "a", "t", 200, 200, []uint64{104, 4}, []uint64{103, 1}, []uint64{104, 4}, []uint64{105, 5}), + newRegionInfo(3, "t", "x", 1, 1, []uint64{108, 6}, []uint64{106, 2}, []uint64{107, 5}, []uint64{108, 6}), + newRegionInfo(4, "x", "", 1, 1, []uint64{109, 4}, []uint64{109, 4}), } for _, region := range s.regions { @@ -496,9 +446,73 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { }) } -var _ = Suite(&testSplitMergeSuite{}) +func (s *testMergeCheckerSuite) TestStoreLimitWithMerge(c *C) { + cfg := config.NewTestOptions() + tc := mockcluster.NewCluster(s.ctx, cfg) + tc.SetMaxMergeRegionSize(2) + tc.SetMaxMergeRegionKeys(2) + tc.SetSplitMergeInterval(0) + regions := []*core.RegionInfo{ + newRegionInfo(1, "", "a", 1, 1, []uint64{101, 1}, []uint64{101, 1}, []uint64{102, 2}), + newRegionInfo(2, "a", "t", 200, 200, []uint64{104, 4}, []uint64{103, 1}, []uint64{104, 4}, []uint64{105, 5}), + newRegionInfo(3, "t", "x", 1, 1, []uint64{108, 6}, []uint64{106, 2}, []uint64{107, 5}, []uint64{108, 6}), + newRegionInfo(4, "x", "", 10, 10, []uint64{109, 4}, []uint64{109, 4}), + } + + for i := uint64(1); i <= 6; i++ { + tc.AddLeaderStore(i, 10) + } + + for _, region := range regions { + tc.PutRegion(region) + } -type testSplitMergeSuite struct{} + mc := NewMergeChecker(s.ctx, tc) + stream := hbstream.NewTestHeartbeatStreams(s.ctx, tc.ID, tc, false /* no need to run */) + oc := schedule.NewOperatorController(s.ctx, tc, stream) + + regions[2] = regions[2].Clone( + core.SetPeers([]*metapb.Peer{ + {Id: 109, StoreId: 2}, + {Id: 110, StoreId: 3}, + {Id: 111, StoreId: 6}, + }), + core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}), + ) + + // set to a small rate to reduce unstable possibility. + tc.SetAllStoresLimit(storelimit.AddPeer, 0.0000001) + tc.SetAllStoresLimit(storelimit.RemovePeer, 0.0000001) + tc.PutRegion(regions[2]) + // The size of Region is less or equal than 1MB. 
+ for i := 0; i < 50; i++ { + ops := mc.Check(regions[2]) + c.Assert(ops, NotNil) + c.Assert(oc.AddOperator(ops...), IsTrue) + for _, op := range ops { + oc.RemoveOperator(op) + } + } + regions[2] = regions[2].Clone( + core.SetApproximateSize(2), + core.SetApproximateKeys(2), + ) + tc.PutRegion(regions[2]) + // The size of Region is more than 1MB but no more than 20MB. + for i := 0; i < 5; i++ { + ops := mc.Check(regions[2]) + c.Assert(ops, NotNil) + c.Assert(oc.AddOperator(ops...), IsTrue) + for _, op := range ops { + oc.RemoveOperator(op) + } + } + { + ops := mc.Check(regions[2]) + c.Assert(ops, NotNil) + c.Assert(oc.AddOperator(ops...), IsFalse) + } +} func (s *testMergeCheckerSuite) TestCache(c *C) { cfg := config.NewTestOptions() @@ -514,36 +528,8 @@ func (s *testMergeCheckerSuite) TestCache(c *C) { s.cluster.PutStoreWithLabels(storeID, labels...) } s.regions = []*core.RegionInfo{ - core.NewRegionInfo( - &metapb.Region{ - Id: 2, - StartKey: []byte("a"), - EndKey: []byte("t"), - Peers: []*metapb.Peer{ - {Id: 103, StoreId: 1}, - {Id: 104, StoreId: 4}, - {Id: 105, StoreId: 5}, - }, - }, - &metapb.Peer{Id: 104, StoreId: 4}, - core.SetApproximateSize(200), - core.SetApproximateKeys(200), - ), - core.NewRegionInfo( - &metapb.Region{ - Id: 3, - StartKey: []byte("t"), - EndKey: []byte("x"), - Peers: []*metapb.Peer{ - {Id: 106, StoreId: 2}, - {Id: 107, StoreId: 5}, - {Id: 108, StoreId: 6}, - }, - }, - &metapb.Peer{Id: 108, StoreId: 6}, - core.SetApproximateSize(1), - core.SetApproximateKeys(1), - ), + newRegionInfo(2, "a", "t", 200, 200, []uint64{104, 4}, []uint64{103, 1}, []uint64{104, 4}, []uint64{105, 5}), + newRegionInfo(3, "t", "x", 1, 1, []uint64{108, 6}, []uint64{106, 2}, []uint64{107, 5}, []uint64{108, 6}), } for _, region := range s.regions { @@ -567,3 +553,21 @@ func makeKeyRanges(keys ...string) []interface{} { } return res } + +func newRegionInfo(id uint64, startKey, endKey string, size, keys int64, leader []uint64, peers ...[]uint64) *core.RegionInfo { + prs := make([]*metapb.Peer, 0, len(peers)) + for _, peer := range peers { + prs = append(prs, &metapb.Peer{Id: peer[0], StoreId: peer[1]}) + } + return core.NewRegionInfo( + &metapb.Region{ + Id: id, + StartKey: []byte(startKey), + EndKey: []byte(endKey), + Peers: prs, + }, + &metapb.Peer{Id: leader[0], StoreId: leader[1]}, + core.SetApproximateSize(size), + core.SetApproximateKeys(keys), + ) +} diff --git a/server/schedule/opt/healthy.go b/server/schedule/healthy.go similarity index 67% rename from server/schedule/opt/healthy.go rename to server/schedule/healthy.go index dead904c646..a8333f9f0e6 100644 --- a/server/schedule/opt/healthy.go +++ b/server/schedule/healthy.go @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package opt +package schedule import ( "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/schedule/opt" ) -// BalanceEmptyRegionThreshold is a threshold which allow balance the empty region if the region number is less than this threshold. -var balanceEmptyRegionThreshold = 50 - // IsRegionHealthy checks if a region is healthy for scheduling. It requires the // region does not have any down or pending peers. func IsRegionHealthy(region *core.RegionInfo) bool { @@ -33,20 +31,10 @@ func IsRegionHealthyAllowPending(region *core.RegionInfo) bool { return len(region.GetDownPeers()) == 0 } -// IsEmptyRegionAllowBalance checks if a region is an empty region and can be balanced. 
-func IsEmptyRegionAllowBalance(cluster Cluster, region *core.RegionInfo) bool { - return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < balanceEmptyRegionThreshold -} - -// AllowBalanceEmptyRegion returns a function that checks if a region is an empty region and can be balanced. -func AllowBalanceEmptyRegion(cluster Cluster) func(*core.RegionInfo) bool { - return func(region *core.RegionInfo) bool { return IsEmptyRegionAllowBalance(cluster, region) } -} - // IsRegionReplicated checks if a region is fully replicated. When placement // rules is enabled, its peers should fit corresponding rules. When placement // rules is disabled, it should have enough replicas and no any learner peer. -func IsRegionReplicated(cluster Cluster, region *core.RegionInfo) bool { +func IsRegionReplicated(cluster opt.Cluster, region *core.RegionInfo) bool { if cluster.GetOpts().IsPlacementRulesEnabled() { return cluster.GetRuleManager().FitRegion(cluster, region).IsSatisfied() } @@ -54,6 +42,6 @@ func IsRegionReplicated(cluster Cluster, region *core.RegionInfo) bool { } // ReplicatedRegion returns a function that checks if a region is fully replicated. -func ReplicatedRegion(cluster Cluster) func(*core.RegionInfo) bool { +func ReplicatedRegion(cluster opt.Cluster) func(*core.RegionInfo) bool { return func(region *core.RegionInfo) bool { return IsRegionReplicated(cluster, region) } } diff --git a/server/schedule/opt/healthy_test.go b/server/schedule/healthy_test.go similarity index 97% rename from server/schedule/opt/healthy_test.go rename to server/schedule/healthy_test.go index 8f2e7feba04..8adb69d9ede 100644 --- a/server/schedule/opt/healthy_test.go +++ b/server/schedule/healthy_test.go @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package opt +package schedule import ( "context" - "testing" . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -26,10 +25,6 @@ import ( "github.com/tikv/pd/server/core" ) -func TestOpt(t *testing.T) { - TestingT(t) -} - var _ = Suite(&testRegionHealthySuite{}) type testRegionHealthySuite struct { diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 1ac798494f6..03ca334d3b1 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -30,7 +30,6 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/core/storelimit" - "github.com/tikv/pd/server/schedule/checker" "github.com/tikv/pd/server/schedule/hbstream" "github.com/tikv/pd/server/schedule/operator" ) @@ -592,74 +591,6 @@ func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) { } } -func (t *testOperatorControllerSuite) TestStoreLimitWithMerge(c *C) { - cfg := config.NewTestOptions() - tc := mockcluster.NewCluster(t.ctx, cfg) - tc.SetMaxMergeRegionSize(2) - tc.SetMaxMergeRegionKeys(2) - tc.SetSplitMergeInterval(0) - regions := []*core.RegionInfo{ - newRegionInfo(1, "", "a", 1, 1, []uint64{101, 1}, []uint64{101, 1}, []uint64{102, 2}), - newRegionInfo(2, "a", "t", 200, 200, []uint64{104, 4}, []uint64{103, 1}, []uint64{104, 4}, []uint64{105, 5}), - newRegionInfo(3, "t", "x", 1, 1, []uint64{108, 6}, []uint64{106, 2}, []uint64{107, 5}, []uint64{108, 6}), - newRegionInfo(4, "x", "", 10, 10, []uint64{109, 4}, []uint64{109, 4}), - } - - for i := uint64(1); i <= 6; i++ { - tc.AddLeaderStore(i, 10) - } - - for _, region := range regions { - tc.PutRegion(region) - } - - mc := checker.NewMergeChecker(t.ctx, tc) - stream := hbstream.NewTestHeartbeatStreams(t.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(t.ctx, tc, stream) - - regions[2] = regions[2].Clone( - core.SetPeers([]*metapb.Peer{ - {Id: 109, StoreId: 2}, - {Id: 110, StoreId: 3}, - {Id: 111, StoreId: 6}, - }), - core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}), - ) - - // set to a small rate to reduce unstable possibility. - tc.SetAllStoresLimit(storelimit.AddPeer, 0.0000001) - tc.SetAllStoresLimit(storelimit.RemovePeer, 0.0000001) - tc.PutRegion(regions[2]) - // The size of Region is less or equal than 1MB. - for i := 0; i < 50; i++ { - ops := mc.Check(regions[2]) - c.Assert(ops, NotNil) - c.Assert(oc.AddOperator(ops...), IsTrue) - for _, op := range ops { - oc.RemoveOperator(op) - } - } - regions[2] = regions[2].Clone( - core.SetApproximateSize(2), - core.SetApproximateKeys(2), - ) - tc.PutRegion(regions[2]) - // The size of Region is more than 1MB but no more than 20MB. 
- for i := 0; i < 5; i++ { - ops := mc.Check(regions[2]) - c.Assert(ops, NotNil) - c.Assert(oc.AddOperator(ops...), IsTrue) - for _, op := range ops { - oc.RemoveOperator(op) - } - } - { - ops := mc.Check(regions[2]) - c.Assert(ops, NotNil) - c.Assert(oc.AddOperator(ops...), IsFalse) - } -} - func newRegionInfo(id uint64, startKey, endKey string, size, keys int64, leader []uint64, peers ...[]uint64) *core.RegionInfo { prs := make([]*metapb.Peer, 0, len(peers)) for _, peer := range peers { diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index 408df09e697..c07facf2876 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -250,7 +250,7 @@ func (r *RegionScatterer) ScatterRegions(regions map[uint64]*core.RegionInfo, fa // Scatter relocates the region. If the group is defined, the regions' leader with the same group would be scattered // in a group level instead of cluster level. func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*operator.Operator, error) { - if !opt.IsRegionReplicated(r.cluster, region) { + if !IsRegionReplicated(r.cluster, region) { r.cluster.AddSuspectRegions(region.GetID()) scatterCounter.WithLabelValues("skip", "not-replicated").Inc() log.Warn("region not replicated during scatter", zap.Uint64("region-id", region.GetID())) diff --git a/server/schedule/region_splitter.go b/server/schedule/region_splitter.go index ce4d84e31dc..30ac286111f 100644 --- a/server/schedule/region_splitter.go +++ b/server/schedule/region_splitter.go @@ -166,7 +166,7 @@ func (r *RegionSplitter) checkRegionValid(region *core.RegionInfo) bool { if r.cluster.IsRegionHot(region) { return false } - if !opt.IsRegionReplicated(r.cluster, region) { + if !IsRegionReplicated(r.cluster, region) { r.cluster.AddSuspectRegions(region.GetID()) return false } diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index 705082a1762..288d594c89d 100644 --- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -203,7 +203,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the leader. func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operator.Operator { - plan.region = plan.cluster.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, opt.IsRegionHealthy) + plan.region = plan.cluster.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, schedule.IsRegionHealthy) if plan.region == nil { log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.SourceStoreID())) schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc() @@ -235,7 +235,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operato // It randomly selects a health region from the target store, then picks // the worst follower peer and transfers the leader. 
func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) []*operator.Operator { - plan.region = plan.cluster.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, opt.IsRegionHealthy) + plan.region = plan.cluster.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, schedule.IsRegionHealthy) if plan.region == nil { log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.TargetStoreID())) schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc() diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index 135dcf72e24..cc9e8cec879 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -62,6 +62,8 @@ const ( BalanceRegionName = "balance-region-scheduler" // BalanceRegionType is balance region scheduler type. BalanceRegionType = "balance-region" + // BalanceEmptyRegionThreshold is a threshold which allow balance the empty region if the region number is less than this threshold. + balanceEmptyRegionThreshold = 50 ) type balanceRegionSchedulerConfig struct { @@ -160,7 +162,7 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera // allow empty region to be scheduled in range cluster allowBalanceEmptyRegion = func(region *core.RegionInfo) bool { return true } default: - allowBalanceEmptyRegion = opt.AllowBalanceEmptyRegion(cluster) + allowBalanceEmptyRegion = isAllowBalanceEmptyRegion(cluster) } for _, plan.source = range stores { @@ -169,18 +171,18 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera schedulerCounter.WithLabelValues(s.GetName(), "total").Inc() // Priority pick the region that has a pending peer. // Pending region may means the disk is overload, remove the pending region firstly. - plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.IsRegionHealthyAllowPending, opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) + plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthyAllowPending, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion) if plan.region == nil { // Then pick the region that has a follower in the source store. - plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.IsRegionHealthy, opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) + plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion) } if plan.region == nil { // Then pick the region has the leader in the source store. - plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.IsRegionHealthy, opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) + plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion) } if plan.region == nil { // Finally pick learner. 
- plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.IsRegionHealthy, opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) + plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion) } if plan.region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc() @@ -260,3 +262,13 @@ func (s *balanceRegionScheduler) transferPeer(plan *balancePlan) *operator.Opera schedulerCounter.WithLabelValues(s.GetName(), "no-replacement").Inc() return nil } + +// isEmptyRegionAllowBalance checks if a region is an empty region and can be balanced. +func isEmptyRegionAllowBalance(cluster opt.Cluster, region *core.RegionInfo) bool { + return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < balanceEmptyRegionThreshold +} + +// isAllowBalanceEmptyRegion returns a function that checks if a region is an empty region and can be balanced. +func isAllowBalanceEmptyRegion(cluster opt.Cluster) func(*core.RegionInfo) bool { + return func(region *core.RegionInfo) bool { return isEmptyRegionAllowBalance(cluster, region) } +} diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index c09d615cf7c..1862c337de8 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -279,7 +279,7 @@ func scheduleEvictLeaderOnce(name, typ string, cluster opt.Cluster, storeRanges ops := make([]*operator.Operator, 0, len(storeRanges)) for id, ranges := range storeRanges { var filters []filter.Filter - region := cluster.RandLeaderRegion(id, ranges, opt.IsRegionHealthy) + region := cluster.RandLeaderRegion(id, ranges, schedule.IsRegionHealthy) if region == nil { // try to pick unhealthy region region = cluster.RandLeaderRegion(id, ranges) diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go index dabb299995a..997407d24be 100644 --- a/server/schedulers/grant_leader.go +++ b/server/schedulers/grant_leader.go @@ -232,7 +232,7 @@ func (s *grantLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operato defer s.conf.mu.RUnlock() ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWithRanges)) for id, ranges := range s.conf.StoreIDWithRanges { - region := cluster.RandFollowerRegion(id, ranges, opt.IsRegionHealthy) + region := cluster.RandFollowerRegion(id, ranges, schedule.IsRegionHealthy) if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc() continue diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index e9e4ecc4cbe..176cce602c5 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -653,12 +653,12 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool { } } - if !opt.IsRegionHealthyAllowPending(region) { + if !schedule.IsRegionHealthyAllowPending(region) { schedulerCounter.WithLabelValues(bs.sche.GetName(), "unhealthy-replica").Inc() return false } - if !opt.IsRegionReplicated(bs.cluster, region) { + if !schedule.IsRegionReplicated(bs.cluster, region) { log.Debug("region has abnormal replica count", zap.String("scheduler", bs.sche.GetName()), zap.Uint64("region-id", region.GetID())) schedulerCounter.WithLabelValues(bs.sche.GetName(), "abnormal-replica").Inc() return false diff --git a/server/schedulers/random_merge.go b/server/schedulers/random_merge.go index 9643387fdfa..fff6cdc9a82 100644 --- a/server/schedulers/random_merge.go +++ 
b/server/schedulers/random_merge.go @@ -109,7 +109,7 @@ func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operato schedulerCounter.WithLabelValues(s.GetName(), "no-source-store").Inc() return nil } - region := cluster.RandLeaderRegion(store.GetID(), s.conf.Ranges, opt.IsRegionHealthy) + region := cluster.RandLeaderRegion(store.GetID(), s.conf.Ranges, schedule.IsRegionHealthy) if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc() return nil @@ -139,10 +139,10 @@ func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operato } func (s *randomMergeScheduler) allowMerge(cluster opt.Cluster, region, target *core.RegionInfo) bool { - if !opt.IsRegionHealthy(region) || !opt.IsRegionHealthy(target) { + if !schedule.IsRegionHealthy(region) || !schedule.IsRegionHealthy(target) { return false } - if !opt.IsRegionReplicated(cluster, region) || !opt.IsRegionReplicated(cluster, target) { + if !schedule.IsRegionReplicated(cluster, region) || !schedule.IsRegionReplicated(cluster, target) { return false } if cluster.IsRegionHot(region) || cluster.IsRegionHot(target) { diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go index eb09af8bdc0..2c22be2da19 100644 --- a/server/schedulers/shuffle_leader.go +++ b/server/schedulers/shuffle_leader.go @@ -115,7 +115,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera schedulerCounter.WithLabelValues(s.GetName(), "no-target-store").Inc() return nil } - region := cluster.RandFollowerRegion(targetStore.GetID(), s.conf.Ranges, opt.IsRegionHealthy) + region := cluster.RandFollowerRegion(targetStore.GetID(), s.conf.Ranges, schedule.IsRegionHealthy) if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc() return nil diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go index 7cab03ed55a..ea46312de24 100644 --- a/server/schedulers/shuffle_region.go +++ b/server/schedulers/shuffle_region.go @@ -135,13 +135,13 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster opt.Cluster) (*core. for _, source := range candidates.Stores { var region *core.RegionInfo if s.conf.IsRoleAllow(roleFollower) { - region = cluster.RandFollowerRegion(source.GetID(), s.conf.GetRanges(), opt.IsRegionHealthy, opt.ReplicatedRegion(cluster)) + region = cluster.RandFollowerRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster)) } if region == nil && s.conf.IsRoleAllow(roleLeader) { - region = cluster.RandLeaderRegion(source.GetID(), s.conf.GetRanges(), opt.IsRegionHealthy, opt.ReplicatedRegion(cluster)) + region = cluster.RandLeaderRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster)) } if region == nil && s.conf.IsRoleAllow(roleLearner) { - region = cluster.RandLearnerRegion(source.GetID(), s.conf.GetRanges(), opt.IsRegionHealthy, opt.ReplicatedRegion(cluster)) + region = cluster.RandLearnerRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster)) } if region != nil { return region, region.GetStorePeer(source.GetID())
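
For readers skimming the patch, the net API change is that the region-health helpers formerly in `opt` (IsRegionHealthy, IsRegionHealthyAllowPending, IsRegionReplicated, ReplicatedRegion) now live in `schedule`, while the cluster interface stays in `opt`, and the helpers are still passed as predicates to the Rand*Region pickers. Below is a minimal caller sketch after this move; only the pd imports and call shapes are taken from the hunks above, and the package and helper name are hypothetical, not part of this patch.

package example // hypothetical package; not part of this patch

import (
	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/schedule"
	"github.com/tikv/pd/server/schedule/opt"
)

// pickHealthyFollower mirrors the scheduler pattern after #4517: the health and
// replication predicates come from `schedule` rather than `opt`.
func pickHealthyFollower(cluster opt.Cluster, storeID uint64) *core.RegionInfo {
	// Whole key space, as in the cluster tests above.
	ranges := []core.KeyRange{core.NewKeyRange("", "")}
	// RandFollowerRegion filters its random pick through the supplied predicates, so
	// only regions with no down/pending peers and full replication are returned.
	return cluster.RandFollowerRegion(storeID, ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster))
}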