Skip to content

Commit

Permalink
Add rate limiter to the Vertical Pod autoscaler updater component
Browse files Browse the repository at this point in the history
This patch adds a rate limiter to the vertical pod autoscaler updater.
It adds two flags
- eviction-rate-limit to control the number of pods that can be evicted
every seconds.
- eviction-rate-limit-burst to control the burst of that can be evicted
immediately.

fixes #2178
  • Loading branch information
Guillaume Breton committed Sep 11, 2019
1 parent e87aea4 commit 19844a2
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
27 changes: 24 additions & 3 deletions vertical-pod-autoscaler/pkg/updater/logic/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package logic

import (
"context"
"fmt"
"time"

"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -44,7 +46,7 @@ import (
// Updater performs updates on pods if recommended by Vertical Pod Autoscaler
type Updater interface {
// RunOnce represents single iteration in the main-loop of Updater
RunOnce()
RunOnce(context.Context)
}

type updater struct {
Expand All @@ -54,11 +56,13 @@ type updater struct {
evictionFactory eviction.PodsEvictionRestrictionFactory
recommendationProcessor vpa_api_util.RecommendationProcessor
evictionAdmission priority.PodEvictionAdmission
evictionRateLimiter *rate.Limiter
selectorFetcher target.VpaTargetSelectorFetcher
}

// NewUpdater creates Updater with given configuration
func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) {
func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionRateLimit float64, evictionRateLimitBurst int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) {
evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateLimitBurst)
factory, err := eviction.NewPodsEvictionRestrictionFactory(kubeClient, minReplicasForEvicition, evictionToleranceFraction)
if err != nil {
return nil, fmt.Errorf("Failed to create eviction restriction factory: %v", err)
Expand All @@ -69,13 +73,14 @@ func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clien
eventRecorder: newEventRecorder(kubeClient),
evictionFactory: factory,
recommendationProcessor: recommendationProcessor,
evictionRateLimiter: evictionRateLimiter,
evictionAdmission: evictionAdmission,
selectorFetcher: selectorFetcher,
}, nil
}

// RunOnce represents single iteration in the main-loop of Updater
func (u *updater) RunOnce() {
func (u *updater) RunOnce(ctx context.Context) {
timer := metrics_updater.NewExecutionTimer()

vpaList, err := u.vpaLister.List(labels.Everything())
Expand Down Expand Up @@ -144,6 +149,11 @@ func (u *updater) RunOnce() {
if !evictionLimiter.CanEvict(pod) {
continue
}
err := u.evictionRateLimiter.Wait(ctx)
if err != nil {
klog.Warningf("evicting pod %v failed: %v", pod.Name, err)
return
}
klog.V(2).Infof("evicting pod %v", pod.Name)
evictErr := evictionLimiter.Evict(pod, u.eventRecorder)
if evictErr != nil {
Expand All @@ -155,6 +165,17 @@ func (u *updater) RunOnce() {
timer.ObserveTotal()
}

func getRateLimiter(evictionRateLimit float64, evictionRateLimitBurst int) *rate.Limiter {
var evictionRateLimiter *rate.Limiter
if evictionRateLimit == -1 || evictionRateLimit == 0 {
evictionRateLimiter = rate.NewLimiter(rate.Inf, evictionRateLimitBurst)
klog.Warning("Rate limit disabled")
} else {
evictionRateLimiter = rate.NewLimiter(rate.Every(time.Duration(1.0/evictionRateLimit*float64(time.Second))), evictionRateLimitBurst)
}
return evictionRateLimiter
}

// getPodsUpdateOrder returns list of pods that should be updated ordered by update priority
func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod {
priorityCalculator := priority.NewUpdatePriorityCalculator(vpa.Spec.ResourcePolicy, vpa.Status.Conditions, nil, u.recommendationProcessor)
Expand Down
31 changes: 28 additions & 3 deletions vertical-pod-autoscaler/pkg/updater/logic/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ limitations under the License.
package logic

import (
"context"
"strconv"
"testing"
"time"

"golang.org/x/time/rate"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -87,12 +92,13 @@ func TestRunOnce(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
selectorFetcher: mockSelectorFetcher,
}

mockSelectorFetcher.EXPECT().Fetch(gomock.Eq(vpaObj)).Return(selector, nil)
updater.RunOnce()
updater.RunOnce(context.Background())
eviction.AssertNumberOfCalls(t, "Evict", 5)
}

Expand Down Expand Up @@ -134,11 +140,12 @@ func TestVPAOff(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
selectorFetcher: target_mock.NewMockVpaTargetSelectorFetcher(ctrl),
}

updater.RunOnce()
updater.RunOnce(context.Background())
eviction.AssertNumberOfCalls(t, "Evict", 0)
}

Expand All @@ -153,9 +160,27 @@ func TestRunOnceNotingToProcess(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
}
updater.RunOnce()
updater.RunOnce(context.Background())
}

func TestGetRateLimiter(t *testing.T) {
cases := []struct {
rateLimit float64
rateLimitBurst int
expectedLimiter *rate.Limiter
}{
{0.0, 1, rate.NewLimiter(rate.Inf, 1)},
{-1.0, 2, rate.NewLimiter(rate.Inf, 2)},
{10.0, 3, rate.NewLimiter(rate.Every(time.Duration(1.0/10*float64(time.Second))), 3)},
}
for _, tc := range cases {
limiter := getRateLimiter(tc.rateLimit, tc.rateLimitBurst)
assert.Equal(t, tc.expectedLimiter.Burst(), limiter.Burst())
assert.InDelta(t, float64(tc.expectedLimiter.Limit()), float64(limiter.Limit()), 1e-6)
}
}

type fakeEvictFactory struct {
Expand Down
13 changes: 11 additions & 2 deletions vertical-pod-autoscaler/pkg/updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"time"

Expand Down Expand Up @@ -45,6 +46,12 @@ var (
evictionToleranceFraction = flag.Float64("eviction-tolerance", 0.5,
`Fraction of replica count that can be evicted for update, if more than one pod can be evicted.`)

evictionRateLimit = flag.Float64("eviction-rate-limit", -1, `
Number of pods that can be evicted per seconds.`)

evictionRateLimitBurst = flag.Int("eviction-rate-limit-burst", 1,
`Burst of pods that can be evicted.`)

address = flag.String("address", ":8943", "The address to expose Prometheus metrics.")
)

Expand Down Expand Up @@ -76,13 +83,15 @@ func main() {
limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
}
// TODO: use SharedInformerFactory in updater
updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher)
updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionRateLimit, *evictionRateLimitBurst, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher)
if err != nil {
klog.Fatalf("Failed to create updater: %v", err)
}
ticker := time.Tick(*updaterInterval)
for range ticker {
updater.RunOnce()
ctx, cancel := context.WithTimeout(context.Background(), *updaterInterval)
updater.RunOnce(ctx)
cancel()
healthCheck.UpdateLastActivity()
}
}

0 comments on commit 19844a2

Please sign in to comment.