Skip to content

Commit

Permalink
Merge pull request #6985 from ialidzhikov/enh/vpa-leader-election
Browse files Browse the repository at this point in the history
vpa-{updater,recommender}: Add support for leader election
  • Loading branch information
k8s-ci-robot authored Jul 9, 2024
2 parents 35359b9 + d5c12cf commit d7091a9
Show file tree
Hide file tree
Showing 22 changed files with 1,594 additions and 15 deletions.
14 changes: 14 additions & 0 deletions vertical-pod-autoscaler/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ Name | Type | Description | Default
`memory-histogram-decay-half-life` | Duration | The amount of time it takes a historical memory usage sample to lose half of its weight. In other words, a fresh usage sample is twice as 'important' as one with age equal to the half life period. | model.DefaultMemoryHistogramDecayHalfLife
`cpu-histogram-decay-half-life` | Duration | The amount of time it takes a historical CPU usage sample to lose half of its weight. | model.DefaultCPUHistogramDecayHalfLife
`cpu-integer-post-processor-enabled` | Bool | Enable the CPU integer recommendation post processor | false
`leader-elect` | Bool | Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability. | false
`leader-elect-lease-duration` | Duration | The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled. | 15s
`leader-elect-renew-deadline` | Duration | The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than the lease duration. This is only applicable if leader election is enabled. | 10s
`leader-elect-resource-lock` | String | The type of resource object that is used for locking during leader election. Supported options are 'leases', 'endpointsleases' and 'configmapsleases'. | "leases"
`leader-elect-resource-name` | String | The name of resource object that is used for locking during leader election. | "vpa-recommender"
`leader-elect-resource-namespace` | String | The namespace of resource object that is used for locking during leader election. | "kube-system"
`leader-elect-retry-period` | Duration | The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled. | 2s

### What are the parameters to VPA updater?

