From fa470e277ed4549521a2c9858e422517ff7ff84b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 4 Oct 2017 22:49:52 -0700 Subject: [PATCH 1/4] Update the interfacae for the Limiter interfacae --- glide.lock | 14 +++++++++++--- ratelimit.go | 27 ++++++++++++++++++--------- ratelimit_test.go | 45 +++++++++++++++++++++++++-------------------- 3 files changed, 54 insertions(+), 32 deletions(-) diff --git a/glide.lock b/glide.lock index 25b79ac..d910db8 100644 --- a/glide.lock +++ b/glide.lock @@ -1,10 +1,18 @@ hash: 1a0062ef2d587d01d6f576a71957f65375c8bb6e241175c9568a1bb78e75e961 -updated: 2016-09-15T11:05:34.222002397-07:00 +updated: 2017-10-04T21:35:24.107303757-07:00 imports: [] testImports: +- name: github.com/davecgh/go-spew + version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 + subpackages: + - spew +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib - name: github.com/stretchr/testify - version: d77da356e56a7428ad25149ca77381849a6a5232 + version: 890a5c3458b43e6104ff5da8dfa139d013d77544 subpackages: - assert - name: github.com/uber-go/atomic - version: 0c9e689d64f004564b79d9a663634756df322902 + version: e682c1008ac17bf26d2e4b5ad6cdd08520ed0b22 diff --git a/ratelimit.go b/ratelimit.go index fc06ef0..1b202fc 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -25,6 +25,8 @@ import ( "time" "go.uber.org/ratelimit/internal/clock" + "context" + "math" ) // Note: This file is inspired by: @@ -35,7 +37,7 @@ import ( // may block to throttle the goroutine. type Limiter interface { // Take should block to make sure that the RPS is met. - Take() time.Time + Take(context.Context) bool } // Clock is the minimum necessary interface to instantiate a rate limiter with @@ -49,6 +51,7 @@ type Clock interface { type limiter struct { sync.Mutex last time.Time + timer *time.Timer sleepFor time.Duration perRequest time.Duration maxSlack time.Duration @@ -63,6 +66,7 @@ func New(rate int, opts ...Option) Limiter { l := &limiter{ perRequest: time.Second / time.Duration(rate), maxSlack: -10 * time.Second / time.Duration(rate), + timer: time.NewTimer(time.Duration(math.MaxInt64)), } for _, opt := range opts { opt(l) @@ -91,7 +95,7 @@ func withoutSlackOption(l *limiter) { // Take blocks to ensure that the time spent between multiple // Take calls is on average time.Second/rate. -func (t *limiter) Take() time.Time { +func (t *limiter) Take(ctx context.Context) bool { t.Lock() defer t.Unlock() @@ -100,7 +104,7 @@ func (t *limiter) Take() time.Time { // If this is our first request, then we allow it. if t.last.IsZero() { t.last = now - return t.last + return true } // sleepFor calculates how much time we should sleep based on @@ -108,6 +112,7 @@ func (t *limiter) Take() time.Time { // Since the request may take longer than the budget, this number // can get negative, and is summed across requests. t.sleepFor += t.perRequest - now.Sub(t.last) + t.last = now // We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get @@ -118,14 +123,18 @@ func (t *limiter) Take() time.Time { // If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { - t.clock.Sleep(t.sleepFor) + t.timer.Reset(t.sleepFor) + select { + case <-t.timer.C: + case <-ctx.Done(): + return false + } + t.last = now.Add(t.sleepFor) t.sleepFor = 0 - } else { - t.last = now } - return t.last + return true } type unlimited struct{} @@ -135,6 +144,6 @@ func NewUnlimited() Limiter { return unlimited{} } -func (unlimited) Take() time.Time { - return time.Now() +func (unlimited) Take(_ context.Context) bool{ + return true } diff --git a/ratelimit_test.go b/ratelimit_test.go index 0728e94..00919be 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -11,14 +11,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/uber-go/atomic" + "context" ) func ExampleRatelimit() { rl := ratelimit.New(100) // per second prev := time.Now() + ctx := context.Background() for i := 0; i < 10; i++ { - now := rl.Take() + rl.Take(ctx) + now := time.Now() + if i > 0 { fmt.Println(i, now.Sub(prev)) } @@ -40,8 +44,9 @@ func ExampleRatelimit() { func TestUnlimited(t *testing.T) { now := time.Now() rl := ratelimit.NewUnlimited() + ctx := context.Background() for i := 0; i < 1000; i++ { - rl.Take() + rl.Take(ctx) } assert.Condition(t, func() bool { return time.Now().Sub(now) < 1*time.Millisecond }, "no artificial delay") } @@ -57,14 +62,14 @@ func TestRateLimiter(t *testing.T) { count := atomic.NewInt32(0) // Until we're done... - done := make(chan struct{}) - defer close(done) + ctx, done := context.WithCancel(context.Background()) + defer done() // Create copious counts concurrently. - go job(rl, count, done) - go job(rl, count, done) - go job(rl, count, done) - go job(rl, count, done) + go job(rl, count, ctx) + go job(rl, count, ctx) + go job(rl, count, ctx) + go job(rl, count, ctx) clock.AfterFunc(1*time.Second, func() { assert.InDelta(t, 100, count.Load(), 10, "count within rate limit") @@ -96,17 +101,17 @@ func TestDelayedRateLimiter(t *testing.T) { count := atomic.NewInt32(0) // Until we're done... - done := make(chan struct{}) - defer close(done) + ctx, done := context.WithCancel(context.Background()) + defer done() // Run a slow job go func() { for { - slow.Take() - fast.Take() + slow.Take(ctx) + fast.Take(ctx) count.Inc() select { - case <-done: + case <-ctx.Done(): return default: } @@ -116,10 +121,10 @@ func TestDelayedRateLimiter(t *testing.T) { // Accumulate slack for 10 seconds, clock.AfterFunc(20*time.Second, func() { // Then start working. - go job(fast, count, done) - go job(fast, count, done) - go job(fast, count, done) - go job(fast, count, done) + go job(fast, count, ctx) + go job(fast, count, ctx) + go job(fast, count, ctx) + go job(fast, count, ctx) }) clock.AfterFunc(30*time.Second, func() { @@ -130,12 +135,12 @@ func TestDelayedRateLimiter(t *testing.T) { clock.Add(40 * time.Second) } -func job(rl ratelimit.Limiter, count *atomic.Int32, done <-chan struct{}) { +func job(rl ratelimit.Limiter, count *atomic.Int32, ctx context.Context) { for { - rl.Take() + rl.Take(ctx) count.Inc() select { - case <-done: + case <-ctx.Done(): return default: } From 593774ba316f64402e713f0c388d9153d1de1718 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 9 Oct 2017 14:41:08 -0700 Subject: [PATCH 2/4] Update the ratelimit interface to accept a context.Context object, remove the Clock abstraction --- ratelimit.go | 26 +++----------------------- ratelimit_test.go | 33 ++++++++++++--------------------- 2 files changed, 15 insertions(+), 44 deletions(-) diff --git a/ratelimit.go b/ratelimit.go index 1b202fc..7de9cb1 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -24,7 +24,6 @@ import ( "sync" "time" - "go.uber.org/ratelimit/internal/clock" "context" "math" ) @@ -40,14 +39,6 @@ type Limiter interface { Take(context.Context) bool } -// Clock is the minimum necessary interface to instantiate a rate limiter with -// a clock or mock clock, compatible with clocks created using -// github.com/andres-erbsen/clock. -type Clock interface { - Now() time.Time - Sleep(time.Duration) -} - type limiter struct { sync.Mutex last time.Time @@ -55,7 +46,6 @@ type limiter struct { sleepFor time.Duration perRequest time.Duration maxSlack time.Duration - clock Clock } // Option configures a Limiter. @@ -71,20 +61,9 @@ func New(rate int, opts ...Option) Limiter { for _, opt := range opts { opt(l) } - if l.clock == nil { - l.clock = clock.New() - } return l } -// WithClock returns an option for ratelimit.New that provides an alternate -// Clock implementation, typically a mock Clock for testing. -func WithClock(clock Clock) Option { - return func(l *limiter) { - l.clock = clock - } -} - // WithoutSlack is an option for ratelimit.New that initializes the limiter // without any initial tolerance for bursts of traffic. var WithoutSlack Option = withoutSlackOption @@ -99,7 +78,7 @@ func (t *limiter) Take(ctx context.Context) bool { t.Lock() defer t.Unlock() - now := t.clock.Now() + now := time.Now() // If this is our first request, then we allow it. if t.last.IsZero() { @@ -123,6 +102,7 @@ func (t *limiter) Take(ctx context.Context) bool { // If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { + t.timer.Stop() t.timer.Reset(t.sleepFor) select { case <-t.timer.C: @@ -144,6 +124,6 @@ func NewUnlimited() Limiter { return unlimited{} } -func (unlimited) Take(_ context.Context) bool{ +func (unlimited) Take(_ context.Context) bool { return true } diff --git a/ratelimit_test.go b/ratelimit_test.go index 00919be..8ae3fa7 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -5,13 +5,12 @@ import ( "sync" "testing" "time" + "context" "go.uber.org/ratelimit" - "go.uber.org/ratelimit/internal/clock" "github.com/stretchr/testify/assert" "github.com/uber-go/atomic" - "context" ) func ExampleRatelimit() { @@ -56,14 +55,12 @@ func TestRateLimiter(t *testing.T) { wg.Add(1) defer wg.Wait() - clock := clock.NewMock() - rl := ratelimit.New(100, ratelimit.WithClock(clock), ratelimit.WithoutSlack) + rl := ratelimit.New(100, ratelimit.WithoutSlack) count := atomic.NewInt32(0) // Until we're done... - ctx, done := context.WithCancel(context.Background()) - defer done() + ctx := context.Background() // Create copious counts concurrently. go job(rl, count, ctx) @@ -71,22 +68,20 @@ func TestRateLimiter(t *testing.T) { go job(rl, count, ctx) go job(rl, count, ctx) - clock.AfterFunc(1*time.Second, func() { + time.AfterFunc(1*time.Second, func() { assert.InDelta(t, 100, count.Load(), 10, "count within rate limit") }) - clock.AfterFunc(2*time.Second, func() { + time.AfterFunc(2*time.Second, func() { assert.InDelta(t, 200, count.Load(), 10, "count within rate limit") }) - clock.AfterFunc(3*time.Second, func() { + time.AfterFunc(3*time.Second, func() { assert.InDelta(t, 300, count.Load(), 10, "count within rate limit") wg.Done() }) - clock.Add(4 * time.Second) - - clock.Add(5 * time.Second) + <-time.NewTimer(10 * time.Second).C } func TestDelayedRateLimiter(t *testing.T) { @@ -94,15 +89,13 @@ func TestDelayedRateLimiter(t *testing.T) { wg.Add(1) defer wg.Wait() - clock := clock.NewMock() - slow := ratelimit.New(10, ratelimit.WithClock(clock)) - fast := ratelimit.New(100, ratelimit.WithClock(clock)) + slow := ratelimit.New(10) + fast := ratelimit.New(100) count := atomic.NewInt32(0) // Until we're done... - ctx, done := context.WithCancel(context.Background()) - defer done() + ctx := context.Background() // Run a slow job go func() { @@ -119,7 +112,7 @@ func TestDelayedRateLimiter(t *testing.T) { }() // Accumulate slack for 10 seconds, - clock.AfterFunc(20*time.Second, func() { + time.AfterFunc(20*time.Second, func() { // Then start working. go job(fast, count, ctx) go job(fast, count, ctx) @@ -127,12 +120,10 @@ func TestDelayedRateLimiter(t *testing.T) { go job(fast, count, ctx) }) - clock.AfterFunc(30*time.Second, func() { + time.AfterFunc(30*time.Second, func() { assert.InDelta(t, 1200, count.Load(), 10, "count within rate limit") wg.Done() }) - - clock.Add(40 * time.Second) } func job(rl ratelimit.Limiter, count *atomic.Int32, ctx context.Context) { From 83d8d3e0a94d5b998056b82ec52d23538b4a801d Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 9 Oct 2017 14:48:02 -0700 Subject: [PATCH 3/4] Remove artifical test delay --- ratelimit_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/ratelimit_test.go b/ratelimit_test.go index 8ae3fa7..298879d 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -80,8 +80,6 @@ func TestRateLimiter(t *testing.T) { assert.InDelta(t, 300, count.Load(), 10, "count within rate limit") wg.Done() }) - - <-time.NewTimer(10 * time.Second).C } func TestDelayedRateLimiter(t *testing.T) { From 12880b7daeedbae15e46d79b5ab9aa08f803aadd Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 24 Oct 2017 14:47:09 -0700 Subject: [PATCH 4/4] Deal with race condition for timer.Stop --- ratelimit.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ratelimit.go b/ratelimit.go index 7de9cb1..823dfe1 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -102,7 +102,9 @@ func (t *limiter) Take(ctx context.Context) bool { // If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { - t.timer.Stop() + if !t.timer.Stop() { + <-t.timer.C + } t.timer.Reset(t.sleepFor) select { case <-t.timer.C: