Skip to content

Commit

Permalink
*: move some functions from opt to schedule (#4517)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored Dec 30, 2021
1 parent ae23d40 commit 632742f
Show file tree
Hide file tree
Showing 21 changed files with 179 additions and 248 deletions.
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operato
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id, ranges := range s.conf.StoreIDWitRanges {
region := cluster.RandLeaderRegion(id, ranges, opt.IsRegionHealthy)
region := cluster.RandLeaderRegion(id, ranges, schedule.IsRegionHealthy)
if region == nil {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/statistics"
"github.com/unrolled/render"
"go.uber.org/zap"
Expand Down Expand Up @@ -264,7 +264,7 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R
regions := rc.ScanRegions(startKey, endKey, -1)
replicated := true
for _, region := range regions {
if !opt.IsRegionReplicated(rc, region) {
if !schedule.IsRegionReplicated(rc, region) {
replicated = false
break
}
Expand Down
12 changes: 6 additions & 6 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/id"
"github.com/tikv/pd/server/kv"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/versioninfo"
Expand Down Expand Up @@ -1036,10 +1036,10 @@ func (s *testRegionsInfoSuite) Test(c *C) {
}

for i := uint64(0); i < n; i++ {
region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy)
region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy)
c.Assert(region.GetLeader().GetStoreId(), Equals, i)

region = tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy)
region = tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy)
c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i)

c.Assert(region.GetStorePeer(i), NotNil)
Expand All @@ -1055,14 +1055,14 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// All regions will be filtered out if they have pending peers.
for i := uint64(0); i < n; i++ {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy)
region := tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy)
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
}
c.Assert(tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy), IsNil)
c.Assert(tc.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy), IsNil)
}
for i := uint64(0); i < n; i++ {
c.Assert(tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, opt.IsRegionHealthy), IsNil)
c.Assert(tc.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, schedule.IsRegionHealthy), IsNil)
}
}

Expand Down
5 changes: 3 additions & 2 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/kv"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/checker"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedulers"
Expand Down Expand Up @@ -62,7 +63,7 @@ type coordinator struct {
ctx context.Context
cancel context.CancelFunc
cluster *RaftCluster
checkers *schedule.CheckerController
checkers *checker.Controller
regionScatterer *schedule.RegionScatterer
regionSplitter *schedule.RegionSplitter
schedulers map[string]*scheduleController
Expand All @@ -79,7 +80,7 @@ func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstre
ctx: ctx,
cancel: cancel,
cluster: cluster,
checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, cluster.regionLabeler, opController),
checkers: checker.NewController(ctx, cluster, cluster.ruleManager, cluster.regionLabeler, opController),
regionScatterer: schedule.NewRegionScatterer(ctx, cluster),
regionSplitter: schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)),
schedulers: make(map[string]*scheduleController),
Expand Down
5 changes: 2 additions & 3 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
Expand Down Expand Up @@ -717,11 +716,11 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error
return ErrRegionNotFound(targetID)
}

