Skip to content

Commit

Permalink
Merge pull request #2326 from monzo/eviction-rate-limiter
Browse files Browse the repository at this point in the history
Add rate limiter to the Vertical Pod autoscaler updater component
  • Loading branch information
k8s-ci-robot authored Oct 25, 2019
2 parents a32987c + 59df00f commit 0e425f3
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 8 deletions.
30 changes: 27 additions & 3 deletions vertical-pod-autoscaler/pkg/updater/logic/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package logic

import (
"context"
"fmt"
"time"

"golang.org/x/time/rate"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -44,7 +47,7 @@ import (
// Updater performs updates on pods if recommended by Vertical Pod Autoscaler
type Updater interface {
// RunOnce represents single iteration in the main-loop of Updater
RunOnce()
RunOnce(context.Context)
}

type updater struct {
Expand All @@ -54,11 +57,13 @@ type updater struct {
evictionFactory eviction.PodsEvictionRestrictionFactory
recommendationProcessor vpa_api_util.RecommendationProcessor
evictionAdmission priority.PodEvictionAdmission
evictionRateLimiter *rate.Limiter
selectorFetcher target.VpaTargetSelectorFetcher
}

// NewUpdater creates Updater with given configuration
func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) {
func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionRateLimit float64, evictionRateBurst int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) {
evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
factory, err := eviction.NewPodsEvictionRestrictionFactory(kubeClient, minReplicasForEvicition, evictionToleranceFraction)
if err != nil {
return nil, fmt.Errorf("Failed to create eviction restriction factory: %v", err)
Expand All @@ -69,13 +74,14 @@ func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clien
eventRecorder: newEventRecorder(kubeClient),
evictionFactory: factory,
recommendationProcessor: recommendationProcessor,
evictionRateLimiter: evictionRateLimiter,
evictionAdmission: evictionAdmission,
selectorFetcher: selectorFetcher,
}, nil
}

// RunOnce represents single iteration in the main-loop of Updater
func (u *updater) RunOnce() {
func (u *updater) RunOnce(ctx context.Context) {
timer := metrics_updater.NewExecutionTimer()

vpaList, err := u.vpaLister.List(labels.Everything())
Expand Down Expand Up @@ -144,6 +150,11 @@ func (u *updater) RunOnce() {
if !evictionLimiter.CanEvict(pod) {
continue
}
err := u.evictionRateLimiter.Wait(ctx)
if err != nil {
klog.Warningf("evicting pod %v failed: %v", pod.Name, err)
return
}
klog.V(2).Infof("evicting pod %v", pod.Name)
evictErr := evictionLimiter.Evict(pod, u.eventRecorder)
if evictErr != nil {
Expand All @@ -155,6 +166,19 @@ func (u *updater) RunOnce() {
timer.ObserveTotal()
}

func getRateLimiter(evictionRateLimit float64, evictionRateLimitBurst int) *rate.Limiter {
var evictionRateLimiter *rate.Limiter
if evictionRateLimit <= 0 {
// As a special case if the rate is set to rate.Inf, the burst rate is ignored
// see https://github.com/golang/time/blob/master/rate/rate.go#L37
evictionRateLimiter = rate.NewLimiter(rate.Inf, 0)
klog.V(1).Info("Rate limit disabled")
} else {
evictionRateLimiter = rate.NewLimiter(rate.Limit(evictionRateLimit), evictionRateLimitBurst)
}
return evictionRateLimiter
}

// getPodsUpdateOrder returns list of pods that should be updated ordered by update priority
func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod {
priorityCalculator := priority.NewUpdatePriorityCalculator(vpa.Spec.ResourcePolicy, vpa.Status.Conditions, nil, u.recommendationProcessor)
Expand Down
30 changes: 27 additions & 3 deletions vertical-pod-autoscaler/pkg/updater/logic/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ limitations under the License.
package logic

import (
"context"
"strconv"
"testing"

"golang.org/x/time/rate"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -87,12 +91,13 @@ func TestRunOnce(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
selectorFetcher: mockSelectorFetcher,
}

mockSelectorFetcher.EXPECT().Fetch(gomock.Eq(vpaObj)).Return(selector, nil)
updater.RunOnce()
updater.RunOnce(context.Background())
eviction.AssertNumberOfCalls(t, "Evict", 5)
}

Expand Down Expand Up @@ -134,11 +139,12 @@ func TestVPAOff(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
selectorFetcher: target_mock.NewMockVpaTargetSelectorFetcher(ctrl),
}

updater.RunOnce()
updater.RunOnce(context.Background())
eviction.AssertNumberOfCalls(t, "Evict", 0)
}

Expand All @@ -153,9 +159,27 @@ func TestRunOnceNotingToProcess(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
}
updater.RunOnce()
updater.RunOnce(context.Background())
}

func TestGetRateLimiter(t *testing.T) {
cases := []struct {
rateLimit float64
rateLimitBurst int
expectedLimiter *rate.Limiter
}{
{0.0, 1, rate.NewLimiter(rate.Inf, 0)},
{-1.0, 2, rate.NewLimiter(rate.Inf, 0)},
{10.0, 3, rate.NewLimiter(rate.Limit(10), 3)},
}
for _, tc := range cases {
limiter := getRateLimiter(tc.rateLimit, tc.rateLimitBurst)
assert.Equal(t, tc.expectedLimiter.Burst(), limiter.Burst())
assert.InDelta(t, float64(tc.expectedLimiter.Limit()), float64(limiter.Limit()), 1e-6)
}
}

type fakeEvictFactory struct {
Expand Down
13 changes: 11 additions & 2 deletions vertical-pod-autoscaler/pkg/updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"time"

Expand Down Expand Up @@ -45,6 +46,12 @@ var (
evictionToleranceFraction = flag.Float64("eviction-tolerance", 0.5,
`Fraction of replica count that can be evicted for update, if more than one pod can be evicted.`)

evictionRateLimit = flag.Float64("eviction-rate-limit", -1,
`Number of pods that can be evicted per seconds. A rate limit set to 0 or -1 will disable
the rate limiter.`)

evictionRateBurst = flag.Int("eviction-rate-burst", 1, `Burst of pods that can be evicted.`)

address = flag.String("address", ":8943", "The address to expose Prometheus metrics.")
)

Expand Down Expand Up @@ -76,13 +83,15 @@ func main() {
limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
}
// TODO: use SharedInformerFactory in updater
updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher)
updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionRateLimit, *evictionRateBurst, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher)
if err != nil {
klog.Fatalf("Failed to create updater: %v", err)
}
ticker := time.Tick(*updaterInterval)
for range ticker {
updater.RunOnce()
ctx, cancel := context.WithTimeout(context.Background(), *updaterInterval)
defer cancel()
updater.RunOnce(ctx)
healthCheck.UpdateLastActivity()
}
}

0 comments on commit 0e425f3

Please sign in to comment.