From 19844a2d6f52cfcad15ae0ee62d2749dc578755d Mon Sep 17 00:00:00 2001 From: Guillaume Breton Date: Wed, 11 Sep 2019 15:44:48 +0200 Subject: [PATCH] Add rate limiter to the Vertical Pod autoscaler updater component This patch adds a rate limiter to the vertical pod autoscaler updater. It adds two flags - eviction-rate-limit to control the number of pods that can be evicted every seconds. - eviction-rate-limit-burst to control the burst of that can be evicted immediately. fixes #2178 --- .../pkg/updater/logic/updater.go | 27 ++++++++++++++-- .../pkg/updater/logic/updater_test.go | 31 +++++++++++++++++-- vertical-pod-autoscaler/pkg/updater/main.go | 13 ++++++-- 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater.go b/vertical-pod-autoscaler/pkg/updater/logic/updater.go index 70186eb31d4b..16caa5c7d868 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater.go @@ -17,9 +17,11 @@ limitations under the License. package logic import ( + "context" "fmt" "time" + "golang.org/x/time/rate" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -44,7 +46,7 @@ import ( // Updater performs updates on pods if recommended by Vertical Pod Autoscaler type Updater interface { // RunOnce represents single iteration in the main-loop of Updater - RunOnce() + RunOnce(context.Context) } type updater struct { @@ -54,11 +56,13 @@ type updater struct { evictionFactory eviction.PodsEvictionRestrictionFactory recommendationProcessor vpa_api_util.RecommendationProcessor evictionAdmission priority.PodEvictionAdmission + evictionRateLimiter *rate.Limiter selectorFetcher target.VpaTargetSelectorFetcher } // NewUpdater creates Updater with given configuration -func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) { +func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionRateLimit float64, evictionRateLimitBurst int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) { + evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateLimitBurst) factory, err := eviction.NewPodsEvictionRestrictionFactory(kubeClient, minReplicasForEvicition, evictionToleranceFraction) if err != nil { return nil, fmt.Errorf("Failed to create eviction restriction factory: %v", err) @@ -69,13 +73,14 @@ func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clien eventRecorder: newEventRecorder(kubeClient), evictionFactory: factory, recommendationProcessor: recommendationProcessor, + evictionRateLimiter: evictionRateLimiter, evictionAdmission: evictionAdmission, selectorFetcher: selectorFetcher, }, nil } // RunOnce represents single iteration in the main-loop of Updater -func (u *updater) RunOnce() { +func (u *updater) RunOnce(ctx context.Context) { timer := metrics_updater.NewExecutionTimer() vpaList, err := u.vpaLister.List(labels.Everything()) @@ -144,6 +149,11 @@ func (u *updater) RunOnce() { if !evictionLimiter.CanEvict(pod) { continue } + err := u.evictionRateLimiter.Wait(ctx) + if err != nil { + klog.Warningf("evicting pod %v failed: %v", pod.Name, err) + return + } klog.V(2).Infof("evicting pod %v", pod.Name) evictErr := evictionLimiter.Evict(pod, u.eventRecorder) if evictErr != nil { @@ -155,6 +165,17 @@ func (u *updater) RunOnce() { timer.ObserveTotal() } +func getRateLimiter(evictionRateLimit float64, evictionRateLimitBurst int) *rate.Limiter { + var evictionRateLimiter *rate.Limiter + if evictionRateLimit == -1 || evictionRateLimit == 0 { + evictionRateLimiter = rate.NewLimiter(rate.Inf, evictionRateLimitBurst) + klog.Warning("Rate limit disabled") + } else { + evictionRateLimiter = rate.NewLimiter(rate.Every(time.Duration(1.0/evictionRateLimit*float64(time.Second))), evictionRateLimitBurst) + } + return evictionRateLimiter +} + // getPodsUpdateOrder returns list of pods that should be updated ordered by update priority func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod { priorityCalculator := priority.NewUpdatePriorityCalculator(vpa.Spec.ResourcePolicy, vpa.Status.Conditions, nil, u.recommendationProcessor) diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go b/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go index 2f15090d450d..085a12f08e59 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go @@ -17,10 +17,15 @@ limitations under the License. package logic import ( + "context" "strconv" "testing" + "time" + + "golang.org/x/time/rate" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -87,12 +92,13 @@ func TestRunOnce(t *testing.T) { vpaLister: vpaLister, podLister: podLister, evictionFactory: factory, + evictionRateLimiter: rate.NewLimiter(rate.Inf, 0), recommendationProcessor: &test.FakeRecommendationProcessor{}, selectorFetcher: mockSelectorFetcher, } mockSelectorFetcher.EXPECT().Fetch(gomock.Eq(vpaObj)).Return(selector, nil) - updater.RunOnce() + updater.RunOnce(context.Background()) eviction.AssertNumberOfCalls(t, "Evict", 5) } @@ -134,11 +140,12 @@ func TestVPAOff(t *testing.T) { vpaLister: vpaLister, podLister: podLister, evictionFactory: factory, + evictionRateLimiter: rate.NewLimiter(rate.Inf, 0), recommendationProcessor: &test.FakeRecommendationProcessor{}, selectorFetcher: target_mock.NewMockVpaTargetSelectorFetcher(ctrl), } - updater.RunOnce() + updater.RunOnce(context.Background()) eviction.AssertNumberOfCalls(t, "Evict", 0) } @@ -153,9 +160,27 @@ func TestRunOnceNotingToProcess(t *testing.T) { vpaLister: vpaLister, podLister: podLister, evictionFactory: factory, + evictionRateLimiter: rate.NewLimiter(rate.Inf, 0), recommendationProcessor: &test.FakeRecommendationProcessor{}, } - updater.RunOnce() + updater.RunOnce(context.Background()) +} + +func TestGetRateLimiter(t *testing.T) { + cases := []struct { + rateLimit float64 + rateLimitBurst int + expectedLimiter *rate.Limiter + }{ + {0.0, 1, rate.NewLimiter(rate.Inf, 1)}, + {-1.0, 2, rate.NewLimiter(rate.Inf, 2)}, + {10.0, 3, rate.NewLimiter(rate.Every(time.Duration(1.0/10*float64(time.Second))), 3)}, + } + for _, tc := range cases { + limiter := getRateLimiter(tc.rateLimit, tc.rateLimitBurst) + assert.Equal(t, tc.expectedLimiter.Burst(), limiter.Burst()) + assert.InDelta(t, float64(tc.expectedLimiter.Limit()), float64(limiter.Limit()), 1e-6) + } } type fakeEvictFactory struct { diff --git a/vertical-pod-autoscaler/pkg/updater/main.go b/vertical-pod-autoscaler/pkg/updater/main.go index 9bb6a8f229e8..7942a2e90da6 100644 --- a/vertical-pod-autoscaler/pkg/updater/main.go +++ b/vertical-pod-autoscaler/pkg/updater/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "flag" "time" @@ -45,6 +46,12 @@ var ( evictionToleranceFraction = flag.Float64("eviction-tolerance", 0.5, `Fraction of replica count that can be evicted for update, if more than one pod can be evicted.`) + evictionRateLimit = flag.Float64("eviction-rate-limit", -1, ` + Number of pods that can be evicted per seconds.`) + + evictionRateLimitBurst = flag.Int("eviction-rate-limit-burst", 1, + `Burst of pods that can be evicted.`) + address = flag.String("address", ":8943", "The address to expose Prometheus metrics.") ) @@ -76,13 +83,15 @@ func main() { limitRangeCalculator = limitrange.NewNoopLimitsCalculator() } // TODO: use SharedInformerFactory in updater - updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher) + updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionRateLimit, *evictionRateLimitBurst, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher) if err != nil { klog.Fatalf("Failed to create updater: %v", err) } ticker := time.Tick(*updaterInterval) for range ticker { - updater.RunOnce() + ctx, cancel := context.WithTimeout(context.Background(), *updaterInterval) + updater.RunOnce(ctx) + cancel() healthCheck.UpdateLastActivity() } }