if !opt.IsRegionHealthy(region) || !opt.IsRegionReplicated(c, region) {
if !schedule.IsRegionHealthy(region) || !schedule.IsRegionReplicated(c, region) {
return ErrRegionAbnormalPeer(regionID)
}

if !opt.IsRegionHealthy(target) || !opt.IsRegionReplicated(c, target) {
if !schedule.IsRegionHealthy(target) || !schedule.IsRegionReplicated(c, target) {
return ErrRegionAbnormalPeer(targetID)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package checker

import (
"context"
Expand All @@ -21,7 +21,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/checker"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
Expand All @@ -31,42 +31,42 @@ import (
// DefaultCacheSize is the default length of waiting list.
const DefaultCacheSize = 1000

// CheckerController is used to manage all checkers.
type CheckerController struct {
// Controller is used to manage all checkers.
type Controller struct {
cluster opt.Cluster
opts *config.PersistOptions
opController *OperatorController
learnerChecker *checker.LearnerChecker
replicaChecker *checker.ReplicaChecker
ruleChecker *checker.RuleChecker
splitChecker *checker.SplitChecker
mergeChecker *checker.MergeChecker
jointStateChecker *checker.JointStateChecker
priorityInspector *checker.PriorityInspector
opController *schedule.OperatorController
learnerChecker *LearnerChecker
replicaChecker *ReplicaChecker
ruleChecker *RuleChecker
splitChecker *SplitChecker
mergeChecker *MergeChecker
jointStateChecker *JointStateChecker
priorityInspector *PriorityInspector
regionWaitingList cache.Cache
}

// NewCheckerController create a new CheckerController.
// NewController create a new Controller.
// TODO: isSupportMerge should be removed.
func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *OperatorController) *CheckerController {
func NewController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &CheckerController{
return &Controller{
cluster: cluster,
opts: cluster.GetOpts(),
opController: opController,
learnerChecker: checker.NewLearnerChecker(cluster),
replicaChecker: checker.NewReplicaChecker(cluster, regionWaitingList),
ruleChecker: checker.NewRuleChecker(cluster, ruleManager, regionWaitingList),
splitChecker: checker.NewSplitChecker(cluster, ruleManager, labeler),
mergeChecker: checker.NewMergeChecker(ctx, cluster),
jointStateChecker: checker.NewJointStateChecker(cluster),
priorityInspector: checker.NewPriorityInspector(cluster),
learnerChecker: NewLearnerChecker(cluster),
replicaChecker: NewReplicaChecker(cluster, regionWaitingList),
ruleChecker: NewRuleChecker(cluster, ruleManager, regionWaitingList),
splitChecker: NewSplitChecker(cluster, ruleManager, labeler),
mergeChecker: NewMergeChecker(ctx, cluster),
jointStateChecker: NewJointStateChecker(cluster),
priorityInspector: NewPriorityInspector(cluster),
regionWaitingList: regionWaitingList,
}
}

// CheckRegion will check the region and add a new operator if needed.
func (c *CheckerController) CheckRegion(region *core.RegionInfo) []*operator.Operator {
func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
// If PD has restarted, it need to check learners added before and promote them.
// Don't check isRaftLearnerEnabled cause it maybe disable learner feature but there are still some learners to promote.
opController := c.opController
Expand Down Expand Up @@ -114,42 +114,42 @@ func (c *CheckerController) CheckRegion(region *core.RegionInfo) []*operator.Ope
}

// GetMergeChecker returns the merge checker.
func (c *CheckerController) GetMergeChecker() *checker.MergeChecker {
func (c *Controller) GetMergeChecker() *MergeChecker {
return c.mergeChecker
}

// GetRuleChecker returns the rule checker.
func (c *CheckerController) GetRuleChecker() *checker.RuleChecker {
func (c *Controller) GetRuleChecker() *RuleChecker {
return c.ruleChecker
}

// GetWaitingRegions returns the regions in the waiting list.
func (c *CheckerController) GetWaitingRegions() []*cache.Item {
func (c *Controller) GetWaitingRegions() []*cache.Item {
return c.regionWaitingList.Elems()
}

// AddWaitingRegion returns the regions in the waiting list.
func (c *CheckerController) AddWaitingRegion(region *core.RegionInfo) {
func (c *Controller) AddWaitingRegion(region *core.RegionInfo) {
c.regionWaitingList.Put(region.GetID(), nil)
}

// RemoveWaitingRegion removes the region from the waiting list.
func (c *CheckerController) RemoveWaitingRegion(id uint64) {
func (c *Controller) RemoveWaitingRegion(id uint64) {
c.regionWaitingList.Remove(id)
}

// GetPriorityRegions returns the region in priority queue
func (c *CheckerController) GetPriorityRegions() []uint64 {
func (c *Controller) GetPriorityRegions() []uint64 {
return c.priorityInspector.GetPriorityRegions()
}

// RemovePriorityRegions removes priority region from priority queue
func (c *CheckerController) RemovePriorityRegions(id uint64) {
func (c *Controller) RemovePriorityRegions(id uint64) {
c.priorityInspector.RemovePriorityRegion(id)
}

// GetPauseController returns pause controller of the checker
func (c *CheckerController) GetPauseController(name string) (*checker.PauseController, error) {
func (c *Controller) GetPauseController(name string) (*PauseController, error) {
switch name {
case "learner":
return &c.learnerChecker.PauseController, nil
Expand Down
7 changes: 4 additions & 3 deletions server/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
Expand Down Expand Up @@ -112,12 +113,12 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
}

// skip region has down peers or pending peers
if !opt.IsRegionHealthy(region) {
if !schedule.IsRegionHealthy(region) {
checkerCounter.WithLabelValues("merge_checker", "special-peer").Inc()
return nil
}

if !opt.IsRegionReplicated(m.cluster, region) {
if !schedule.IsRegionReplicated(m.cluster, region) {
checkerCounter.WithLabelValues("merge_checker", "abnormal-replica").Inc()
return nil
}
Expand Down Expand Up @@ -169,7 +170,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
return adjacent != nil && !m.splitCache.Exists(adjacent.GetID()) && !m.cluster.IsRegionHot(adjacent) &&
AllowMerge(m.cluster, region, adjacent) && checkPeerStore(m.cluster, region, adjacent) &&
opt.IsRegionHealthy(adjacent) && opt.IsRegionReplicated(m.cluster, adjacent)
schedule.IsRegionHealthy(adjacent) && schedule.IsRegionReplicated(m.cluster, adjacent)
}

// AllowMerge returns true if two regions can be merged according to the key type.
Expand Down
Loading

0 comments on commit 632742f

Please sign in to comment.