Skip to content

Commit

Permalink
beater/api/intake: fix rate limiting (#5518)
Browse files Browse the repository at this point in the history
* beater/api/intake: fix rate limiting

Copy the batch processor in the closure before
copying, to avoid wrapping it multiple times.

* systemtest: fix rate limiting test

The test was wrong, and only passed because of
the bug that has been fixed. We should be using
up the per-IP event limits on the first two
requests, but weren't taking into account the
burst multiplier (limit x 3).
  • Loading branch information
axw authored Jun 23, 2021
1 parent 92335c5 commit 97a6f9b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
2 changes: 2 additions & 0 deletions beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat
return
}

// copy batchProcessor to avoid updating closure below
batchProcessor := batchProcessor
if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok {
// Apply rate limiting after reading but before processing any events.
batchProcessor = modelprocessor.Chained{
Expand Down
26 changes: 26 additions & 0 deletions beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,32 @@ func TestRateLimiting(t *testing.T) {
}
}

func TestRateLimitingRequests(t *testing.T) {
// Check that rate limiting across multiple requests is handled correctly.
//
// ratelimit.ndjson contains 19 events, and we rate limit in batches of 10
// events. The burst of 41 should be enough for 2 iterations with one left.
limiter := rate.NewLimiter(1, 41)
processor := stream.BackendProcessor(config.DefaultConfig())
handler := Handler(processor, emptyRequestMetadata, modelprocessor.Nop{})

data, err := ioutil.ReadFile("../../../testdata/intake-v2/ratelimit.ndjson")
require.NoError(t, err)
for i := 0; i < 2; i++ {
r := httptest.NewRequest("POST", "/", bytes.NewBuffer(data))
r = r.WithContext(ratelimit.ContextWithLimiter(r.Context(), limiter))
r.Header.Add("Content-Type", "application/x-ndjson")

w := httptest.NewRecorder()
c := request.NewContext()
c.Reset(w, r)
handler(c)
assert.Equal(t, http.StatusAccepted, w.Code)
}
assert.True(t, limiter.Allow())
assert.False(t, limiter.Allow())
}

type testcaseIntakeHandler struct {
c *request.Context
w *httptest.ResponseRecorder
Expand Down
6 changes: 2 additions & 4 deletions systemtest/rum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,11 @@ func TestRUMRateLimit(t *testing.T) {
// Just check that rate limiting is wired up. More specific rate limiting scenarios are unit tested.

var g errgroup.Group
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit*3) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit*3) })
assert.NoError(t, g.Wait())

g = errgroup.Group{}
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.15", srv.Config.RUM.RateLimit.EventLimit) })
err = g.Wait()
require.Error(t, err)
Expand Down

0 comments on commit 97a6f9b

Please sign in to comment.