From 2bb122b829a06795c0c3c1c4ef4aa2b85584a27a Mon Sep 17 00:00:00 2001 From: cuisongliu Date: Wed, 12 Jul 2023 23:19:52 +0800 Subject: [PATCH] refactor(main): add rate limit (#128) Signed-off-by: cuisongliu --- library/controller/rate_limiter.go | 34 ++++++++++++++++-------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/library/controller/rate_limiter.go b/library/controller/rate_limiter.go index 14649da..c6e9107 100644 --- a/library/controller/rate_limiter.go +++ b/library/controller/rate_limiter.go @@ -16,6 +16,7 @@ package controller import ( "flag" + "golang.org/x/time/rate" "time" "k8s.io/client-go/util/workqueue" @@ -23,36 +24,37 @@ import ( ) const ( - defaultMinRetryDelay = 750 * time.Millisecond - defaultMaxRetryDelay = 15 * time.Minute + defaultMinRetryDelay = 5 * time.Millisecond + defaultMaxRetryDelay = 1000 * time.Second + defaultQPS = float64(10.0) + defaultBurst = 100 flagMinRetryDelay = "min-retry-delay" flagMaxRetryDelay = "max-retry-delay" + flagQPS = "default-qps" + flagBurst = "default-burst" ) // RateLimiterOptions used on reconcilers. type RateLimiterOptions struct { MinRetryDelay time.Duration - + QPS float64 + Burst int MaxRetryDelay time.Duration } func (o *RateLimiterOptions) BindFlags(fs *flag.FlagSet) { fs.DurationVar(&o.MinRetryDelay, flagMinRetryDelay, defaultMinRetryDelay, - "The minimum amount of time for which an object being reconciled will have to wait before a retry.") + "Specifies the minimum delay time before retrying the reconciliation of an object. This delay provides a buffer to prevent rapid-fire retries.") fs.DurationVar(&o.MaxRetryDelay, flagMaxRetryDelay, defaultMaxRetryDelay, - "The maximum amount of time for which an object being reconciled will have to wait before a retry.") + "Specifies the maximum delay time before retrying the reconciliation of an object. This cap ensures that retry delays don't grow excessively long.") + fs.Float64Var(&o.QPS, flagQPS, defaultQPS, "Sets the maximum allowed quantity of process units (batches) that can be processed per second. This limit helps maintain a controlled processing rate.") + fs.IntVar(&o.Burst, flagBurst, defaultBurst, "Sets the maximum quantity of process units (batches) that can be processed in a burst. This limit helps control the processing rate during short periods of high activity.") } func GetRateLimiter(opts RateLimiterOptions) ratelimiter.RateLimiter { - return workqueue.NewItemExponentialFailureRateLimiter( - opts.MinRetryDelay, - opts.MaxRetryDelay) -} - -// GetDefaultRateLimiter -// rate-limiter.RateLimiter with the default configuration. -func GetDefaultRateLimiter() ratelimiter.RateLimiter { - return workqueue.NewItemExponentialFailureRateLimiter( - defaultMinRetryDelay, - defaultMaxRetryDelay) + return workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(opts.MinRetryDelay, opts.MaxRetryDelay), + // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(opts.QPS), opts.Burst)}, + ) }