Expand All @@ -224,3 +231,10 @@ Name | Type | Description | Default
`kube-api-burst` | Float64 | QPS burst limit when making requests to Kubernetes apiserver | 10.0
`use-admission-controller-status` | Bool | If true, updater will only evict pods when admission controller status is valid. | true
`vpa-object-namespace` | String | Namespace to search for VPA objects. Empty means all namespaces will be used. | apiv1.NamespaceAll
`leader-elect` | Bool | Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability. | false
`leader-elect-lease-duration` | Duration | The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled. | 15s
`leader-elect-renew-deadline` | Duration | The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than the lease duration. This is only applicable if leader election is enabled. | 10s
`leader-elect-resource-lock` | String | The type of resource object that is used for locking during leader election. Supported options are 'leases', 'endpointsleases' and 'configmapsleases'. | "leases"
`leader-elect-resource-name` | String | The name of resource object that is used for locking during leader election. | "vpa-updater"
`leader-elect-resource-namespace` | String | The namespace of resource object that is used for locking during leader election. | "kube-system"
`leader-elect-retry-period` | Duration | The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled. | 2s
74 changes: 74 additions & 0 deletions vertical-pod-autoscaler/deploy/vpa-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,77 @@ subjects:
- kind: ServiceAccount
name: vpa-updater
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: system:leader-locking-vpa-updater
namespace: kube-system
rules:
- apiGroups:
- "coordination.k8s.io"
resources:
- leases
verbs:
- create
- apiGroups:
- "coordination.k8s.io"
resourceNames:
- vpa-updater
resources:
- leases
verbs:
- get
- watch
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: system:leader-locking-vpa-updater
namespace: kube-system
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: system:leader-locking-vpa-updater
subjects:
- kind: ServiceAccount
name: vpa-updater
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: system:leader-locking-vpa-recommender
namespace: kube-system
rules:
- apiGroups:
- "coordination.k8s.io"
resources:
- leases
verbs:
- create
- apiGroups:
- "coordination.k8s.io"
resourceNames:
- vpa-recommender
resources:
- leases
verbs:
- get
- watch
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: system:leader-locking-vpa-recommender
namespace: kube-system
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: system:leader-locking-vpa-recommender
subjects:
- kind: ServiceAccount
name: vpa-recommender
namespace: kube-system
2 changes: 1 addition & 1 deletion vertical-pod-autoscaler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16
github.com/prometheus/common v0.44.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.2
golang.org/x/time v0.4.0
k8s.io/api v0.28.3
Expand Down Expand Up @@ -50,7 +51,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion vertical-pod-autoscaler/pkg/admission-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {
kube_flag.InitFlags()
klog.V(1).Infof("Vertical Pod Autoscaler %s Admission Controller", common.VerticalPodAutoscalerVersion)

healthCheck := metrics.NewHealthCheck(time.Minute, false)
healthCheck := metrics.NewHealthCheck(time.Minute)
metrics.Initialize(*address, healthCheck)
metrics_admission.Register()

Expand Down
95 changes: 86 additions & 9 deletions vertical-pod-autoscaler/pkg/recommender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ package main
import (
"context"
"flag"
"os"
"time"

resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"

"github.com/spf13/pflag"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
kube_flag "k8s.io/component-base/cli/flag"
klog "k8s.io/klog/v2"
componentbaseconfig "k8s.io/component-base/config"
componentbaseoptions "k8s.io/component-base/config/options"
"k8s.io/klog/v2"
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"

"k8s.io/autoscaler/vertical-pod-autoscaler/common"
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
Expand All @@ -39,7 +46,7 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/routines"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics"
metrics_quality "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/quality"
metrics_recommender "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/recommender"
Expand Down Expand Up @@ -107,9 +114,81 @@ const (

func main() {
klog.InitFlags(nil)

leaderElection := defaultLeaderElectionConfiguration()
componentbaseoptions.BindLeaderElectionFlags(&leaderElection, pflag.CommandLine)

kube_flag.InitFlags()
klog.V(1).Infof("Vertical Pod Autoscaler %s Recommender: %v", common.VerticalPodAutoscalerVersion, *recommenderName)

healthCheck := metrics.NewHealthCheck(*metricsFetcherInterval * 5)
metrics.Initialize(*address, healthCheck)
metrics_recommender.Register()
metrics_quality.Register()

if !leaderElection.LeaderElect {
run(healthCheck)
} else {
id, err := os.Hostname()
if err != nil {
klog.Fatalf("Unable to get hostname: %v", err)
}
id = id + "_" + string(uuid.NewUUID())

config := common.CreateKubeConfigOrDie(*kubeconfig, float32(*kubeApiQps), int(*kubeApiBurst))
kubeClient := kube_client.NewForConfigOrDie(config)

lock, err := resourcelock.New(
leaderElection.ResourceLock,
leaderElection.ResourceNamespace,
leaderElection.ResourceName,
kubeClient.CoreV1(),
kubeClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
},
)
if err != nil {
klog.Fatalf("Unable to create leader election lock: %v", err)
}

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: leaderElection.LeaseDuration.Duration,
RenewDeadline: leaderElection.RenewDeadline.Duration,
RetryPeriod: leaderElection.RetryPeriod.Duration,
ReleaseOnCancel: true,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
run(healthCheck)
},
OnStoppedLeading: func() {
klog.Fatal("lost master")
},
},
})
}
}

const (
defaultLeaseDuration = 15 * time.Second
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
)

func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConfiguration {
return componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: false,
LeaseDuration: metav1.Duration{Duration: defaultLeaseDuration},
RenewDeadline: metav1.Duration{Duration: defaultRenewDeadline},
RetryPeriod: metav1.Duration{Duration: defaultRetryPeriod},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceName: "vpa-recommender",
ResourceNamespace: metav1.NamespaceSystem,
}
}

func run(healthCheck *metrics.HealthCheck) {
config := common.CreateKubeConfigOrDie(*kubeconfig, float32(*kubeApiQps), int(*kubeApiBurst))
kubeClient := kube_client.NewForConfigOrDie(config)
clusterState := model.NewClusterState(aggregateContainerStateGCInterval)
Expand All @@ -119,11 +198,6 @@ func main() {

model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, *memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp))

