Update the Limiter interface to accept a context.Context object #11

Closed · wants to merge 4 commits
14 changes: 11 additions & 3 deletions glide.lock

(generated lockfile; diff not rendered by default)

53 changes: 22 additions & 31 deletions ratelimit.go
@@ -24,7 +24,8 @@ import (
"sync"
"time"

"go.uber.org/ratelimit/internal/clock"
"context"
Contributor: nit: no blank between stdlib imports

Author: Done

Contributor: still see a blank between "time" and "context"
"math"
)
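
For reference, the grouping the reviewer is asking for keeps all four stdlib imports in a single block with no blank line between them:

```go
import (
	"context"
	"math"
	"sync"
	"time"
)
```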

// Note: This file is inspired by:
@@ -35,24 +36,16 @@ import (
// may block to throttle the goroutine.
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}

// Clock is the minimum necessary interface to instantiate a rate limiter with
// a clock or mock clock, compatible with clocks created using
// github.com/andres-erbsen/clock.
type Clock interface {
Now() time.Time
Sleep(time.Duration)
Take(context.Context) bool
Contributor: what does the bool return value mean?

Author: Right now, Take returns true if it was unblocked without any cancellation from the passed-in context, and false if it was unblocked because of the context.

Contributor: Can we update the documentation to make that part of the API?

I'm also not sure how valuable it is, since the caller can just check ctx.Err() != nil. What is the value in having this in the API?

}
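
To make the semantics discussed above concrete, here is a minimal sketch of a caller using the proposed signature. It assumes the behavior the author describes (true when the limiter unblocks normally, false when the context does) and compiles only against this PR's branch, not the released API:

```go
package main

import (
	"context"
	"fmt"

	"go.uber.org/ratelimit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	rl := ratelimit.New(100)
	if !rl.Take(ctx) {
		// Unblocked by ctx cancellation, not by the limiter; per the
		// reviewer, checking ctx.Err() != nil gives the same signal.
		fmt.Println("canceled:", ctx.Err())
		return
	}
	fmt.Println("admitted; proceed with rate-limited work")
}
```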

type limiter struct {
sync.Mutex
last time.Time
timer *time.Timer
sleepFor time.Duration
perRequest time.Duration
maxSlack time.Duration
clock Clock
}

// Option configures a Limiter.
@@ -63,24 +56,14 @@ func New(rate int, opts ...Option) Limiter {
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
timer: time.NewTimer(time.Duration(math.MaxInt64)),
Contributor: rather than use a max duration, should we create a timer, stop it (maybe in init) and use that everywhere?

Author: I could also leave the timer field as nil within the struct and have a check for it in Take, setting it when needed, but I'm not sure if there are any performance implications.

Contributor: nil check works too, but fewer branches is always nice, so it might be better to have an expired timer.

}
for _, opt := range opts {
opt(l)
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}
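
For comparison, a sketch of the expired-timer variant the reviewer suggests: create the timer in New and stop it immediately, instead of arming it with math.MaxInt64. This is hypothetical; the PR keeps the max-duration approach. The non-blocking drain in Take (below) handles the empty channel on first use:

```go
timer := time.NewTimer(time.Hour) // duration is irrelevant; stopped at once
timer.Stop()                      // never fires, so its channel stays empty
l := &limiter{
	perRequest: time.Second / time.Duration(rate),
	maxSlack:   -10 * time.Second / time.Duration(rate),
	timer:      timer,
}
```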

// WithClock returns an option for ratelimit.New that provides an alternate
// Clock implementation, typically a mock Clock for testing.
func WithClock(clock Clock) Option {
return func(l *limiter) {
l.clock = clock
}
}

// WithoutSlack is an option for ratelimit.New that initializes the limiter
// without any initial tolerance for bursts of traffic.
var WithoutSlack Option = withoutSlackOption
@@ -91,23 +74,24 @@ func withoutSlackOption(l *limiter) {

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
func (t *limiter) Take(ctx context.Context) bool {
t.Lock()
defer t.Unlock()

now := t.clock.Now()
now := time.Now()

// If this is our first request, then we allow it.
if t.last.IsZero() {
t.last = now
return t.last
return true
}

// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
t.sleepFor += t.perRequest - now.Sub(t.last)
t.last = now

// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
@@ -118,14 +102,21 @@ func (t *limiter) Take() time.Time {

// If sleepFor is positive, then we should sleep now.
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
if !t.timer.Stop() {
// Non-blocking drain: an earlier Take may already have consumed the
// tick from the channel, and a blocking receive would deadlock here.
select {
case <-t.timer.C:
default:
}
}
t.timer.Reset(t.sleepFor)
select {
case <-t.timer.C:
case <-ctx.Done():
return false
}

t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}

return t.last
return true
}
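
A worked example of the pacing arithmetic: at rate = 100, perRequest is 10ms and maxSlack is -100ms. If the previous Take returned 4ms ago, sleepFor grows by 10ms - 4ms = 6ms and this call sleeps 6ms; if the caller instead arrives 30ms late, sleepFor drops by 20ms, clamped at -100ms, so at most ten subsequent back-to-back requests are admitted without sleeping.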

type unlimited struct{}
@@ -135,6 +126,6 @@ func NewUnlimited() Limiter {
return unlimited{}
}

func (unlimited) Take() time.Time {
return time.Now()
func (unlimited) Take(_ context.Context) bool {
return true
}
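
End to end, a deadline on the context bounds how long any Take may block. A sketch against this PR's branch (the released API does not accept a context):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/ratelimit"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	rl := ratelimit.New(1) // 1 op/sec: a second Take would block ~1s
	rl.Take(ctx)           // the first Take is admitted immediately
	if !rl.Take(ctx) {     // the 50ms deadline fires before the limiter unblocks
		fmt.Println("gave up waiting:", ctx.Err())
	}
}
```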
68 changes: 31 additions & 37 deletions ratelimit_test.go
@@ -5,9 +5,9 @@ import (
"sync"
"testing"
"time"
"context"

"go.uber.org/ratelimit"
"go.uber.org/ratelimit/internal/clock"

"github.com/stretchr/testify/assert"
"github.com/uber-go/atomic"
@@ -17,8 +17,11 @@ func ExampleRatelimit() {
rl := ratelimit.New(100) // per second

prev := time.Now()
ctx := context.Background()
for i := 0; i < 10; i++ {
now := rl.Take()
rl.Take(ctx)
now := time.Now()

if i > 0 {
fmt.Println(i, now.Sub(prev))
}
@@ -40,8 +43,9 @@ func ExampleRatelimit() {
func TestUnlimited(t *testing.T) {
now := time.Now()
rl := ratelimit.NewUnlimited()
ctx := context.Background()
for i := 0; i < 1000; i++ {
rl.Take()
rl.Take(ctx)
}
assert.Condition(t, func() bool { return time.Now().Sub(now) < 1*time.Millisecond }, "no artificial delay")
}
@@ -51,91 +55,81 @@ func TestRateLimiter(t *testing.T) {
wg.Add(1)
defer wg.Wait()

clock := clock.NewMock()
rl := ratelimit.New(100, ratelimit.WithClock(clock), ratelimit.WithoutSlack)
rl := ratelimit.New(100, ratelimit.WithoutSlack)

count := atomic.NewInt32(0)

// Until we're done...
done := make(chan struct{})
defer close(done)
ctx := context.Background()

// Create copious counts concurrently.
go job(rl, count, done)
go job(rl, count, done)
go job(rl, count, done)
go job(rl, count, done)
go job(rl, count, ctx)
go job(rl, count, ctx)
go job(rl, count, ctx)
go job(rl, count, ctx)

clock.AfterFunc(1*time.Second, func() {
time.AfterFunc(1*time.Second, func() {
assert.InDelta(t, 100, count.Load(), 10, "count within rate limit")
})

clock.AfterFunc(2*time.Second, func() {
time.AfterFunc(2*time.Second, func() {
assert.InDelta(t, 200, count.Load(), 10, "count within rate limit")
})

clock.AfterFunc(3*time.Second, func() {
time.AfterFunc(3*time.Second, func() {
assert.InDelta(t, 300, count.Load(), 10, "count within rate limit")
wg.Done()
})

clock.Add(4 * time.Second)

clock.Add(5 * time.Second)
}
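
Note one behavioral difference from the removed done channel: context.Background() is never canceled, so these job goroutines never observe ctx.Done() and outlive the test. A cancellable context would restore the old cleanup (a sketch, not part of this PR):

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // replaces `defer close(done)`: stops the jobs when the test returns

go job(rl, count, ctx)
```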

func TestDelayedRateLimiter(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()

clock := clock.NewMock()
slow := ratelimit.New(10, ratelimit.WithClock(clock))
fast := ratelimit.New(100, ratelimit.WithClock(clock))
slow := ratelimit.New(10)
fast := ratelimit.New(100)

count := atomic.NewInt32(0)

// Until we're done...
done := make(chan struct{})
defer close(done)
ctx := context.Background()

// Run a slow job
go func() {
for {
slow.Take()
fast.Take()
slow.Take(ctx)
fast.Take(ctx)
count.Inc()
select {
case <-done:
case <-ctx.Done():
return
default:
}
}
}()

// Accumulate slack for 10 seconds,
clock.AfterFunc(20*time.Second, func() {
time.AfterFunc(20*time.Second, func() {
// Then start working.
go job(fast, count, done)
go job(fast, count, done)
go job(fast, count, done)
go job(fast, count, done)
go job(fast, count, ctx)
go job(fast, count, ctx)
go job(fast, count, ctx)
go job(fast, count, ctx)
})

clock.AfterFunc(30*time.Second, func() {
time.AfterFunc(30*time.Second, func() {
assert.InDelta(t, 1200, count.Load(), 10, "count within rate limit")
wg.Done()
})

clock.Add(40 * time.Second)
}

func job(rl ratelimit.Limiter, count *atomic.Int32, done <-chan struct{}) {
func job(rl ratelimit.Limiter, count *atomic.Int32, ctx context.Context) {
for {
rl.Take()
rl.Take(ctx)
count.Inc()
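// Non-blocking check: return if the context has been canceled; otherwise keep going.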
select {
case <-done:
case <-ctx.Done():
return
default:
}
}
}