Skip to content

Commit

Permalink
refactor(main): add rate limit (#128)
Browse files Browse the repository at this point in the history
Signed-off-by: cuisongliu <[email protected]>
  • Loading branch information
cuisongliu authored Jul 12, 2023
1 parent 33416b6 commit 2bb122b
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions library/controller/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,45 @@ package controller

import (
"flag"
"golang.org/x/time/rate"
"time"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
)

const (
defaultMinRetryDelay = 750 * time.Millisecond
defaultMaxRetryDelay = 15 * time.Minute
defaultMinRetryDelay = 5 * time.Millisecond
defaultMaxRetryDelay = 1000 * time.Second
defaultQPS = float64(10.0)
defaultBurst = 100
flagMinRetryDelay = "min-retry-delay"
flagMaxRetryDelay = "max-retry-delay"
flagQPS = "default-qps"
flagBurst = "default-burst"
)

// RateLimiterOptions used on reconcilers.
type RateLimiterOptions struct {
MinRetryDelay time.Duration

QPS float64
Burst int
MaxRetryDelay time.Duration
}

func (o *RateLimiterOptions) BindFlags(fs *flag.FlagSet) {
fs.DurationVar(&o.MinRetryDelay, flagMinRetryDelay, defaultMinRetryDelay,
"The minimum amount of time for which an object being reconciled will have to wait before a retry.")
"Specifies the minimum delay time before retrying the reconciliation of an object. This delay provides a buffer to prevent rapid-fire retries.")
fs.DurationVar(&o.MaxRetryDelay, flagMaxRetryDelay, defaultMaxRetryDelay,
"The maximum amount of time for which an object being reconciled will have to wait before a retry.")
"Specifies the maximum delay time before retrying the reconciliation of an object. This cap ensures that retry delays don't grow excessively long.")
fs.Float64Var(&o.QPS, flagQPS, defaultQPS, "Sets the maximum allowed quantity of process units (batches) that can be processed per second. This limit helps maintain a controlled processing rate.")
fs.IntVar(&o.Burst, flagBurst, defaultBurst, "Sets the maximum quantity of process units (batches) that can be processed in a burst. This limit helps control the processing rate during short periods of high activity.")
}

func GetRateLimiter(opts RateLimiterOptions) ratelimiter.RateLimiter {
return workqueue.NewItemExponentialFailureRateLimiter(
opts.MinRetryDelay,
opts.MaxRetryDelay)
}

// GetDefaultRateLimiter
// rate-limiter.RateLimiter with the default configuration.
func GetDefaultRateLimiter() ratelimiter.RateLimiter {
return workqueue.NewItemExponentialFailureRateLimiter(
defaultMinRetryDelay,
defaultMaxRetryDelay)
return workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(opts.MinRetryDelay, opts.MaxRetryDelay),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(opts.QPS), opts.Burst)},
)
}

0 comments on commit 2bb122b

Please sign in to comment.