healthCheck := metrics.NewHealthCheck(*metricsFetcherInterval*5, true)
metrics.Initialize(*address, healthCheck)
metrics_recommender.Register()
metrics_quality.Register()

useCheckpoints := *storage != "prometheus"

var postProcessors []routines.RecommendationPostProcessor
Expand Down Expand Up @@ -211,6 +285,9 @@ func main() {
recommender.GetClusterStateFeeder().InitFromHistoryProvider(provider)
}

// Start updating health check endpoint.
healthCheck.StartMonitoring()

ticker := time.Tick(*metricsFetcherInterval)
for range ticker {
recommender.RunOnce()
Expand Down
80 changes: 79 additions & 1 deletion vertical-pod-autoscaler/pkg/updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ import (
"os"
"time"

"github.com/spf13/pflag"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
kube_flag "k8s.io/component-base/cli/flag"
componentbaseconfig "k8s.io/component-base/config"
componentbaseoptions "k8s.io/component-base/config/options"
"k8s.io/klog/v2"

"k8s.io/autoscaler/vertical-pod-autoscaler/common"
Expand Down Expand Up @@ -78,13 +85,80 @@ const (

func main() {
klog.InitFlags(nil)

leaderElection := defaultLeaderElectionConfiguration()
componentbaseoptions.BindLeaderElectionFlags(&leaderElection, pflag.CommandLine)

kube_flag.InitFlags()
klog.V(1).Infof("Vertical Pod Autoscaler %s Updater", common.VerticalPodAutoscalerVersion)

healthCheck := metrics.NewHealthCheck(*updaterInterval*5, true)
healthCheck := metrics.NewHealthCheck(*updaterInterval * 5)
metrics.Initialize(*address, healthCheck)
metrics_updater.Register()

if !leaderElection.LeaderElect {
run(healthCheck)
} else {
id, err := os.Hostname()
if err != nil {
klog.Fatalf("Unable to get hostname: %v", err)
}
id = id + "_" + string(uuid.NewUUID())

config := common.CreateKubeConfigOrDie(*kubeconfig, float32(*kubeApiQps), int(*kubeApiBurst))
kubeClient := kube_client.NewForConfigOrDie(config)

lock, err := resourcelock.New(
leaderElection.ResourceLock,
leaderElection.ResourceNamespace,
leaderElection.ResourceName,
kubeClient.CoreV1(),
kubeClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
},
)
if err != nil {
klog.Fatalf("Unable to create leader election lock: %v", err)
}

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: leaderElection.LeaseDuration.Duration,
RenewDeadline: leaderElection.RenewDeadline.Duration,
RetryPeriod: leaderElection.RetryPeriod.Duration,
ReleaseOnCancel: true,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
run(healthCheck)
},
OnStoppedLeading: func() {
klog.Fatal("lost master")
},
},
})
}
}

const (
defaultLeaseDuration = 15 * time.Second
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
)

func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConfiguration {
return componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: false,
LeaseDuration: metav1.Duration{Duration: defaultLeaseDuration},
RenewDeadline: metav1.Duration{Duration: defaultRenewDeadline},
RetryPeriod: metav1.Duration{Duration: defaultRetryPeriod},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceName: "vpa-updater",
ResourceNamespace: metav1.NamespaceSystem,
}
}

func run(healthCheck *metrics.HealthCheck) {
config := common.CreateKubeConfigOrDie(*kubeconfig, float32(*kubeApiQps), int(*kubeApiBurst))
kubeClient := kube_client.NewForConfigOrDie(config)
vpaClient := vpa_clientset.NewForConfigOrDie(config)
Expand Down Expand Up @@ -121,6 +195,10 @@ func main() {
if err != nil {
klog.Fatalf("Failed to create updater: %v", err)
}

// Start updating health check endpoint.
healthCheck.StartMonitoring()

ticker := time.Tick(*updaterInterval)
for range ticker {
ctx, cancel := context.WithTimeout(context.Background(), *updaterInterval)
Expand Down
Loading

0 comments on commit d7091a9

Please sign in to comment.