Skip to content

Commit

Permalink
Introduce feature gates for the scheduler component
Browse files Browse the repository at this point in the history
Signed-off-by: iawia002 <[email protected]>
  • Loading branch information
iawia002 committed Oct 12, 2021
1 parent 5def820 commit b71d37a
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 14 deletions.
2 changes: 1 addition & 1 deletion artifacts/deploy/karmada-scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ spec:
- --kubeconfig=/etc/kubeconfig
- --bind-address=0.0.0.0
- --secure-port=10351
- --failover=true
- --features=SchedulerFailover=true
- --enable-scheduler-estimator=true
- --v=4
volumeMounts:
Expand Down
2 changes: 1 addition & 1 deletion charts/templates/karmada_scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ spec:
- --kubeconfig=/etc/kubeconfig
- --bind-address=0.0.0.0
- --secure-port=10351
- --failover=true
- --features=SchedulerFailover=true
volumeMounts:
{{- include "karmada.kubeconfig.volumeMount" . | nindent 12 }}
resources:
Expand Down
6 changes: 2 additions & 4 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"

"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/util"
)

Expand All @@ -33,9 +34,6 @@ type Options struct {
// SecurePort is the port that the server serves at.
SecurePort int

// Failover indicates if scheduler should reschedule on cluster failure.
Failover bool

// KubeAPIQPS is the QPS to use while talking with karmada-apiserver.
KubeAPIQPS float32
// KubeAPIBurst is the burst to allow while talking with karmada-apiserver.
Expand Down Expand Up @@ -76,10 +74,10 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.")
fs.IntVar(&o.SecurePort, "secure-port", defaultPort, "The secure port on which to serve HTTPS.")
fs.BoolVar(&o.Failover, "failover", false, "Reschedule on cluster failure.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
features.DefaultFeatureGate.AddFlag(fs)
}
1 change: 0 additions & 1 deletion cmd/scheduler/app/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
cancel()
}()

