diff --git a/artifacts/deploy/karmada-scheduler.yaml b/artifacts/deploy/karmada-scheduler.yaml index 1dbedec23987..a3792343f648 100644 --- a/artifacts/deploy/karmada-scheduler.yaml +++ b/artifacts/deploy/karmada-scheduler.yaml @@ -28,7 +28,7 @@ spec: - --kubeconfig=/etc/kubeconfig - --bind-address=0.0.0.0 - --secure-port=10351 - - --failover=true + - --features=SchedulerFailover=true - --enable-scheduler-estimator=true - --v=4 volumeMounts: diff --git a/charts/templates/karmada_scheduler.yaml b/charts/templates/karmada_scheduler.yaml index eca2ec6e1066..74e3e28e9a74 100644 --- a/charts/templates/karmada_scheduler.yaml +++ b/charts/templates/karmada_scheduler.yaml @@ -55,7 +55,7 @@ spec: - --kubeconfig=/etc/kubeconfig - --bind-address=0.0.0.0 - --secure-port=10351 - - --failover=true + - --features=SchedulerFailover=true volumeMounts: {{- include "karmada.kubeconfig.volumeMount" . | nindent 12 }} resources: diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index ba7d409ea047..7c89e243cebb 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -8,6 +8,7 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" componentbaseconfig "k8s.io/component-base/config" + "github.com/karmada-io/karmada/pkg/features" "github.com/karmada-io/karmada/pkg/util" ) @@ -33,9 +34,6 @@ type Options struct { // SecurePort is the port that the server serves at. SecurePort int - // Failover indicates if scheduler should reschedule on cluster failure. - Failover bool - // KubeAPIQPS is the QPS to use while talking with karmada-apiserver. KubeAPIQPS float32 // KubeAPIBurst is the burst to allow while talking with karmada-apiserver. @@ -76,10 +74,10 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.") fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.") fs.IntVar(&o.SecurePort, "secure-port", defaultPort, "The secure port on which to serve HTTPS.") - fs.BoolVar(&o.Failover, "failover", false, "Reschedule on cluster failure.") fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.") fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.") fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.") + features.DefaultFeatureGate.AddFlag(fs) } diff --git a/cmd/scheduler/app/scheduler.go b/cmd/scheduler/app/scheduler.go index f763f715dc0c..e4c1752a2b5a 100644 --- a/cmd/scheduler/app/scheduler.go +++ b/cmd/scheduler/app/scheduler.go @@ -66,7 +66,6 @@ func run(opts *options.Options, stopChan <-chan struct{}) error { cancel() }() - scheduler.Failover = opts.Failover sched := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet, opts) if !opts.LeaderElection.LeaderElect { sched.Run(ctx) diff --git a/pkg/features/features.go b/pkg/features/features.go new file mode 100644 index 000000000000..53745ab325e6 --- /dev/null +++ b/pkg/features/features.go @@ -0,0 +1,121 @@ +package features + +import ( + "fmt" + "sort" + "strconv" + "strings" + "sync" + + "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" +) + +var ( + allFeatures = sets.NewString(SchedulerFailover) + defaultFeatures = map[string]bool{ + SchedulerFailover: false, + } + // DefaultFeatureGate is a shared global FeatureGate. + DefaultFeatureGate = NewDefaultFeatureGate() +) + +const ( + // SchedulerFailover indicates if scheduler should reschedule on cluster failure. + SchedulerFailover string = "SchedulerFailover" +) + +// FeatureGate indicates whether a given feature is enabled or not. +type FeatureGate interface { + // AddFlag adds a flag for setting global feature gates to the specified FlagSet. + AddFlag(flagset *pflag.FlagSet) + // Enabled returns true if the key is enabled. + Enabled(key string) bool + // Set parses and stores flag gates for known features + // from a string like feature1=true,feature2=false,... + Set(value string) error + // SetFromMap stores flag gates for enabled features from a map[string]bool + SetFromMap(m map[string]bool) + // String returns a string representation of feature gate. + String() string +} + +var _ pflag.Value = &featureGate{} + +type featureGate struct { + lock sync.Mutex + enabledFeatures map[string]bool +} + +func (f *featureGate) AddFlag(flagset *pflag.FlagSet) { + flagset.Var(f, "features", fmt.Sprintf("A set of key={true,false} pairs to enable/disable features, available features: %s", strings.Join(allFeatures.List(), ","))) +} + +func (f *featureGate) Enabled(key string) bool { + if b, ok := f.enabledFeatures[key]; ok { + return b + } + return false +} + +// String returns a string containing all enabled feature gates, formatted as "key1=value1,key2=value2,...". +func (f *featureGate) String() string { + pairs := make([]string, 0, len(f.enabledFeatures)) + for k, v := range f.enabledFeatures { + pairs = append(pairs, fmt.Sprintf("%s=%t", k, v)) + } + sort.Strings(pairs) + return strings.Join(pairs, ",") +} + +func (f *featureGate) Set(value string) error { + m := make(map[string]bool) + for _, s := range strings.Split(value, ",") { + if len(s) == 0 { + continue + } + arr := strings.SplitN(s, "=", 2) + k := strings.TrimSpace(arr[0]) + if len(arr) != 2 { + return fmt.Errorf("missing bool value for %s", k) + } + v := strings.TrimSpace(arr[1]) + boolValue, err := strconv.ParseBool(v) + if err != nil { + return fmt.Errorf("invalid value of %s=%s, err: %v", k, v, err) + } + m[k] = boolValue + } + f.SetFromMap(m) + return nil +} + +func (f *featureGate) Type() string { + return "mapStringBool" +} + +func (f *featureGate) SetFromMap(m map[string]bool) { + f.lock.Lock() + defer f.lock.Unlock() + + for k, v := range m { + f.enabledFeatures[k] = v + } + + klog.V(1).Infof("feature gates: %v", f.enabledFeatures) +} + +// NewFeatureGate returns a new FeatureGate. +func NewFeatureGate() FeatureGate { + return &featureGate{ + enabledFeatures: make(map[string]bool), + } +} + +// NewDefaultFeatureGate returns the shared global FeatureGate. +func NewDefaultFeatureGate() FeatureGate { + f := NewFeatureGate() + f.SetFromMap(defaultFeatures) + return f +} diff --git a/pkg/features/features_test.go b/pkg/features/features_test.go new file mode 100644 index 000000000000..ed368c046472 --- /dev/null +++ b/pkg/features/features_test.go @@ -0,0 +1,94 @@ +package features + +import "testing" + +func TestSet(t *testing.T) { + tests := []struct { + name string + setStr string + wantStr string + }{ + { + name: "set multiple features", + setStr: "a=true,b=false", + wantStr: "a=true,b=false", + }, + { + name: "set multiple features", + setStr: "a=True,b=False", + wantStr: "a=true,b=false", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gates := NewFeatureGate() + _ = gates.Set(tt.setStr) + got := gates.String() + if got != tt.wantStr { + t.Errorf("want: %s, got %s", tt.wantStr, got) + } + }) + } +} + +func TestSetFromMap(t *testing.T) { + tests := []struct { + name string + setMap map[string]bool + wantStr string + }{ + { + name: "set multiple features", + setMap: map[string]bool{ + "a": true, + "b": false, + }, + wantStr: "a=true,b=false", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gates := NewFeatureGate() + gates.SetFromMap(tt.setMap) + got := gates.String() + if got != tt.wantStr { + t.Errorf("want: %s, got %s", tt.wantStr, got) + } + }) + } +} + +func TestEnabled(t *testing.T) { + tests := []struct { + name string + setMap map[string]bool + wantEnabled map[string]bool + }{ + { + name: "set multiple features", + setMap: map[string]bool{ + "a": true, + "b": false, + }, + wantEnabled: map[string]bool{ + "a": true, + "b": false, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gates := NewFeatureGate() + gates.SetFromMap(tt.setMap) + for k, want := range tt.wantEnabled { + got := gates.Enabled(k) + if got != want { + t.Errorf("[feature: %s] want %v, got %v", k, want, got) + } + } + }) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d6d1d3d6d83d..6e38c741f1cb 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -24,6 +24,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client" + "github.com/karmada-io/karmada/pkg/features" karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" @@ -61,10 +62,6 @@ const ( Unknown ScheduleType = "Unknown" ) -// Failover indicates if the scheduler should performs re-scheduler in case of cluster failure. -// TODO(RainbowMango): Remove the temporary solution by introducing feature flag -var Failover bool - // Scheduler is the scheduler schema, which is used to schedule a specific resource to specific clusters type Scheduler struct { DynamicClient dynamic.Interface @@ -403,7 +400,7 @@ func (s *Scheduler) scheduleNext() bool { klog.Infof("Reschedule binding(%s) as replicas scaled down or scaled up", key.(string)) metrics.BindingSchedule(string(ScaleSchedule), metrics.SinceInSeconds(start), err) case FailoverSchedule: - if Failover { + if features.DefaultFeatureGate.Enabled(features.SchedulerFailover) { err = s.rescheduleOne(key.(string)) klog.Infof("Reschedule binding(%s) as cluster failure", key.(string)) metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err) @@ -545,9 +542,9 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) { // Check if cluster becomes failure if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) { - klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, Failover) + klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, features.DefaultFeatureGate.Enabled(features.SchedulerFailover)) - if Failover { // Trigger reschedule on cluster failure only when flag is true. + if features.DefaultFeatureGate.Enabled(features.SchedulerFailover) { // Trigger reschedule on cluster failure only when flag is true. s.enqueueAffectedBinding(newCluster.Name) s.enqueueAffectedClusterBinding(newCluster.Name) return