From 071c193af4636c29ef6671d183fc12516365f2ef Mon Sep 17 00:00:00 2001 From: hasheddan Date: Mon, 15 Feb 2021 14:24:11 -0600 Subject: [PATCH] Add default provider and controller rate limiters Adds default rate limiters to be used at both the provider and controller level. The provider rate limiter is a configurable token bucket and the controller limiter is a max of limiter that uses the provider limiter and a per-item exponential backoff. Signed-off-by: hasheddan --- pkg/ratelimiter/default.go | 48 +++++++++++++++++++++++++++++++++ pkg/ratelimiter/default_test.go | 42 +++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 pkg/ratelimiter/default.go create mode 100644 pkg/ratelimiter/default_test.go diff --git a/pkg/ratelimiter/default.go b/pkg/ratelimiter/default.go new file mode 100644 index 000000000..de23bca16 --- /dev/null +++ b/pkg/ratelimiter/default.go @@ -0,0 +1,48 @@ +/* +Copyright 2021 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimiter + +import ( + "time" + + "golang.org/x/time/rate" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" +) + +// DefaultProviderRPS is the recommended default average requeues per second +// tolerated by a provider's controller manager. +const DefaultProviderRPS = 1 + +// NewDefaultProviderRateLimiter returns a token bucket rate limiter meant for +// limiting the number of average total requeues per second for all controllers +// registered with a controller manager. The bucket size is a linear function of +// the requeues per second. +func NewDefaultProviderRateLimiter(rps int) *workqueue.BucketRateLimiter { + return &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(rps), rps*10)} +} + +// NewDefaultManagedRateLimiter returns a rate limiter that takes the maximum +// delay between the passed providerRateLimiter and a per-item exponential +// backoff limiter. The exponential backoff limiter has a base delay of 1s and a +// maximum of 60s. +func NewDefaultManagedRateLimiter(providerRateLimiter ratelimiter.RateLimiter) ratelimiter.RateLimiter { + return workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 60*time.Second), + providerRateLimiter, + ) +} diff --git a/pkg/ratelimiter/default_test.go b/pkg/ratelimiter/default_test.go new file mode 100644 index 000000000..656ba5616 --- /dev/null +++ b/pkg/ratelimiter/default_test.go @@ -0,0 +1,42 @@ +/* +Copyright 2021 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimiter + +import ( + "sync" + "testing" + "time" +) + +func TestDefaultMangedRateLimiter(t *testing.T) { + limiter := NewDefaultManagedRateLimiter(NewDefaultProviderRateLimiter(DefaultProviderRPS)) + backoffSchedule := []int{1, 2, 4, 8, 16, 32, 60} + for _, d := range backoffSchedule { + if e, a := time.Duration(d)*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } + limiter.Forget("one") + if e, a := time.Duration(backoffSchedule[0])*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + wg.Done() + }() +}