scheduler.Failover = opts.Failover
sched := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet, opts)
if !opts.LeaderElection.LeaderElect {
sched.Run(ctx)
Expand Down
121 changes: 121 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package features

import (
"fmt"
"sort"
"strconv"
"strings"
"sync"

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)

var (
// allFeatures is the set of feature names defined by this package; AddFlag
// lists them in the help text of the --features flag.
allFeatures = sets.NewString(SchedulerFailover)
// defaultFeatures holds the initial state of every feature. It is loaded
// into a fresh gate by NewDefaultFeatureGate.
defaultFeatures = map[string]bool{
SchedulerFailover: false,
}
// DefaultFeatureGate is a shared global FeatureGate.
DefaultFeatureGate = NewDefaultFeatureGate()
)

const (
// SchedulerFailover indicates if scheduler should reschedule on cluster failure.
// It is disabled by default (see defaultFeatures).
SchedulerFailover string = "SchedulerFailover"
)

// FeatureGate indicates whether a given feature is enabled or not.
type FeatureGate interface {
// AddFlag adds a flag for setting global feature gates to the specified FlagSet.
AddFlag(flagset *pflag.FlagSet)
// Enabled returns true if the key is enabled. The implementation in this
// package reports false for a key that was never set.
Enabled(key string) bool
// Set parses and stores feature gates
// from a string like feature1=true,feature2=false,...
// It returns an error when a pair cannot be parsed.
Set(value string) error
// SetFromMap stores the state of every feature in m, whether the feature is
// enabled (true) or disabled (false).
SetFromMap(m map[string]bool)
// String returns a string representation of feature gate.
String() string
}

// Compile-time check that *featureGate can be registered as a pflag value.
var _ pflag.Value = (*featureGate)(nil)

// featureGate is the FeatureGate implementation handed out by NewFeatureGate.
type featureGate struct {
	// lock is held by SetFromMap while it updates enabledFeatures.
	lock sync.Mutex
	// enabledFeatures maps a feature name to its state. Despite the name it
	// also records features that were explicitly set to false.
	enabledFeatures map[string]bool
}

// AddFlag registers f on flagset under the name "features". The usage text
// enumerates every feature name held in allFeatures.
func (f *featureGate) AddFlag(flagset *pflag.FlagSet) {
	available := strings.Join(allFeatures.List(), ",")
	usage := fmt.Sprintf("A set of key={true,false} pairs to enable/disable features, available features: %s", available)
	flagset.Var(f, "features", usage)
}

// Enabled reports whether the feature named key is turned on. A key that has
// never been stored reports false.
//
// The lookup holds f.lock so that it cannot race with SetFromMap, which
// writes to the same map under that lock.
func (f *featureGate) Enabled(key string) bool {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Indexing a map with a missing key yields the zero value, i.e. false.
	return f.enabledFeatures[key]
}

// String returns every stored feature gate, enabled or disabled, formatted as
// "key1=value1,key2=value2,...". The pairs are sorted so the output does not
// depend on map iteration order.
//
// The map is read under f.lock so that it cannot race with SetFromMap.
func (f *featureGate) String() string {
	f.lock.Lock()
	defer f.lock.Unlock()

	pairs := make([]string, 0, len(f.enabledFeatures))
	for k, v := range f.enabledFeatures {
		pairs = append(pairs, fmt.Sprintf("%s=%t", k, v))
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ",")
}

// Set parses value, a comma-separated list of name=bool pairs such as
// "feature1=true,feature2=false", and stores the result through SetFromMap.
//
// Whitespace around each pair, name and value is ignored, and empty pairs
// (for example from a trailing comma) are skipped. Bool values are parsed with
// strconv.ParseBool. An error is returned, and nothing is stored, when a pair
// has no "=", has an empty name, or has a value that is not a valid bool.
func (f *featureGate) Set(value string) error {
	m := make(map[string]bool)
	for _, s := range strings.Split(value, ",") {
		// Trim before the emptiness check so that "a=true, " does not fail
		// with a confusing error about a nameless feature.
		s = strings.TrimSpace(s)
		if len(s) == 0 {
			continue
		}
		arr := strings.SplitN(s, "=", 2)
		k := strings.TrimSpace(arr[0])
		if len(arr) != 2 {
			return fmt.Errorf("missing bool value for %s", k)
		}
		if len(k) == 0 {
			return fmt.Errorf("missing feature name in %q", s)
		}
		v := strings.TrimSpace(arr[1])
		boolValue, err := strconv.ParseBool(v)
		if err != nil {
			return fmt.Errorf("invalid value of %s=%s, err: %w", k, v, err)
		}
		m[k] = boolValue
	}
	// Apply all pairs at once so a parse error leaves the gate untouched.
	f.SetFromMap(m)
	return nil
}

// Type returns the name of the flag's value type. Together with Set and
// String it makes *featureGate usable as a pflag.Value.
func (f *featureGate) Type() string {
	const valueType = "mapStringBool"
	return valueType
}

// SetFromMap copies every entry of m into the gate. Features already stored
// are overwritten; features absent from m keep their current state. Both the
// update and the verbosity-1 log line run while f.lock is held.
func (f *featureGate) SetFromMap(m map[string]bool) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for name, enabled := range m {
		f.enabledFeatures[name] = enabled
	}

	klog.V(1).Infof("feature gates: %v", f.enabledFeatures)
}

// NewFeatureGate returns a FeatureGate that has no feature states stored yet.
func NewFeatureGate() FeatureGate {
	gate := new(featureGate)
	gate.enabledFeatures = make(map[string]bool)
	return gate
}

// NewDefaultFeatureGate builds a new FeatureGate and preloads it with the
// states listed in defaultFeatures. The package-level DefaultFeatureGate is
// initialized from it.
func NewDefaultFeatureGate() FeatureGate {
	gate := NewFeatureGate()
	gate.SetFromMap(defaultFeatures)
	return gate
}
94 changes: 94 additions & 0 deletions pkg/features/features_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package features

import "testing"

