Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beater/api/intake: fix rate limiting #5518

Merged
merged 2 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat
return
}

// copy batchProcessor to avoid updating closure below
batchProcessor := batchProcessor
if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok {
// Apply rate limiting after reading but before processing any events.
batchProcessor = modelprocessor.Chained{
Expand Down
26 changes: 26 additions & 0 deletions beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,32 @@ func TestRateLimiting(t *testing.T) {
}
}

func TestRateLimitingRequests(t *testing.T) {
// Check that rate limiting across multiple requests is handled correctly.
//
// ratelimit.ndjson contains 19 events, and we rate limit in batches of 10
// events. The burst of 41 should be enough for 2 iterations with one left.
limiter := rate.NewLimiter(1, 41)
processor := stream.BackendProcessor(config.DefaultConfig())
handler := Handler(processor, emptyRequestMetadata, modelprocessor.Nop{})

data, err := ioutil.ReadFile("../../../testdata/intake-v2/ratelimit.ndjson")
require.NoError(t, err)
for i := 0; i < 2; i++ {
r := httptest.NewRequest("POST", "/", bytes.NewBuffer(data))
r = r.WithContext(ratelimit.ContextWithLimiter(r.Context(), limiter))
r.Header.Add("Content-Type", "application/x-ndjson")

w := httptest.NewRecorder()
c := request.NewContext()
c.Reset(w, r)
handler(c)
assert.Equal(t, http.StatusAccepted, w.Code)
}
assert.True(t, limiter.Allow())
assert.False(t, limiter.Allow())
}

type testcaseIntakeHandler struct {
c *request.Context
w *httptest.ResponseRecorder
Expand Down
6 changes: 2 additions & 4 deletions systemtest/rum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,11 @@ func TestRUMRateLimit(t *testing.T) {
// Just check that rate limiting is wired up. More specific rate limiting scenarios are unit tested.

var g errgroup.Group
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit*3) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit*3) })
assert.NoError(t, g.Wait())

g = errgroup.Group{}
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) })
g.Go(func() error { return sendEvents("10.11.12.15", srv.Config.RUM.RateLimit.EventLimit) })
err = g.Wait()
require.Error(t, err)
Expand Down