diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater.go b/vertical-pod-autoscaler/pkg/updater/logic/updater.go index 70186eb31d4b..a2442c5c5df2 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater.go @@ -17,9 +17,12 @@ limitations under the License. package logic import ( + "context" "fmt" "time" + "golang.org/x/time/rate" + apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -44,7 +47,7 @@ import ( // Updater performs updates on pods if recommended by Vertical Pod Autoscaler type Updater interface { // RunOnce represents single iteration in the main-loop of Updater - RunOnce() + RunOnce(context.Context) } type updater struct { @@ -54,11 +57,13 @@ type updater struct { evictionFactory eviction.PodsEvictionRestrictionFactory recommendationProcessor vpa_api_util.RecommendationProcessor evictionAdmission priority.PodEvictionAdmission + evictionRateLimiter *rate.Limiter selectorFetcher target.VpaTargetSelectorFetcher } // NewUpdater creates Updater with given configuration -func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) { +func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionRateLimit float64, evictionRateBurst int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) { + evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst) factory, err := eviction.NewPodsEvictionRestrictionFactory(kubeClient, minReplicasForEvicition, evictionToleranceFraction) if err != nil { return nil, fmt.Errorf("Failed to create eviction restriction factory: %v", err) @@ -69,13 +74,14 @@ func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clien eventRecorder: newEventRecorder(kubeClient), evictionFactory: factory, recommendationProcessor: recommendationProcessor, + evictionRateLimiter: evictionRateLimiter, evictionAdmission: evictionAdmission, selectorFetcher: selectorFetcher, }, nil } // RunOnce represents single iteration in the main-loop of Updater -func (u *updater) RunOnce() { +func (u *updater) RunOnce(ctx context.Context) { timer := metrics_updater.NewExecutionTimer() vpaList, err := u.vpaLister.List(labels.Everything()) @@ -144,6 +150,11 @@ func (u *updater) RunOnce() { if !evictionLimiter.CanEvict(pod) { continue } + err := u.evictionRateLimiter.Wait(ctx) + if err != nil { + klog.Warningf("evicting pod %v failed: %v", pod.Name, err) + return + } klog.V(2).Infof("evicting pod %v", pod.Name) evictErr := evictionLimiter.Evict(pod, u.eventRecorder) if evictErr != nil { @@ -155,6 +166,19 @@ func (u *updater) RunOnce() { timer.ObserveTotal() } +func getRateLimiter(evictionRateLimit float64, evictionRateLimitBurst int) *rate.Limiter { + var evictionRateLimiter *rate.Limiter + if evictionRateLimit <= 0 { + // As a special case if the rate is set to rate.Inf, the burst rate is ignored + // see https://github.com/golang/time/blob/master/rate/rate.go#L37 + evictionRateLimiter = rate.NewLimiter(rate.Inf, 0) + klog.V(1).Info("Rate limit disabled") + } else { + evictionRateLimiter = rate.NewLimiter(rate.Limit(evictionRateLimit), evictionRateLimitBurst) + } + return evictionRateLimiter +} + // getPodsUpdateOrder returns list of pods that should be updated ordered by update priority func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod { priorityCalculator := priority.NewUpdatePriorityCalculator(vpa.Spec.ResourcePolicy, vpa.Status.Conditions, nil, u.recommendationProcessor) diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go b/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go index 2f15090d450d..9f993eb31b88 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go @@ -17,10 +17,14 @@ limitations under the License. package logic import ( + "context" "strconv" "testing" + "golang.org/x/time/rate" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -87,12 +91,13 @@ func TestRunOnce(t *testing.T) { vpaLister: vpaLister, podLister: podLister, evictionFactory: factory, + evictionRateLimiter: rate.NewLimiter(rate.Inf, 0), recommendationProcessor: &test.FakeRecommendationProcessor{}, selectorFetcher: mockSelectorFetcher, } mockSelectorFetcher.EXPECT().Fetch(gomock.Eq(vpaObj)).Return(selector, nil) - updater.RunOnce() + updater.RunOnce(context.Background()) eviction.AssertNumberOfCalls(t, "Evict", 5) } @@ -134,11 +139,12 @@ func TestVPAOff(t *testing.T) { vpaLister: vpaLister, podLister: podLister, evictionFactory: factory, + evictionRateLimiter: rate.NewLimiter(rate.Inf, 0), recommendationProcessor: &test.FakeRecommendationProcessor{}, selectorFetcher: target_mock.NewMockVpaTargetSelectorFetcher(ctrl), } - updater.RunOnce() + updater.RunOnce(context.Background()) eviction.AssertNumberOfCalls(t, "Evict", 0) } @@ -153,9 +159,27 @@ func TestRunOnceNotingToProcess(t *testing.T) { vpaLister: vpaLister, podLister: podLister, evictionFactory: factory, + evictionRateLimiter: rate.NewLimiter(rate.Inf, 0), recommendationProcessor: &test.FakeRecommendationProcessor{}, } - updater.RunOnce() + updater.RunOnce(context.Background()) +} + +func TestGetRateLimiter(t *testing.T) { + cases := []struct { + rateLimit float64 + rateLimitBurst int + expectedLimiter *rate.Limiter + }{ + {0.0, 1, rate.NewLimiter(rate.Inf, 0)}, + {-1.0, 2, rate.NewLimiter(rate.Inf, 0)}, + {10.0, 3, rate.NewLimiter(rate.Limit(10), 3)}, + } + for _, tc := range cases { + limiter := getRateLimiter(tc.rateLimit, tc.rateLimitBurst) + assert.Equal(t, tc.expectedLimiter.Burst(), limiter.Burst()) + assert.InDelta(t, float64(tc.expectedLimiter.Limit()), float64(limiter.Limit()), 1e-6) + } } type fakeEvictFactory struct { diff --git a/vertical-pod-autoscaler/pkg/updater/main.go b/vertical-pod-autoscaler/pkg/updater/main.go index 9bb6a8f229e8..7f0e8adb3a4d 100644 --- a/vertical-pod-autoscaler/pkg/updater/main.go +++ b/vertical-pod-autoscaler/pkg/updater/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "flag" "time" @@ -45,6 +46,12 @@ var ( evictionToleranceFraction = flag.Float64("eviction-tolerance", 0.5, `Fraction of replica count that can be evicted for update, if more than one pod can be evicted.`) + evictionRateLimit = flag.Float64("eviction-rate-limit", -1, + `Number of pods that can be evicted per seconds. A rate limit set to 0 or -1 will disable + the rate limiter.`) + + evictionRateBurst = flag.Int("eviction-rate-burst", 1, `Burst of pods that can be evicted.`) + address = flag.String("address", ":8943", "The address to expose Prometheus metrics.") ) @@ -76,13 +83,15 @@ func main() { limitRangeCalculator = limitrange.NewNoopLimitsCalculator() } // TODO: use SharedInformerFactory in updater - updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher) + updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionRateLimit, *evictionRateBurst, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher) if err != nil { klog.Fatalf("Failed to create updater: %v", err) } ticker := time.Tick(*updaterInterval) for range ticker { - updater.RunOnce() + ctx, cancel := context.WithTimeout(context.Background(), *updaterInterval) + defer cancel() + updater.RunOnce(ctx) healthCheck.UpdateLastActivity() } }