Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.14] beater/api/intake: fix rate limiting (backport #5518) #5667

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat
return
}

<<<<<<< HEAD
=======
// copy batchProcessor to avoid updating closure below
batchProcessor := batchProcessor
if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok {
// Apply rate limiting after reading but before processing any events.
batchProcessor = modelprocessor.Chained{
rateLimitBatchProcessor(limiter, batchSize),
batchProcessor,
}
}

>>>>>>> 97a6f9b8 (beater/api/intake: fix rate limiting (#5518))
reader, err := decoder.CompressedRequestReader(c.Request)
if err != nil {
writeError(c, compressedRequestReaderError{err})
Expand Down
90 changes: 90 additions & 0 deletions beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,96 @@ func TestIntakeHandler(t *testing.T) {
}
}

<<<<<<< HEAD
=======
// TestRateLimiting verifies that the intake handler honours a rate
// limiter attached to the request context: requests within the limit
// are accepted, while an exhausted limit yields a 429 response.
func TestRateLimiting(t *testing.T) {
	type testcase struct {
		limiter       *rate.Limiter
		preconsumed   int
		expectLimited bool
	}

	cases := map[string]testcase{
		"LimiterAllowAll": {
			limiter: rate.NewLimiter(rate.Limit(40), 40*5),
		},
		"LimiterPartiallyUsedLimitAllow": {
			limiter:     rate.NewLimiter(rate.Limit(10), 10*2),
			preconsumed: 10,
		},
		"LimiterDenyAll": {
			limiter:       rate.NewLimiter(rate.Limit(0), 2),
			expectLimited: true,
		},
		"LimiterPartiallyUsedLimitDeny": {
			limiter:       rate.NewLimiter(rate.Limit(7), 7*2),
			preconsumed:   10,
			expectLimited: true,
		},
		"LimiterDeny": {
			limiter:       rate.NewLimiter(rate.Limit(6), 6*2),
			expectLimited: true,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			var h testcaseIntakeHandler
			h.path = "ratelimit.ndjson"
			h.setup(t)

			// Attach the limiter to the request context so the handler can
			// enforce it, then optionally drain part of the budget up front.
			ctx := ratelimit.ContextWithLimiter(h.c.Request.Context(), tc.limiter)
			h.c.Request = h.c.Request.WithContext(ctx)
			if tc.preconsumed > 0 {
				tc.limiter.AllowN(time.Now(), tc.preconsumed)
			}

			Handler(h.processor, emptyRequestMetadata, h.batchProcessor)(h.c)

			if tc.expectLimited {
				assert.Equal(t, request.IDResponseErrorsRateLimit, h.c.Result.ID)
				assert.Equal(t, http.StatusTooManyRequests, h.w.Code)
				assert.Error(t, h.c.Result.Err)
			} else {
				assert.Equal(t, request.IDResponseValidAccepted, h.c.Result.ID)
				assert.Equal(t, http.StatusAccepted, h.w.Code)
				assert.NoError(t, h.c.Result.Err)
			}
			assert.NotZero(t, h.w.Body.Len())
			approvaltest.ApproveJSON(t, "test_approved/"+t.Name(), h.w.Body.Bytes())
		})
	}
}

// TestRateLimitingRequests checks that a single limiter is enforced
// correctly across multiple intake requests.
//
// ratelimit.ndjson contains 19 events, and events are rate limited in
// batches of 10. A burst of 41 should therefore be enough for two full
// requests, with exactly one token left over afterwards.
func TestRateLimitingRequests(t *testing.T) {
	limiter := rate.NewLimiter(1, 41)
	processor := stream.BackendProcessor(config.DefaultConfig())
	handler := Handler(processor, emptyRequestMetadata, modelprocessor.Nop{})

	payload, err := ioutil.ReadFile("../../../testdata/intake-v2/ratelimit.ndjson")
	require.NoError(t, err)

	for i := 0; i < 2; i++ {
		req := httptest.NewRequest("POST", "/", bytes.NewBuffer(payload))
		req.Header.Add("Content-Type", "application/x-ndjson")
		req = req.WithContext(ratelimit.ContextWithLimiter(req.Context(), limiter))

		rec := httptest.NewRecorder()
		ctx := request.NewContext()
		ctx.Reset(rec, req)
		handler(ctx)
		assert.Equal(t, http.StatusAccepted, rec.Code)
	}

	// After two successful requests exactly one token should remain.
	assert.True(t, limiter.Allow())
	assert.False(t, limiter.Allow())
}

>>>>>>> 97a6f9b8 (beater/api/intake: fix rate limiting (#5518))
type testcaseIntakeHandler struct {
c *request.Context
w *httptest.ResponseRecorder
Expand Down
11 changes: 11 additions & 0 deletions systemtest/rum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestRUMRateLimit(t *testing.T) {
err = sendEvents("10.11.12.13", 3*srv.Config.RUM.RateLimit.EventLimit)
assert.NoError(t, err)

<<<<<<< HEAD
// Sending the events over multiple requests should have the same outcome.
for i := 0; i < 3; i++ {
err = sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit)
Expand All @@ -171,6 +172,16 @@ func TestRUMRateLimit(t *testing.T) {
// The rate limiter cache only has space for 2 IPs, so the 3rd one reuses an existing
// limiter, which will have already been exhausted.
err = sendEvents("10.11.12.15", 10)
=======
var g errgroup.Group
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit*3) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit*3) })
assert.NoError(t, g.Wait())

g = errgroup.Group{}
g.Go(func() error { return sendEvents("10.11.12.15", srv.Config.RUM.RateLimit.EventLimit) })
err = g.Wait()
>>>>>>> 97a6f9b8 (beater/api/intake: fix rate limiting (#5518))
require.Error(t, err)

// The exact error differs, depending on whether rate limiting was applied at the request
Expand Down