diff --git a/glide.lock b/glide.lock index 25b79ac..d910db8 100644 --- a/glide.lock +++ b/glide.lock @@ -1,10 +1,18 @@ hash: 1a0062ef2d587d01d6f576a71957f65375c8bb6e241175c9568a1bb78e75e961 -updated: 2016-09-15T11:05:34.222002397-07:00 +updated: 2017-10-04T21:35:24.107303757-07:00 imports: [] testImports: +- name: github.com/davecgh/go-spew + version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 + subpackages: + - spew +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib - name: github.com/stretchr/testify - version: d77da356e56a7428ad25149ca77381849a6a5232 + version: 890a5c3458b43e6104ff5da8dfa139d013d77544 subpackages: - assert - name: github.com/uber-go/atomic - version: 0c9e689d64f004564b79d9a663634756df322902 + version: e682c1008ac17bf26d2e4b5ad6cdd08520ed0b22 diff --git a/ratelimit.go b/ratelimit.go index fc06ef0..823dfe1 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -24,7 +24,8 @@ import ( "sync" "time" - "go.uber.org/ratelimit/internal/clock" + "context" + "math" ) // Note: This file is inspired by: @@ -35,24 +36,16 @@ import ( // may block to throttle the goroutine. type Limiter interface { // Take should block to make sure that the RPS is met. - Take() time.Time -} - -// Clock is the minimum necessary interface to instantiate a rate limiter with -// a clock or mock clock, compatible with clocks created using -// github.com/andres-erbsen/clock. -type Clock interface { - Now() time.Time - Sleep(time.Duration) + Take(context.Context) bool } type limiter struct { sync.Mutex last time.Time + timer *time.Timer sleepFor time.Duration perRequest time.Duration maxSlack time.Duration - clock Clock } // Option configures a Limiter. @@ -63,24 +56,14 @@ func New(rate int, opts ...Option) Limiter { l := &limiter{ perRequest: time.Second / time.Duration(rate), maxSlack: -10 * time.Second / time.Duration(rate), + timer: time.NewTimer(time.Duration(math.MaxInt64)), } for _, opt := range opts { opt(l) } - if l.clock == nil { - l.clock = clock.New() - } return l } -// WithClock returns an option for ratelimit.New that provides an alternate -// Clock implementation, typically a mock Clock for testing. -func WithClock(clock Clock) Option { - return func(l *limiter) { - l.clock = clock - } -} - // WithoutSlack is an option for ratelimit.New that initializes the limiter // without any initial tolerance for bursts of traffic. var WithoutSlack Option = withoutSlackOption @@ -91,16 +74,16 @@ func withoutSlackOption(l *limiter) { // Take blocks to ensure that the time spent between multiple // Take calls is on average time.Second/rate. -func (t *limiter) Take() time.Time { +func (t *limiter) Take(ctx context.Context) bool { t.Lock() defer t.Unlock() - now := t.clock.Now() + now := time.Now() // If this is our first request, then we allow it. if t.last.IsZero() { t.last = now - return t.last + return true } // sleepFor calculates how much time we should sleep based on @@ -108,6 +91,7 @@ func (t *limiter) Take() time.Time { // Since the request may take longer than the budget, this number // can get negative, and is summed across requests. t.sleepFor += t.perRequest - now.Sub(t.last) + t.last = now // We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get @@ -118,14 +102,21 @@ func (t *limiter) Take() time.Time { // If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { - t.clock.Sleep(t.sleepFor) + if !t.timer.Stop() { + <-t.timer.C + } + t.timer.Reset(t.sleepFor) + select { + case <-t.timer.C: + case <-ctx.Done(): + return false + } + t.last = now.Add(t.sleepFor) t.sleepFor = 0 - } else { - t.last = now } - return t.last + return true } type unlimited struct{} @@ -135,6 +126,6 @@ func NewUnlimited() Limiter { return unlimited{} } -func (unlimited) Take() time.Time { - return time.Now() +func (unlimited) Take(_ context.Context) bool { + return true } diff --git a/ratelimit_test.go b/ratelimit_test.go index 0728e94..298879d 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -5,9 +5,9 @@ import ( "sync" "testing" "time" + "context" "go.uber.org/ratelimit" - "go.uber.org/ratelimit/internal/clock" "github.com/stretchr/testify/assert" "github.com/uber-go/atomic" @@ -17,8 +17,11 @@ func ExampleRatelimit() { rl := ratelimit.New(100) // per second prev := time.Now() + ctx := context.Background() for i := 0; i < 10; i++ { - now := rl.Take() + rl.Take(ctx) + now := time.Now() + if i > 0 { fmt.Println(i, now.Sub(prev)) } @@ -40,8 +43,9 @@ func ExampleRatelimit() { func TestUnlimited(t *testing.T) { now := time.Now() rl := ratelimit.NewUnlimited() + ctx := context.Background() for i := 0; i < 1000; i++ { - rl.Take() + rl.Take(ctx) } assert.Condition(t, func() bool { return time.Now().Sub(now) < 1*time.Millisecond }, "no artificial delay") } @@ -51,37 +55,31 @@ func TestRateLimiter(t *testing.T) { wg.Add(1) defer wg.Wait() - clock := clock.NewMock() - rl := ratelimit.New(100, ratelimit.WithClock(clock), ratelimit.WithoutSlack) + rl := ratelimit.New(100, ratelimit.WithoutSlack) count := atomic.NewInt32(0) // Until we're done... - done := make(chan struct{}) - defer close(done) + ctx := context.Background() // Create copious counts concurrently. - go job(rl, count, done) - go job(rl, count, done) - go job(rl, count, done) - go job(rl, count, done) + go job(rl, count, ctx) + go job(rl, count, ctx) + go job(rl, count, ctx) + go job(rl, count, ctx) - clock.AfterFunc(1*time.Second, func() { + time.AfterFunc(1*time.Second, func() { assert.InDelta(t, 100, count.Load(), 10, "count within rate limit") }) - clock.AfterFunc(2*time.Second, func() { + time.AfterFunc(2*time.Second, func() { assert.InDelta(t, 200, count.Load(), 10, "count within rate limit") }) - clock.AfterFunc(3*time.Second, func() { + time.AfterFunc(3*time.Second, func() { assert.InDelta(t, 300, count.Load(), 10, "count within rate limit") wg.Done() }) - - clock.Add(4 * time.Second) - - clock.Add(5 * time.Second) } func TestDelayedRateLimiter(t *testing.T) { @@ -89,24 +87,22 @@ func TestDelayedRateLimiter(t *testing.T) { wg.Add(1) defer wg.Wait() - clock := clock.NewMock() - slow := ratelimit.New(10, ratelimit.WithClock(clock)) - fast := ratelimit.New(100, ratelimit.WithClock(clock)) + slow := ratelimit.New(10) + fast := ratelimit.New(100) count := atomic.NewInt32(0) // Until we're done... - done := make(chan struct{}) - defer close(done) + ctx := context.Background() // Run a slow job go func() { for { - slow.Take() - fast.Take() + slow.Take(ctx) + fast.Take(ctx) count.Inc() select { - case <-done: + case <-ctx.Done(): return default: } @@ -114,28 +110,26 @@ func TestDelayedRateLimiter(t *testing.T) { }() // Accumulate slack for 10 seconds, - clock.AfterFunc(20*time.Second, func() { + time.AfterFunc(20*time.Second, func() { // Then start working. - go job(fast, count, done) - go job(fast, count, done) - go job(fast, count, done) - go job(fast, count, done) + go job(fast, count, ctx) + go job(fast, count, ctx) + go job(fast, count, ctx) + go job(fast, count, ctx) }) - clock.AfterFunc(30*time.Second, func() { + time.AfterFunc(30*time.Second, func() { assert.InDelta(t, 1200, count.Load(), 10, "count within rate limit") wg.Done() }) - - clock.Add(40 * time.Second) } -func job(rl ratelimit.Limiter, count *atomic.Int32, done <-chan struct{}) { +func job(rl ratelimit.Limiter, count *atomic.Int32, ctx context.Context) { for { - rl.Take() + rl.Take(ctx) count.Inc() select { - case <-done: + case <-ctx.Done(): return default: }