// TestSet checks that Set parses name=bool lists, that the result is visible
// through String, and that malformed input is rejected without storing
// anything.
func TestSet(t *testing.T) {
	tests := []struct {
		name    string
		setStr  string
		wantStr string
		wantErr bool
	}{
		{
			name:    "set multiple features with lowercase values",
			setStr:  "a=true,b=false",
			wantStr: "a=true,b=false",
		},
		{
			name:    "set multiple features with capitalized values",
			setStr:  "a=True,b=False",
			wantStr: "a=true,b=false",
		},
		{
			name:    "reject feature without a value",
			setStr:  "a=true,b",
			wantErr: true,
		},
		{
			name:    "reject value that is not a bool",
			setStr:  "a=maybe",
			wantErr: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			gates := NewFeatureGate()
			// The error must be inspected: a silently failed Set would
			// otherwise only show up as a confusing String mismatch.
			err := gates.Set(tt.setStr)
			if (err != nil) != tt.wantErr {
				t.Fatalf("Set(%q) error = %v, wantErr %v", tt.setStr, err, tt.wantErr)
			}
			// On error nothing is stored, so wantStr stays empty.
			got := gates.String()
			if got != tt.wantStr {
				t.Errorf("want: %s, got %s", tt.wantStr, got)
			}
		})
	}
}

// TestSetFromMap checks that states stored through SetFromMap are reflected
// in the sorted output of String.
func TestSetFromMap(t *testing.T) {
	type testCase struct {
		name    string
		setMap  map[string]bool
		wantStr string
	}

	cases := []testCase{
		{
			name:    "set multiple features",
			setMap:  map[string]bool{"a": true, "b": false},
			wantStr: "a=true,b=false",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			gates := NewFeatureGate()
			gates.SetFromMap(tc.setMap)
			if got := gates.String(); got != tc.wantStr {
				t.Errorf("want: %s, got %s", tc.wantStr, got)
			}
		})
	}
}

// TestEnabled checks that Enabled reports the state stored for each feature
// through SetFromMap.
func TestEnabled(t *testing.T) {
	type testCase struct {
		name        string
		setMap      map[string]bool
		wantEnabled map[string]bool
	}

	cases := []testCase{
		{
			name:        "set multiple features",
			setMap:      map[string]bool{"a": true, "b": false},
			wantEnabled: map[string]bool{"a": true, "b": false},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			gates := NewFeatureGate()
			gates.SetFromMap(tc.setMap)
			for feature, want := range tc.wantEnabled {
				if got := gates.Enabled(feature); got != want {
					t.Errorf("[feature: %s] want %v, got %v", feature, want, got)
				}
			}
		})
	}
}
11 changes: 4 additions & 7 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
"github.com/karmada-io/karmada/pkg/features"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
Expand Down Expand Up @@ -61,10 +62,6 @@ const (
Unknown ScheduleType = "Unknown"
)

// Failover indicates if the scheduler should performs re-scheduler in case of cluster failure.
// TODO(RainbowMango): Remove the temporary solution by introducing feature flag
var Failover bool

// Scheduler is the scheduler schema, which is used to schedule a specific resource to specific clusters
type Scheduler struct {
DynamicClient dynamic.Interface
Expand Down Expand Up @@ -403,7 +400,7 @@ func (s *Scheduler) scheduleNext() bool {
klog.Infof("Reschedule binding(%s) as replicas scaled down or scaled up", key.(string))
metrics.BindingSchedule(string(ScaleSchedule), metrics.SinceInSeconds(start), err)
case FailoverSchedule:
if Failover {
if features.DefaultFeatureGate.Enabled(features.SchedulerFailover) {
err = s.rescheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as cluster failure", key.(string))
metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err)
Expand Down Expand Up @@ -545,9 +542,9 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {

// Check if cluster becomes failure
if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, Failover)
klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, features.DefaultFeatureGate.Enabled(features.SchedulerFailover))

if Failover { // Trigger reschedule on cluster failure only when flag is true.
if features.DefaultFeatureGate.Enabled(features.SchedulerFailover) { // Trigger reschedule on cluster failure only when flag is true.
s.enqueueAffectedBinding(newCluster.Name)
s.enqueueAffectedClusterBinding(newCluster.Name)
return
Expand Down

0 comments on commit b71d37a

Please sign in to comment.