Add rate limiter to the Vertical Pod autoscaler updater component #2326

Merged · 1 commit · Oct 25, 2019
30 changes: 27 additions & 3 deletions vertical-pod-autoscaler/pkg/updater/logic/updater.go
@@ -17,9 +17,12 @@ limitations under the License.
package logic

import (
"context"
"fmt"
"time"

"golang.org/x/time/rate"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@@ -44,7 +47,7 @@ import (
// Updater performs updates on pods if recommended by Vertical Pod Autoscaler
type Updater interface {
// RunOnce represents single iteration in the main-loop of Updater
RunOnce()
RunOnce(context.Context)
}

type updater struct {
@@ -54,11 +57,13 @@ type updater struct {
evictionFactory eviction.PodsEvictionRestrictionFactory
recommendationProcessor vpa_api_util.RecommendationProcessor
evictionAdmission priority.PodEvictionAdmission
evictionRateLimiter *rate.Limiter
selectorFetcher target.VpaTargetSelectorFetcher
}

// NewUpdater creates Updater with given configuration
func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) {
func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEvicition int, evictionRateLimit float64, evictionRateBurst int, evictionToleranceFraction float64, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher) (Updater, error) {
evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
factory, err := eviction.NewPodsEvictionRestrictionFactory(kubeClient, minReplicasForEvicition, evictionToleranceFraction)
if err != nil {
return nil, fmt.Errorf("Failed to create eviction restriction factory: %v", err)
@@ -69,13 +74,14 @@ func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clien
eventRecorder: newEventRecorder(kubeClient),
evictionFactory: factory,
recommendationProcessor: recommendationProcessor,
evictionRateLimiter: evictionRateLimiter,
evictionAdmission: evictionAdmission,
selectorFetcher: selectorFetcher,
}, nil
}

// RunOnce represents single iteration in the main-loop of Updater
func (u *updater) RunOnce() {
func (u *updater) RunOnce(ctx context.Context) {
timer := metrics_updater.NewExecutionTimer()

vpaList, err := u.vpaLister.List(labels.Everything())
@@ -144,6 +150,11 @@ func (u *updater) RunOnce() {
if !evictionLimiter.CanEvict(pod) {
continue
}
err := u.evictionRateLimiter.Wait(ctx)
@bandesz commented on Sep 17, 2019:

Wait always asks for one token, so it doesn't make much sense to set the burst rate to anything other than 1. If I'm correct, we should remove the eviction-rate-limit-burst parameter.

Sorry, the golang.org/x/time/rate documentation is not really clear. Looking at the code, burst is also the maximum number of tokens you can accrue over time. This is important, as it means the VPA can't suddenly evict lots of pods after not evicting anything for a long time.
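
For illustration, a minimal standalone sketch of that burst behaviour (not part of this PR; it only relies on the documented golang.org/x/time/rate API):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 1 token per second, burst of 3. The limiter starts with a full bucket
	// of `burst` tokens, so the first three Wait calls return immediately
	// and the fourth blocks for roughly one second.
	limiter := rate.NewLimiter(rate.Limit(1), 3)

	for i := 1; i <= 4; i++ {
		start := time.Now()
		if err := limiter.Wait(context.Background()); err != nil {
			fmt.Println("wait cancelled:", err)
			return
		}
		fmt.Printf("call %d allowed after %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}
```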

Another review comment:

Without knowing the autoscaler well, I'm a little worried that Wait could block here for a long time (e.g. minutes). The state of the cluster might change significantly during that time.

A further review comment:

An idea for solving the Wait()/blocking problem (a rough sketch follows this list):

  1. Collect all pods to be evicted in a list.
  2. Randomize the list (to give all pods a fair chance to be evicted in a RunOnce run).
  3. Go through the list and evict a pod if u.evictionRateLimiter.Allow() returns true.
  4. If u.evictionRateLimiter.Allow() returns false, return from the RunOnce function. Any remaining pods to be evicted will be retried in the next RunOnce run.
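
A rough sketch of what that Allow()-based alternative could look like (a hypothetical helper, not what this PR implements; names and imports are assumed):

```go
package logic

import (
	"math/rand"

	"golang.org/x/time/rate"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/klog"
)

// evictWithAllow shuffles the candidate pods, evicts while the limiter still
// has tokens, and returns as soon as Allow() reports the budget is exhausted,
// leaving the remaining pods for the next RunOnce iteration.
func evictWithAllow(pods []*apiv1.Pod, limiter *rate.Limiter, evict func(*apiv1.Pod) error) {
	rand.Shuffle(len(pods), func(i, j int) { pods[i], pods[j] = pods[j], pods[i] })
	for _, pod := range pods {
		if !limiter.Allow() {
			return // out of tokens; retry the rest in the next run
		}
		if err := evict(pod); err != nil {
			klog.Warningf("evicting pod %v failed: %v", pod.Name, err)
		}
	}
}
```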

@guillaumebreton (contributor, author) commented on Sep 17, 2019:

Thanks for your input 🙂

I'm not really sure why the Wait is an issue here:

  • A context with a timeout is passed to the Wait function, so the Wait will be cancelled if the delay is larger than the tick interval.
  • As far as I understand it, the state of the cluster is updated by the recommender and applied by the admission controller. The updater is only responsible for eviction here, and when the pod restarts, the latest VPA values are applied by the admission controller.

In your proposed solution, we would hit the rate limiter quickly and exit, so we would "waste" n seconds of the ticker loop doing nothing. Running on fresher data is an excellent idea, but the only option I see would be to refresh the pod list between every Wait, which doesn't seem ideal to me.
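
For reference, a minimal sketch of how a context deadline bounds Wait (a standalone example, independent of the updater code; the concrete rates are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One permitted eviction every 10 minutes, burst of 1.
	limiter := rate.NewLimiter(rate.Limit(1.0/600.0), 1)
	_ = limiter.Allow() // drain the initial token so the next Wait would have to block

	// The caller bounds the wait with the tick interval (here 1 minute), so
	// Wait returns an error instead of blocking for the full 10 minutes.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	if err := limiter.Wait(ctx); err != nil {
		fmt.Println("wait aborted by context deadline:", err)
	}
}
```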

A project member commented:

The default tick for the updater is 1 minute, so as @guillaumebreton writes, the Wait will be canceled after that time at most (unless someone uses a custom tick). So we can basically look at info that is stale by up to 1 minute, which is also true if the updater has a lot of work to do. I'm inclined to agree this solution is acceptable.

As to randomizing the list of pods to evict: the updater has a priority mechanism to select which pods to evict first based on multiple factors (including how far away they are from their recommended resources). Shuffling the list wouldn't play well with that.

if err != nil {
klog.Warningf("evicting pod %v failed: %v", pod.Name, err)
return
}
klog.V(2).Infof("evicting pod %v", pod.Name)
evictErr := evictionLimiter.Evict(pod, u.eventRecorder)
if evictErr != nil {
@@ -155,6 +166,19 @@ func (u *updater) RunOnce() {
timer.ObserveTotal()
}

func getRateLimiter(evictionRateLimit float64, evictionRateLimitBurst int) *rate.Limiter {
var evictionRateLimiter *rate.Limiter
if evictionRateLimit <= 0 {
// As a special case if the rate is set to rate.Inf, the burst rate is ignored
// see https://github.com/golang/time/blob/master/rate/rate.go#L37
evictionRateLimiter = rate.NewLimiter(rate.Inf, 0)
klog.V(1).Info("Rate limit disabled")
} else {
evictionRateLimiter = rate.NewLimiter(rate.Limit(evictionRateLimit), evictionRateLimitBurst)
}
return evictionRateLimiter
}

// getPodsUpdateOrder returns list of pods that should be updated ordered by update priority
func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod {
priorityCalculator := priority.NewUpdatePriorityCalculator(vpa.Spec.ResourcePolicy, vpa.Status.Conditions, nil, u.recommendationProcessor)
30 changes: 27 additions & 3 deletions vertical-pod-autoscaler/pkg/updater/logic/updater_test.go
@@ -17,10 +17,14 @@ limitations under the License.
package logic

import (
"context"
"strconv"
"testing"

"golang.org/x/time/rate"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -87,12 +91,13 @@ func TestRunOnce(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
selectorFetcher: mockSelectorFetcher,
}

mockSelectorFetcher.EXPECT().Fetch(gomock.Eq(vpaObj)).Return(selector, nil)
updater.RunOnce()
updater.RunOnce(context.Background())
eviction.AssertNumberOfCalls(t, "Evict", 5)
}

@@ -134,11 +139,12 @@ func TestVPAOff(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
selectorFetcher: target_mock.NewMockVpaTargetSelectorFetcher(ctrl),
}

updater.RunOnce()
updater.RunOnce(context.Background())
eviction.AssertNumberOfCalls(t, "Evict", 0)
}

@@ -153,9 +159,27 @@ func TestRunOnceNotingToProcess(t *testing.T) {
vpaLister: vpaLister,
podLister: podLister,
evictionFactory: factory,
evictionRateLimiter: rate.NewLimiter(rate.Inf, 0),
recommendationProcessor: &test.FakeRecommendationProcessor{},
}
updater.RunOnce()
updater.RunOnce(context.Background())
}

func TestGetRateLimiter(t *testing.T) {
cases := []struct {
rateLimit float64
rateLimitBurst int
expectedLimiter *rate.Limiter
}{
{0.0, 1, rate.NewLimiter(rate.Inf, 0)},
{-1.0, 2, rate.NewLimiter(rate.Inf, 0)},
{10.0, 3, rate.NewLimiter(rate.Limit(10), 3)},
}
for _, tc := range cases {
limiter := getRateLimiter(tc.rateLimit, tc.rateLimitBurst)
assert.Equal(t, tc.expectedLimiter.Burst(), limiter.Burst())
assert.InDelta(t, float64(tc.expectedLimiter.Limit()), float64(limiter.Limit()), 1e-6)
}
}

type fakeEvictFactory struct {
13 changes: 11 additions & 2 deletions vertical-pod-autoscaler/pkg/updater/main.go
@@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"time"

@@ -45,6 +46,12 @@
evictionToleranceFraction = flag.Float64("eviction-tolerance", 0.5,
`Fraction of replica count that can be evicted for update, if more than one pod can be evicted.`)

evictionRateLimit = flag.Float64("eviction-rate-limit", -1,
`Number of pods that can be evicted per second. A rate limit set to 0 or -1 will disable
the rate limiter.`)

evictionRateBurst = flag.Int("eviction-rate-burst", 1, `Burst of pods that can be evicted.`)

address = flag.String("address", ":8943", "The address to expose Prometheus metrics.")
)

@@ -76,13 +83,15 @@ func main() {
limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
}
// TODO: use SharedInformerFactory in updater
updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher)
updater, err := updater.NewUpdater(kubeClient, vpaClient, *minReplicas, *evictionRateLimit, *evictionRateBurst, *evictionToleranceFraction, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator), nil, targetSelectorFetcher)
if err != nil {
klog.Fatalf("Failed to create updater: %v", err)
}
ticker := time.Tick(*updaterInterval)
for range ticker {
updater.RunOnce()
ctx, cancel := context.WithTimeout(context.Background(), *updaterInterval)
defer cancel()
updater.RunOnce(ctx)
healthCheck.UpdateLastActivity()
}
}
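
For completeness, a hypothetical invocation that enables the limiter (binary name and flag values are illustrative; with the defaults the limiter stays disabled):

```
updater --eviction-rate-limit=2 --eviction-rate-burst=5 --eviction-tolerance=0.5
```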